From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Serge Petrenko Subject: [PATCH v3/1] replication: implement replication_shutdown(). Date: Tue, 14 Aug 2018 11:07:07 +0300 Message-Id: <20180814080707.19747-1-sergepetrenko@tarantool.org> To: tarantool-patches@freelists.org Cc: georgy@tarantool.org, vdavydov.dev@gmail.com, Serge Petrenko List-ID: Relay threads keep using tx upon shutdown, which leads to occasional segmentation faults and assertion failures (e.g. in the replication test suite). Fix this by finishing the implementation of replication_free() and introducing relay_cancel(). replication_free() calls relay_cancel() to stop every relay thread that is using tx. Also add delays to replication/gc.test to fix tests on Travis. Closes #3485 --- https://github.com/tarantool/tarantool/issues/3485 https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown This is the first variant of the patch, as discussed yesterday with Vladimir and Georgy. Here we send a message to relay via cpipe to make it stop. I made sure the message is flushed and delivered. Changes in v3: - eliminate the tx_in_use flag. Instead just check that the pipe between relay and tx is created. - in relay, just cancel fiber(); do not store main_fiber, since they're the same in that case. - ensure that the message is correctly flushed with cpipe_set_max_input() and cpipe_flush_input(). - wait for the relay thread with cord_join() before proceeding with tx destruction. - make sure all the replicas are deleted, including the ones in the replicaset.anon list. Changes in v2: - instead of setting the tx_in_use flag in relay and checking it in tx, send a message from relay to tx to set the flag. 
src/box/box.cc | 2 +- src/box/relay.cc | 44 ++++++++++++++++++++++++++++++++++++++++ src/box/relay.h | 4 ++++ src/box/replication.cc | 48 +++++++++++++++++++++++++++++++++++++------- src/box/replication.h | 3 +++ src/cbus.c | 12 +++++------ src/cbus.h | 10 +++++++++ test/replication/gc.result | 12 +++++++++++ test/replication/gc.test.lua | 4 ++++ 9 files changed, 125 insertions(+), 14 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..c5e05bc15 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1597,9 +1597,9 @@ box_free(void) * initialized */ if (is_box_configured) { + replication_free(); #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..bc1c87236 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -198,6 +198,50 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +static void +relay_main_fiber_cancel(struct cmsg *msg) +{ + (void)msg; + + fiber_cancel(fiber()); + +} + +void +relay_cancel(struct relay *relay) +{ + assert(relay->state == RELAY_FOLLOW); + if (!cpipe_is_created(&relay->relay_pipe) || + relay->endpoint.n_pipes == 0) + return; + static const struct cmsg_hop route[] ={ + {relay_main_fiber_cancel, NULL} + }; + struct cmsg *msg = (struct cmsg *)malloc(sizeof(*msg)); + if (msg == NULL) { + /* + * Out of memory during shutdown. Do nothing. + */ + say_warn("failed to allocate relay cancel message"); + return; + } + cmsg_init(msg, route); + cpipe_set_max_input(&relay->relay_pipe, 1); + cpipe_push(&relay->relay_pipe, msg); + /* + * Relay sends an unpair message to tx upon exit, we have + * to process that message first. Ev loop is stopped at + * this point, so do this manually. 
+ */ + struct cbus_endpoint *endpoint = cbus_find_endpoint("tx"); + while (true) { + cbus_process(endpoint); + if (relay->endpoint.n_pipes == 0) + break; + } + cord_join(&relay->cord); +} + static void relay_stop(struct relay *relay) { diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..53bf68eb8 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Cancel a running relay. Called on shutdown. */ +void +relay_cancel(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..acf20fc2c 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -91,13 +91,6 @@ replication_init(void) latch_create(&replicaset.applier.order_latch); } -void -replication_free(void) -{ - free(replicaset.replica_by_id); - fiber_cond_destroy(&replicaset.applier.cond); -} - void replica_check_id(uint32_t replica_id) { @@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica) trigger_clear(&replica->on_applier_state); } +void +replication_free(void) +{ + struct replica *replica, *next; + + replica_hash_foreach_safe(&replicaset.hash, replica, next) { + if (replica->id == instance_id) { + replica_hash_remove(&replicaset.hash, replica); + /* + * Local replica doesn't have neither applier + * nor relay, so free it right away. + */ + TRASH(replica); + free(replica); + continue; + } + if (replica->applier != NULL) { + replica_clear_applier(replica); + /* + * We're exiting, so control won't be passed + * to appliers and we don't need to stop them. 
+ */ + } + if (replica->id != REPLICA_ID_NIL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW) { + replica->id = REPLICA_ID_NIL; + relay_cancel(replica->relay); + } + } else { + replica_hash_remove(&replicaset.hash, replica); + replica_delete(replica); + } + } + rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) { + replica_clear_applier(replica); + replica_delete(replica); + } + free(replicaset.replica_by_id); + fiber_cond_destroy(&replicaset.applier.cond); +} + static void replica_on_applier_sync(struct replica *replica) { diff --git a/src/box/replication.h b/src/box/replication.h index e8b391af2..a8baafd12 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -154,6 +154,9 @@ replication_disconnect_timeout(void) void replication_init(void); +/** + * Stop replication and delete all replicas and replicaset. + */ void replication_free(void); diff --git a/src/cbus.c b/src/cbus.c index b3b1280e7..a7d331250 100644 --- a/src/cbus.c +++ b/src/cbus.c @@ -74,12 +74,12 @@ cbus_find_endpoint_locked(struct cbus *bus, const char *name) return NULL; } -static struct cbus_endpoint * -cbus_find_endpoint(struct cbus *bus, const char *name) +struct cbus_endpoint * +cbus_find_endpoint(const char *name) { - tt_pthread_mutex_lock(&bus->mutex); - struct cbus_endpoint *endpoint = cbus_find_endpoint_locked(bus, name); - tt_pthread_mutex_unlock(&bus->mutex); + tt_pthread_mutex_lock(&cbus.mutex); + struct cbus_endpoint *endpoint = cbus_find_endpoint_locked(&cbus, name); + tt_pthread_mutex_unlock(&cbus.mutex); return endpoint; } @@ -527,7 +527,7 @@ cbus_pair(const char *dest_name, const char *src_name, msg.src_pipe = src_pipe; fiber_cond_create(&msg.cond); - struct cbus_endpoint *endpoint = cbus_find_endpoint(&cbus, src_name); + struct cbus_endpoint *endpoint = cbus_find_endpoint(src_name); assert(endpoint != NULL); cpipe_create(dest_pipe, dest_name); diff --git a/src/cbus.h b/src/cbus.h index 4a12d9e79..ce3a35af3 100644 --- a/src/cbus.h +++ b/src/cbus.h 
@@ -158,6 +158,12 @@ cpipe_create(struct cpipe *pipe, const char *consumer); void cpipe_destroy(struct cpipe *pipe); +static inline bool +cpipe_is_created(struct cpipe *pipe) +{ + return pipe->endpoint != NULL; +} + /** * Set pipe max size of staged push area. The default is infinity. * If staged push cap is set, the pushed messages are flushed @@ -284,6 +290,10 @@ cbus_init(); void cbus_free(); +/** Find an endpoint by name in the global singleton bus. */ +struct cbus_endpoint * +cbus_find_endpoint(const char *name); + /** * Connect the cord to cbus as a named reciever. * @param name a destination name diff --git a/test/replication/gc.result b/test/replication/gc.result index 3f9db26ce..999378e9e 100644 --- a/test/replication/gc.result +++ b/test/replication/gc.result @@ -152,6 +152,9 @@ box.snapshot() --- - ok ... +wait_gc(1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -311,6 +314,9 @@ box.snapshot() --- - ok ... +wait_gc(1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') test_run:cleanup_cluster() --- ... +fiber.sleep(0.1) +--- +... #box.info.gc().checkpoints == 1 or box.info.gc() --- - true @@ -425,6 +434,9 @@ box.snapshot() --- - ok ... +fiber.sleep(0.1) +--- +... #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') --- - true diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua index 96f11f8d4..6f0cb942b 100644 --- a/test/replication/gc.test.lua +++ b/test/replication/gc.test.lua @@ -78,6 +78,7 @@ box.snapshot() -- Invoke garbage collection. Check that it doesn't remove -- xlogs needed by the replica. 
box.snapshot() +wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() #fio.glob('./master/*.xlog') == 2 or fio.listdir('./master') @@ -144,6 +145,7 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() +wait_gc(1) #box.info.gc().checkpoints == 1 or box.info.gc() xlog_count = #fio.glob('./master/*.xlog') -- the replica may have managed to download all data @@ -154,6 +156,7 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master') -- The xlog should only be deleted after the replica -- is unregistered. test_run:cleanup_cluster() +fiber.sleep(0.1) #box.info.gc().checkpoints == 1 or box.info.gc() #fio.glob('./master/*.xlog') == 1 or fio.listdir('./master') -- @@ -195,6 +198,7 @@ _ = s:auto_increment{} box.snapshot() _ = s:auto_increment{} box.snapshot() +fiber.sleep(0.1) #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master') -- Delete the replica from the cluster table and check that -- 2.15.2 (Apple Git-101.1)