* [PATCH v3/3] replication: handle replication shutdown correctly. @ 2018-08-15 11:03 Serge Petrenko 2018-08-15 13:49 ` Vladimir Davydov 0 siblings, 1 reply; 6+ messages in thread From: Serge Petrenko @ 2018-08-15 11:03 UTC (permalink / raw) To: tarantool-patches; +Cc: georgy, vdavydov.dev, Serge Petrenko Relay threads keep using tx upon shutdown, which leads to occasional segmentation faults and assertion failures (e.g. in the replication test suite). Fix this by finishing the implementation of replication_free() and introducing relay_cancel(). replication_free() calls relay_cancel() to stop every relay thread that is using tx. Also add delays to replication/gc.test to fix tests on Travis. Closes #3485 --- https://github.com/tarantool/tarantool/issues/3485 https://github.com/tarantool/tarantool/tree/sp/alt-gh-3485-replication-shutdown This is the third variant of the patch, as discussed yesterday with Vladimir and Georgy. Instead of sending a message via cpipe or sending a signal with pthread_kill, we cancel the thread with pthread_cancel. Changes in v3: - eliminate tx_in_use flag. - do not send a message to relay, just cancel a thread with pthread_cancel() and wait for it to exit with pthread_join(). - wait for the relay thread with tt_pthread_join() before proceeding with tx destruction. - add macros tt_pthread_setcanceltype(), tt_pthread_setcancelstate(), tt_pthread_cancel() to tt_pthread.h - make sure all the replicas are deleted, including the ones in replicaset.anon list. Changes in v2: - instead of setting tx_in_use flag in relay and checking it in tx, send a message from relay to tx to set the flag. 
src/box/box.cc | 2 +- src/box/relay.cc | 11 ++++++++++ src/box/relay.h | 4 ++++ src/box/replication.cc | 48 +++++++++++++++++++++++++++++++++++++------- src/box/replication.h | 3 +++ src/tt_pthread.h | 15 ++++++++++++++ test/replication/gc.result | 15 ++++++++++++++ test/replication/gc.test.lua | 6 +++++- 8 files changed, 95 insertions(+), 9 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..c5e05bc15 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1597,9 +1597,9 @@ box_free(void) * initialized */ if (is_box_configured) { + replication_free(); #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..81dc88a06 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +void +relay_cancel(struct relay *relay) +{ + assert(relay->state == RELAY_FOLLOW); + tt_pthread_cancel(relay->cord.id); + tt_pthread_join(relay->cord.id, NULL); +} + static void relay_stop(struct relay *relay) { @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay) static int relay_subscribe_f(va_list ap) { + /* Allow to be cancelled. */ + tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); struct relay *relay = va_arg(ap, struct relay *); struct recovery *r = relay->r; diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..53bf68eb8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Cancel a running relay. Called on shutdown. 
*/ +void +relay_cancel(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..acf20fc2c 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -91,13 +91,6 @@ replication_init(void) latch_create(&replicaset.applier.order_latch); } -void -replication_free(void) -{ - free(replicaset.replica_by_id); - fiber_cond_destroy(&replicaset.applier.cond); -} - void replica_check_id(uint32_t replica_id) { @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica) trigger_clear(&replica->on_applier_state); } +void +replication_free(void) +{ + struct replica *replica, *next; + + replica_hash_foreach_safe(&replicaset.hash, replica, next) { + if (replica->id == instance_id) { + replica_hash_remove(&replicaset.hash, replica); + /* + * Local replica doesn't have neither applier + * nor relay, so free it right away. + */ + TRASH(replica); + free(replica); + continue; + } + if (replica->applier != NULL) { + replica_clear_applier(replica); + /* + * We're exiting, so control won't be passed + * to appliers and we don't need to stop them. 
+ */ + } + if (replica->id != REPLICA_ID_NIL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { + replica->id = REPLICA_ID_NIL; + relay_cancel(replica->relay); + } + } else { + replica_hash_remove(&replicaset.hash, replica); + replica_delete(replica); + } + } + rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) { + replica_clear_applier(replica); + replica_delete(replica); + } + free(replicaset.replica_by_id); + fiber_cond_destroy(&replicaset.applier.cond); +} + static void replica_on_applier_sync(struct replica *replica) { diff --git a/src/box/replication.h b/src/box/replication.h index e8b391af2..a8baafd12 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -154,6 +154,9 @@ replication_disconnect_timeout(void) void replication_init(void); +/** + * Stop replication and delete all replicas and replicaset. + */ void replication_free(void); diff --git a/src/tt_pthread.h b/src/tt_pthread.h index 60ade50ae..144425ccb 100644 --- a/src/tt_pthread.h +++ b/src/tt_pthread.h @@ -277,6 +277,21 @@ tt_pthread_error(e__); \ }) +#define tt_pthread_cancel(thread) \ +({ int e__ = pthread_cancel(thread); \ + tt_pthread_error(e__); \ + }) + +#define tt_pthread_setcancelstate(state, ret) \ +({ int e__ = pthread_setcancelstate(state, ret); \ + tt_pthread_error(e__); \ +}) + +#define tt_pthread_setcanceltype(type, ret) \ +({ int e__ = pthread_setcanceltype(type, ret); \ + tt_pthread_error(e__); \ +}) + #define tt_pthread_key_create(key, dtor) \ ({ int e__ = pthread_key_create(key, dtor); \ tt_pthread_error(e__); \ diff --git a/test/replication/gc.result b/test/replication/gc.result index 3f9db26ce..aa59747db 100644 --- a/test/replication/gc.result +++ b/test/replication/gc.result @@ -152,6 +152,9 @@ box.snapshot() --- - ok ... +wait_gc(1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -311,6 +314,9 @@ box.snapshot() --- - ok ... +wait_gc(1) +--- +... 
#box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') test_run:cleanup_cluster() --- ... +fiber.sleep(0.1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -395,6 +404,9 @@ replica_port ~= nil box.cfg{replication = replica_port} --- ... +fiber.sleep(0.1) +--- +... -- Stop the replica and write a few WALs. test_run:cmd("stop server replica") --- @@ -425,6 +437,9 @@ box.snapshot() --- - ok ... +fiber.sleep(0.1) +--- +... #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') --- - true diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua index 96f11f8d4..28e0996f5 100644 --- a/test/replication/gc.test.lua +++ b/test/replication/gc.test.lua @@ -78,6 +78,7 @@ box.snapshot() -- Invoke garbage collection. Check that it doesn't remove -- xlogs needed by the replica. box.snapshot() +wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() #fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') @@ -144,6 +145,7 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() +wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() xlog_count = #fio.glob('./master/*.xlog') -- the replica may have managed to download all data @@ -154,6 +156,7 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') -- The xlog should only be deleted after the replica -- is unregistered. test_run:cleanup_cluster() +fiber.sleep(0.1) #box.info.gc().checkpoints == 1 or box.info.gc() #fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- @@ -185,7 +188,7 @@ test_run:cmd("start server replica") replica_port = test_run:eval('replica', 'return box.cfg.listen')[1] replica_port ~= nil box.cfg{replication = replica_port} - +fiber.sleep(0.1) -- Stop the replica and write a few WALs. 
test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") @@ -195,6 +198,7 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() +fiber.sleep(0.1) #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') -- Delete the replica from the cluster table and check that -- 2.15.2 (Apple Git-101.1) ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [PATCH v3/3] replication: handle replication shutdown correctly. 2018-08-15 11:03 [PATCH v3/3] replication: handle replication shutdown correctly Serge Petrenko @ 2018-08-15 13:49 ` Vladimir Davydov 2018-08-15 16:13 ` Serge Petrenko 0 siblings, 1 reply; 6+ messages in thread From: Vladimir Davydov @ 2018-08-15 13:49 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, georgy On Wed, Aug 15, 2018 at 02:03:03PM +0300, Serge Petrenko wrote: > diff --git a/src/box/box.cc b/src/box/box.cc > index ae4959d6f..c5e05bc15 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -1597,9 +1597,9 @@ box_free(void) > * initialized > */ > if (is_box_configured) { > + replication_free(); > #if 0 > session_free(); > - replication_free(); Nit: better move replication_free() after #endif, where all the other destructors are called. > user_cache_free(); > schema_free(); > module_free(); > diff --git a/src/box/relay.cc b/src/box/relay.cc > index 4cacbc840..81dc88a06 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync, > relay->state = RELAY_FOLLOW; > } > > +void > +relay_cancel(struct relay *relay) > +{ > + assert(relay->state == RELAY_FOLLOW); IMO it would be better to make this function work both if the relay thread is running and if it is not (do nothing in the latter case). Also relay->state is set to RELAY_FOLLOW on initial join, when relay thread isn't running. In this case, you must not call pthread cancel/join. > + tt_pthread_cancel(relay->cord.id); > + tt_pthread_join(relay->cord.id, NULL); > +} > + > static void > relay_stop(struct relay *relay) > { > @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay) > static int > relay_subscribe_f(va_list ap) > { > + /* Allow to be cancelled.
*/ > + tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); > + tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); These cancelstate/type are set by default for all new threads and so we don't need to set them explicitly, no? > struct relay *relay = va_arg(ap, struct relay *); > struct recovery *r = relay->r; > > diff --git a/src/box/relay.h b/src/box/relay.h > index 2988e6b0d..53bf68eb8 100644 > --- a/src/box/relay.h > +++ b/src/box/relay.h > @@ -61,6 +61,10 @@ enum relay_state { > struct relay * > relay_new(struct replica *replica); > > +/** Cancel a running relay. Called on shutdown. */ > +void > +relay_cancel(struct relay *relay); > + > /** Destroy and delete the relay */ > void > relay_delete(struct relay *relay); > diff --git a/src/box/replication.cc b/src/box/replication.cc > index 48956d2ed..acf20fc2c 100644 > --- a/src/box/replication.cc > +++ b/src/box/replication.cc > @@ -91,13 +91,6 @@ replication_init(void) > latch_create(&replicaset.applier.order_latch); > } > > -void > -replication_free(void) > -{ > - free(replicaset.replica_by_id); > - fiber_cond_destroy(&replicaset.applier.cond); > -} > - > void > replica_check_id(uint32_t replica_id) > { > @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica) > trigger_clear(&replica->on_applier_state); > } > > +void > +replication_free(void) > +{ After some consideration, I still don't think it's a good idea to add the code that frees replicas in this patch, because it's a completely separate change, which has nothing to do with the issue you're fixing. The more code you add the more difficult it will be to cherry-pick this patch, and it also increases the likelihood of introducing a bug. Please remove this code so that replication_free only cancels relay threads. We will probably need to free memory on shutdown (to silence valgrind and asan), but let's do it separately and when we get to it. 
> + struct replica *replica, *next; > + > + replica_hash_foreach_safe(&replicaset.hash, replica, next) { > + if (replica->id == instance_id) { > + replica_hash_remove(&replicaset.hash, replica); > + /* > + * Local replica doesn't have neither applier > + * nor relay, so free it right away. > + */ > + TRASH(replica); > + free(replica); > + continue; > + } > + if (replica->applier != NULL) { > + replica_clear_applier(replica); > + /* > + * We're exiting, so control won't be passed > + * to appliers and we don't need to stop them. > + */ > + } > + if (replica->id != REPLICA_ID_NIL) { > + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { > + replica->id = REPLICA_ID_NIL; > + relay_cancel(replica->relay); Please add a comment here why we need to stop relay threads on shutdown. > + } > + } else { > + replica_hash_remove(&replicaset.hash, replica); > + replica_delete(replica); > + } > + } > + rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) { > + replica_clear_applier(replica); > + replica_delete(replica); > + } > + free(replicaset.replica_by_id); > + fiber_cond_destroy(&replicaset.applier.cond); > +} > + > static void > replica_on_applier_sync(struct replica *replica) > { > diff --git a/src/box/replication.h b/src/box/replication.h > index e8b391af2..a8baafd12 100644 > --- a/src/box/replication.h > +++ b/src/box/replication.h > @@ -154,6 +154,9 @@ replication_disconnect_timeout(void) > void > replication_init(void); > > +/** > + * Stop replication and delete all replicas and replicaset. > + */ This comment is not really needed - it's clear from the name of this function that it is a destructor. Besides, replication_init() doesn't have a comment so let's leave this declaration as is. 
> void > replication_free(void); > > diff --git a/src/tt_pthread.h b/src/tt_pthread.h > index 60ade50ae..144425ccb 100644 > --- a/src/tt_pthread.h > +++ b/src/tt_pthread.h > @@ -277,6 +277,21 @@ > tt_pthread_error(e__); \ > }) > > +#define tt_pthread_cancel(thread) \ > +({ int e__ = pthread_cancel(thread); \ > + tt_pthread_error(e__); \ > + }) Extra space. > + +#define tt_pthread_setcancelstate(state, ret) \ > +({ int e__ = pthread_setcancelstate(state, ret); \ > + tt_pthread_error(e__); \ > +}) > + > +#define tt_pthread_setcanceltype(type, ret) \ > +({ int e__ = pthread_setcanceltype(type, ret); \ > + tt_pthread_error(e__); \ > +}) > + > #define tt_pthread_key_create(key, dtor) \ > ({ int e__ = pthread_key_create(key, dtor); \ > tt_pthread_error(e__); \ > diff --git a/test/replication/gc.result b/test/replication/gc.result > index 3f9db26ce..aa59747db 100644 > --- a/test/replication/gc.result > +++ b/test/replication/gc.result > @@ -152,6 +152,9 @@ box.snapshot() > --- > - ok > ... > +wait_gc(1) > +--- > +... I don't understand how this is relevant to the issue you're fixing. If the test is buggy, you should fix it in a separate patch. > #box.info.gc().checkpoints == 1 or box.info.gc() > --- > - true > @@ -311,6 +314,9 @@ box.snapshot() > --- > - ok > ... > +wait_gc(1) > +--- > +... > #box.info.gc().checkpoints == 1 or box.info.gc() > --- > - true > @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') > test_run:cleanup_cluster() > --- > ... > +fiber.sleep(0.1) > +--- > +... Please don't use sleeps. Use wait_gc() instead. ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [PATCH v3/3] replication: handle replication shutdown correctly. 2018-08-15 13:49 ` Vladimir Davydov @ 2018-08-15 16:13 ` Serge Petrenko 2018-08-15 18:47 ` Vladimir Davydov 0 siblings, 1 reply; 6+ messages in thread From: Serge Petrenko @ 2018-08-15 16:13 UTC (permalink / raw) To: Vladimir Davydov; +Cc: tarantool-patches, Georgy Kirichenko Hi! Fixed your comments. The new diff is below. The branch remains: > 15 авг. 2018 г., в 16:49, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а): > > On Wed, Aug 15, 2018 at 02:03:03PM +0300, Serge Petrenko wrote: >> diff --git a/src/box/box.cc b/src/box/box.cc >> index ae4959d6f..c5e05bc15 100644 >> --- a/src/box/box.cc >> +++ b/src/box/box.cc >> @@ -1597,9 +1597,9 @@ box_free(void) >> * initialized >> */ >> if (is_box_configured) { >> + replication_free(); >> #if 0 >> session_free(); >> - replication_free(); > > Nit: better move replication_free() after #endif, where all other > destructor are called. Fixed. > >> user_cache_free(); >> schema_free(); >> module_free(); >> diff --git a/src/box/relay.cc b/src/box/relay.cc >> index 4cacbc840..81dc88a06 100644 >> --- a/src/box/relay.cc >> +++ b/src/box/relay.cc >> @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync, >> relay->state = RELAY_FOLLOW; >> } >> >> +void >> +relay_cancel(struct relay *relay) >> +{ >> + assert(relay->state == RELAY_FOLLOW); > > IMO it would be better to make this function work both if the relay > thread is running and if it is not (do nothing in the latter case). > > Also relay->state is set to RELAY_FOLLOW on initial join, when > relay thread isn't running. In this case, you must not call pthread > cancel/join. Fixed both. > >> + tt_pthread_cancel(relay->cord.id); >> + tt_pthread_join(relay->cord.id, NULL); >> +} >> + >> static void >> relay_stop(struct relay *relay) >> { >> @@ -460,6 +468,9 @@ relay_send_heartbeat(struct relay *relay) >> static int >> relay_subscribe_f(va_list ap) >> { >> + /* Allow to be cancelled. 
*/ >> + tt_pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); >> + tt_pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); > > These cancelstate/type are set by default for all new threads and so we > don't need to set them explicitly, no? Yes, you’re right. Removed. > >> struct relay *relay = va_arg(ap, struct relay *); >> struct recovery *r = relay->r; >> >> diff --git a/src/box/relay.h b/src/box/relay.h >> index 2988e6b0d..53bf68eb8 100644 >> --- a/src/box/relay.h >> +++ b/src/box/relay.h >> @@ -61,6 +61,10 @@ enum relay_state { >> struct relay * >> relay_new(struct replica *replica); >> >> +/** Cancel a running relay. Called on shutdown. */ >> +void >> +relay_cancel(struct relay *relay); >> + >> /** Destroy and delete the relay */ >> void >> relay_delete(struct relay *relay); >> diff --git a/src/box/replication.cc b/src/box/replication.cc >> index 48956d2ed..acf20fc2c 100644 >> --- a/src/box/replication.cc >> +++ b/src/box/replication.cc >> @@ -91,13 +91,6 @@ replication_init(void) >> latch_create(&replicaset.applier.order_latch); >> } >> >> -void >> -replication_free(void) >> -{ >> - free(replicaset.replica_by_id); >> - fiber_cond_destroy(&replicaset.applier.cond); >> -} >> - >> void >> replica_check_id(uint32_t replica_id) >> { >> @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica) >> trigger_clear(&replica->on_applier_state); >> } >> >> +void >> +replication_free(void) >> +{ > > After some consideration, I still don't think it's a good idea to add > the code that frees replicas in this patch, because it's a completely > separate change, which has nothing to do with the issue you're fixing. > The more code you add the more difficult it will be to cherry-pick this > patch, and it also increases the likelihood of introducing a bug. > Please remove this code so that replication_free only cancels relay > threads. 
> > We will probably need to free memory on shutdown (to silence valgrind > and asan), but let's do it separately and when we get to it. Ok. Let’s do it in a separate patch. > >> + struct replica *replica, *next; >> + >> + replica_hash_foreach_safe(&replicaset.hash, replica, next) { >> + if (replica->id == instance_id) { >> + replica_hash_remove(&replicaset.hash, replica); >> + /* >> + * Local replica doesn't have neither applier >> + * nor relay, so free it right away. >> + */ >> + TRASH(replica); >> + free(replica); >> + continue; >> + } >> + if (replica->applier != NULL) { >> + replica_clear_applier(replica); >> + /* >> + * We're exiting, so control won't be passed >> + * to appliers and we don't need to stop them. >> + */ >> + } >> + if (replica->id != REPLICA_ID_NIL) { >> + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { >> + replica->id = REPLICA_ID_NIL; >> + relay_cancel(replica->relay); > > Please add a comment here why we need to stop relay threads on shutdown. Done. > >> + } >> + } else { >> + replica_hash_remove(&replicaset.hash, replica); >> + replica_delete(replica); >> + } >> + } >> + rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) { >> + replica_clear_applier(replica); >> + replica_delete(replica); >> + } >> + free(replicaset.replica_by_id); >> + fiber_cond_destroy(&replicaset.applier.cond); >> +} >> + >> static void >> replica_on_applier_sync(struct replica *replica) >> { >> diff --git a/src/box/replication.h b/src/box/replication.h >> index e8b391af2..a8baafd12 100644 >> --- a/src/box/replication.h >> +++ b/src/box/replication.h >> @@ -154,6 +154,9 @@ replication_disconnect_timeout(void) >> void >> replication_init(void); >> >> +/** >> + * Stop replication and delete all replicas and replicaset. >> + */ > > This comment is not really needed - it's clear from the name of this > function that it is a destructor. Besides, replication_init() doesn't > have a comment so let's leave this declaration as is. Ok, removed. 
> >> void >> replication_free(void); >> >> diff --git a/src/tt_pthread.h b/src/tt_pthread.h >> index 60ade50ae..144425ccb 100644 >> --- a/src/tt_pthread.h >> +++ b/src/tt_pthread.h >> @@ -277,6 +277,21 @@ >> tt_pthread_error(e__); \ >> }) >> >> +#define tt_pthread_cancel(thread) \ >> +({ int e__ = pthread_cancel(thread); \ >> + tt_pthread_error(e__); \ >> + }) > > Extra space. Fixed. > >> + > +#define tt_pthread_setcancelstate(state, ret) \ >> +({ int e__ = pthread_setcancelstate(state, ret); \ >> + tt_pthread_error(e__); \ >> +}) >> + >> +#define tt_pthread_setcanceltype(type, ret) \ >> +({ int e__ = pthread_setcanceltype(type, ret); \ >> + tt_pthread_error(e__); \ >> +}) >> + >> #define tt_pthread_key_create(key, dtor) \ >> ({ int e__ = pthread_key_create(key, dtor); \ >> tt_pthread_error(e__); \ >> diff --git a/test/replication/gc.result b/test/replication/gc.result >> index 3f9db26ce..aa59747db 100644 >> --- a/test/replication/gc.result >> +++ b/test/replication/gc.result >> @@ -152,6 +152,9 @@ box.snapshot() >> --- >> - ok >> ... >> +wait_gc(1) >> +--- >> +... > > I don't understand how this is relevant to the issue you're fixing. > If the test is buggy, you should fix it in a separate patch. Removed, this is not relevant to the issue. > >> #box.info.gc().checkpoints == 1 or box.info.gc() >> --- >> - true >> @@ -311,6 +314,9 @@ box.snapshot() >> --- >> - ok >> ... >> +wait_gc(1) >> +--- >> +... >> #box.info.gc().checkpoints == 1 or box.info.gc() >> --- >> - true >> @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') >> test_run:cleanup_cluster() >> --- >> ... >> +fiber.sleep(0.1) >> +--- >> +... > > Please don't use sleeps. Use wait_gc() instead. 
Here’s the new diff: --- src/box/box.cc | 2 +- src/box/relay.cc | 15 +++++++++++++++ src/box/relay.h | 4 ++++ src/box/replication.cc | 43 ++++++++++++++++++++++++++++++++++++------- src/tt_pthread.h | 5 +++++ 5 files changed, 61 insertions(+), 8 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..28bfdd5fb 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1599,13 +1599,13 @@ box_free(void) if (is_box_configured) { #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); tuple_free(); port_free(); #endif + replication_free(); sequence_free(); gc_free(); engine_shutdown(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..a89a7732d 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -198,6 +198,16 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +void +relay_cancel(struct relay *relay) +{ + /* Check that the thread is running first. */ + if (relay->cord.id != 0) { + tt_pthread_cancel(relay->cord.id); + tt_pthread_join(relay->cord.id, NULL); + } +} + static void relay_stop(struct relay *relay) { @@ -211,6 +221,11 @@ relay_stop(struct relay *relay) recovery_delete(relay->r); relay->r = NULL; relay->state = RELAY_STOPPED; + /* + * Needed to track whether relay thread is running or not + * for relay_cancel(). Id is reset upon cord_create(). + */ + relay->cord.id = 0; } void diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..53bf68eb8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Cancel a running relay. Called on shutdown. 
*/ +void +relay_cancel(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..083ae6407 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -91,13 +91,6 @@ replication_init(void) latch_create(&replicaset.applier.order_latch); } -void -replication_free(void) -{ - free(replicaset.replica_by_id); - fiber_cond_destroy(&replicaset.applier.cond); -} - void replica_check_id(uint32_t replica_id) { @@ -242,6 +235,42 @@ replica_clear_applier(struct replica *replica) trigger_clear(&replica->on_applier_state); } +void +replication_free(void) +{ + struct replica *replica, *next; + + replica_hash_foreach_safe(&replicaset.hash, replica, next) { + if (replica->id == instance_id) { + replica_hash_remove(&replicaset.hash, replica); + /* + * Local replica doesn't have neither applier + * nor relay, so ignore it. + */ + continue; + } + if (replica->applier != NULL) { + replica_clear_applier(replica); + /* + * We're exiting, so control won't be passed + * to appliers and we don't need to stop them. + */ + } + if (replica->id != REPLICA_ID_NIL) { + /* + * Relay threads keep sending messages + * to tx via cbus upon shutdown, which + * could lead to segfaults. So cancel + * them. + */ + relay_cancel(replica->relay); + } + } + + free(replicaset.replica_by_id); + fiber_cond_destroy(&replicaset.applier.cond); +} + static void replica_on_applier_sync(struct replica *replica) { diff --git a/src/tt_pthread.h b/src/tt_pthread.h index 60ade50ae..d83694460 100644 --- a/src/tt_pthread.h +++ b/src/tt_pthread.h @@ -277,6 +277,11 @@ tt_pthread_error(e__); \ }) +#define tt_pthread_cancel(thread) \ +({ int e__ = pthread_cancel(thread); \ + tt_pthread_error(e__); \ +}) + #define tt_pthread_key_create(key, dtor) \ ({ int e__ = pthread_key_create(key, dtor); \ tt_pthread_error(e__); \ -- 2.15.2 (Apple Git-101.1) ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [PATCH v3/3] replication: handle replication shutdown correctly. 2018-08-15 16:13 ` Serge Petrenko @ 2018-08-15 18:47 ` Vladimir Davydov 2018-08-16 6:05 ` [tarantool-patches] " Serge Petrenko 0 siblings, 1 reply; 6+ messages in thread From: Vladimir Davydov @ 2018-08-15 18:47 UTC (permalink / raw) To: Serge Petrenko; +Cc: tarantool-patches, Georgy Kirichenko On Wed, Aug 15, 2018 at 07:13:28PM +0300, Serge Petrenko wrote: > diff --git a/src/box/replication.cc b/src/box/replication.cc > index 48956d2ed..083ae6407 100644 > --- a/src/box/replication.cc > +++ b/src/box/replication.cc > @@ -91,13 +91,6 @@ replication_init(void) > latch_create(&replicaset.applier.order_latch); > } > > -void > -replication_free(void) > -{ > - free(replicaset.replica_by_id); > - fiber_cond_destroy(&replicaset.applier.cond); > -} > - > void > replica_check_id(uint32_t replica_id) > { > @@ -242,6 +235,42 @@ replica_clear_applier(struct replica *replica) > trigger_clear(&replica->on_applier_state); > } > > +void > +replication_free(void) > +{ > + struct replica *replica, *next; > + > + replica_hash_foreach_safe(&replicaset.hash, replica, next) { > + if (replica->id == instance_id) { > + replica_hash_remove(&replicaset.hash, replica); > + /* > + * Local replica doesn't have neither applier > + * nor relay, so ignore it. > + */ > + continue; > + } > + if (replica->applier != NULL) { > + replica_clear_applier(replica); > + /* > + * We're exiting, so control won't be passed > + * to appliers and we don't need to stop them. > + */ > + } You don't need this code either. I want this loop to be as simple as /* * <explain why> */ replicaset_foreach(replica) relay_cancel(replica->relay); Then you wouldn't even need to move the definition of replication_free. > + if (replica->id != REPLICA_ID_NIL) { > + /* > + * Relay threads keep sending messages > + * to tx via cbus upon shutdown, which > + * could lead to segfaults. So cancel > + * them. 
> + */ > + relay_cancel(replica->relay); > + } > + } > + > + free(replicaset.replica_by_id); > + fiber_cond_destroy(&replicaset.applier.cond); > +} ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3/3] replication: handle replication shutdown correctly. 2018-08-15 18:47 ` Vladimir Davydov @ 2018-08-16 6:05 ` Serge Petrenko 2018-08-16 8:55 ` Vladimir Davydov 0 siblings, 1 reply; 6+ messages in thread From: Serge Petrenko @ 2018-08-16 6:05 UTC (permalink / raw) To: Vladimir Davydov; +Cc: Georgy Kirichenko, tarantool-patches > 15 авг. 2018 г., в 21:47, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а): > > On Wed, Aug 15, 2018 at 07:13:28PM +0300, Serge Petrenko wrote: >> diff --git a/src/box/replication.cc b/src/box/replication.cc >> index 48956d2ed..083ae6407 100644 >> --- a/src/box/replication.cc >> +++ b/src/box/replication.cc >> @@ -91,13 +91,6 @@ replication_init(void) >> latch_create(&replicaset.applier.order_latch); >> } >> >> -void >> -replication_free(void) >> -{ >> - free(replicaset.replica_by_id); >> - fiber_cond_destroy(&replicaset.applier.cond); >> -} >> - >> void >> replica_check_id(uint32_t replica_id) >> { >> @@ -242,6 +235,42 @@ replica_clear_applier(struct replica *replica) >> trigger_clear(&replica->on_applier_state); >> } >> >> +void >> +replication_free(void) >> +{ >> + struct replica *replica, *next; >> + >> + replica_hash_foreach_safe(&replicaset.hash, replica, next) { >> + if (replica->id == instance_id) { >> + replica_hash_remove(&replicaset.hash, replica); >> + /* >> + * Local replica doesn't have neither applier >> + * nor relay, so ignore it. >> + */ >> + continue; >> + } >> + if (replica->applier != NULL) { >> + replica_clear_applier(replica); >> + /* >> + * We're exiting, so control won't be passed >> + * to appliers and we don't need to stop them. >> + */ >> + } > > You don't need this code either. I want this loop to be as simple as > > /* > * <explain why> > */ > replicaset_foreach(replica) > relay_cancel(replica->relay); > > Then you wouldn't even need to move the definition of replication_free. Done as requested. The new diff is below. 
The branch remains https://github.com/tarantool/tarantool/tree/sp/alt2-gh-3485-replication-shutdown > >> + if (replica->id != REPLICA_ID_NIL) { >> + /* >> + * Relay threads keep sending messages >> + * to tx via cbus upon shutdown, which >> + * could lead to segfaults. So cancel >> + * them. >> + */ >> + relay_cancel(replica->relay); >> + } >> + } >> + >> + free(replicaset.replica_by_id); >> + fiber_cond_destroy(&replicaset.applier.cond); >> +} > --- src/box/box.cc | 2 +- src/box/relay.cc | 16 ++++++++++++++++ src/box/relay.h | 4 ++++ src/box/replication.cc | 10 ++++++++++ src/tt_pthread.h | 5 +++++ 5 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..28bfdd5fb 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1599,13 +1599,13 @@ box_free(void) if (is_box_configured) { #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); tuple_free(); port_free(); #endif + replication_free(); sequence_free(); gc_free(); engine_shutdown(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..edd1c80b0 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -198,6 +198,16 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +void +relay_cancel(struct relay *relay) +{ + /* Check that the thread is running first. */ + if (relay->cord.id != 0) { + tt_pthread_cancel(relay->cord.id); + tt_pthread_join(relay->cord.id, NULL); + } +} + static void relay_stop(struct relay *relay) { @@ -211,6 +221,12 @@ relay_stop(struct relay *relay) recovery_delete(relay->r); relay->r = NULL; relay->state = RELAY_STOPPED; + /* + * Needed to track whether relay thread is running or not + * for relay_cancel(). Id is reset to a positive value + * upon cord_create(). 
+ */ + relay->cord.id = 0; } void diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..53bf68eb8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Cancel a running relay. Called on shutdown. */ +void +relay_cancel(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..4b0700f90 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -94,6 +94,16 @@ replication_init(void) void replication_free(void) { + /* + * Relay threads keep sending messages + * to tx via cbus upon shutdown, which + * could lead to segfaults. So cancel + * them. + */ + replicaset_foreach(replica) { + relay_cancel(replica->relay); + } + free(replicaset.replica_by_id); fiber_cond_destroy(&replicaset.applier.cond); } diff --git a/src/tt_pthread.h b/src/tt_pthread.h index 60ade50ae..d83694460 100644 --- a/src/tt_pthread.h +++ b/src/tt_pthread.h @@ -277,6 +277,11 @@ tt_pthread_error(e__); \ }) +#define tt_pthread_cancel(thread) \ +({ int e__ = pthread_cancel(thread); \ + tt_pthread_error(e__); \ +}) + #define tt_pthread_key_create(key, dtor) \ ({ int e__ = pthread_key_create(key, dtor); \ tt_pthread_error(e__); \ -- 2.15.2 (Apple Git-101.1) ^ permalink raw reply [flat|nested] 6+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v3/3] replication: handle replication shutdown correctly. 2018-08-16 6:05 ` [tarantool-patches] " Serge Petrenko @ 2018-08-16 8:55 ` Vladimir Davydov 0 siblings, 0 replies; 6+ messages in thread From: Vladimir Davydov @ 2018-08-16 8:55 UTC (permalink / raw) To: Serge Petrenko; +Cc: Georgy Kirichenko, tarantool-patches Pushed to 1.10 ^ permalink raw reply [flat|nested] 6+ messages in thread
end of thread, other threads:[~2018-08-16 8:55 UTC | newest] Thread overview: 6+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2018-08-15 11:03 [PATCH v3/3] replication: handle replication shutdown correctly Serge Petrenko 2018-08-15 13:49 ` Vladimir Davydov 2018-08-15 16:13 ` Serge Petrenko 2018-08-15 18:47 ` Vladimir Davydov 2018-08-16 6:05 ` [tarantool-patches] " Serge Petrenko 2018-08-16 8:55 ` Vladimir Davydov
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.