From: Serge Petrenko <sergepetrenko@tarantool.org> To: tarantool-patches@freelists.org Cc: georgy@tarantool.org, vdavydov.dev@gmail.com, Serge Petrenko <sergepetrenko@tarantool.org> Subject: [PATCH v3/2] replication: implement replication_shutdown(). Date: Tue, 14 Aug 2018 11:18:54 +0300 [thread overview] Message-ID: <20180814081854.24391-1-sergepetrenko@tarantool.org> (raw) Relay threads keep using tx upon shutdown, which leads to occasional segmentation faults and assertion fails (e.g. in replication test suite). Fix this by finishing implementation of replication_free() and introducing relay_cancel(). replication_free calls relay_cancel to stop every relay thread that is using tx. Also add delays to replication/gc.test to fix tests on travis. Closes #3485 --- https://github.com/tarantool/tarantool/issues/3485 https://github.com/tarantool/tarantool/tree/sp/alt-gh-3485-replication-shutdown This is the second variant of the patch, as discussed yesterday with Vladimir and Georgy. Instead of sending a message via cpipe, we just send a signal to relay. Relay does pthread_exit in signal handler. Changes in v3: - eliminate tx_in_use flag. - do not send a message to relay, just send a signal with pthread_kill() and upon recieving the signal exit from relay. - wait for the relay thread with tt_pthread_join() before proceeding with tx destruction. - make sure all the replicas are deleted, including the ones in replicaset.anon list. Changes in v2: - instead of setting tx_in_use flag in relay and checking it in tx, send a message from relay to tx to set the flag. src/box/box.cc | 2 +- src/box/relay.cc | 28 ++++++++++++++++++++++++++ src/box/relay.h | 4 ++++ src/box/replication.cc | 48 +++++++++++++++++++++++++++++++++++++------- src/box/replication.h | 3 +++ test/replication/gc.result | 15 ++++++++++++++ test/replication/gc.test.lua | 6 +++++- 7 files changed, 97 insertions(+), 9 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..c5e05bc15 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1597,9 +1597,9 @@ box_free(void) * initialized */ if (is_box_configured) { + replication_free(); #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..6a9267200 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -198,6 +198,14 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +void +relay_cancel(struct relay *relay) +{ + assert(relay->state == RELAY_FOLLOW); + pthread_kill(relay->cord.id, SIGUSR1); + tt_pthread_join(relay->cord.id, NULL); +} + static void relay_stop(struct relay *relay) { @@ -452,6 +460,12 @@ relay_send_heartbeat(struct relay *relay) } } +void +relay_on_signal(int) +{ + pthread_exit(NULL); +} + /** * A libev callback invoked when a relay client socket is ready * for read. This currently only happens when the client closes @@ -460,6 +474,20 @@ relay_send_heartbeat(struct relay *relay) static int relay_subscribe_f(va_list ap) { + /* + * Set a signal handler to terminate the relay upon + * tarantool exit. + */ + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGUSR1); + pthread_sigmask(SIG_UNBLOCK, &set, NULL); + struct sigaction act; + act.sa_handler = &relay_on_signal; + sigfillset(&act.sa_mask); + act.sa_flags = SA_RESTART; + sigaction(SIGUSR1, &act, NULL); + struct relay *relay = va_arg(ap, struct relay *); struct recovery *r = relay->r; diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..53bf68eb8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Cancel a running relay. Called on shutdown. */ +void +relay_cancel(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..acf20fc2c 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -91,13 +91,6 @@ replication_init(void) latch_create(&replicaset.applier.order_latch); } -void -replication_free(void) -{ - free(replicaset.replica_by_id); - fiber_cond_destroy(&replicaset.applier.cond); -} - void replica_check_id(uint32_t replica_id) { @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica) trigger_clear(&replica->on_applier_state); } +void +replication_free(void) +{ + struct replica *replica, *next; + + replica_hash_foreach_safe(&replicaset.hash, replica, next) { + if (replica->id == instance_id) { + replica_hash_remove(&replicaset.hash, replica); + /* + * Local replica doesn't have neither applier + * nor relay, so free it right away. + */ + TRASH(replica); + free(replica); + continue; + } + if (replica->applier != NULL) { + replica_clear_applier(replica); + /* + * We're exiting, so control won't be passed + * to appliers and we don't need to stop them. + */ + } + if (replica->id != REPLICA_ID_NIL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { + replica->id = REPLICA_ID_NIL; + relay_cancel(replica->relay); + } + } else { + replica_hash_remove(&replicaset.hash, replica); + replica_delete(replica); + } + } + rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) { + replica_clear_applier(replica); + replica_delete(replica); + } + free(replicaset.replica_by_id); + fiber_cond_destroy(&replicaset.applier.cond); +} + static void replica_on_applier_sync(struct replica *replica) { diff --git a/src/box/replication.h b/src/box/replication.h index e8b391af2..a8baafd12 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -154,6 +154,9 @@ replication_disconnect_timeout(void) void replication_init(void); +/** + * Stop replication and delete all replicas and replicaset. + */ void replication_free(void); diff --git a/test/replication/gc.result b/test/replication/gc.result index 3f9db26ce..aa59747db 100644 --- a/test/replication/gc.result +++ b/test/replication/gc.result @@ -152,6 +152,9 @@ box.snapshot() --- - ok ... +wait_gc(1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -311,6 +314,9 @@ box.snapshot() --- - ok ... +wait_gc(1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') test_run:cleanup_cluster() --- ... +fiber.sleep(0.1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -395,6 +404,9 @@ replica_port ~= nil box.cfg{replication = replica_port} --- ... +fiber.sleep(0.1) +--- +... -- Stop the replica and write a few WALs. test_run:cmd("stop server replica") --- @@ -425,6 +437,9 @@ box.snapshot() --- - ok ... +fiber.sleep(0.1) +--- +... #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') --- - true diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua index 96f11f8d4..28e0996f5 100644 --- a/test/replication/gc.test.lua +++ b/test/replication/gc.test.lua @@ -78,6 +78,7 @@ box.snapshot() -- Invoke garbage collection. Check that it doesn't remove -- xlogs needed by the replica. box.snapshot() +wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() #fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') @@ -144,6 +145,7 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() +wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() xlog_count = #fio.glob('./master/*.xlog') -- the replica may have managed to download all data @@ -154,6 +156,7 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') -- The xlog should only be deleted after the replica -- is unregistered. test_run:cleanup_cluster() +fiber.sleep(0.1) #box.info.gc().checkpoints == 1 or box.info.gc() #fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- @@ -185,7 +188,7 @@ test_run:cmd("start server replica") replica_port = test_run:eval('replica', 'return box.cfg.listen')[1] replica_port ~= nil box.cfg{replication = replica_port} - +fiber.sleep(0.1) -- Stop the replica and write a few WALs. test_run:cmd("stop server replica") test_run:cmd("cleanup server replica") @@ -195,6 +198,7 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() +fiber.sleep(0.1) #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') -- Delete the replica from the cluster table and check that -- 2.15.2 (Apple Git-101.1)
reply other threads:[~2018-08-14 8:18 UTC|newest] Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20180814081854.24391-1-sergepetrenko@tarantool.org \ --to=sergepetrenko@tarantool.org \ --cc=georgy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --cc=vdavydov.dev@gmail.com \ --subject='Re: [PATCH v3/2] replication: implement replication_shutdown().' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox