From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id BF94628316 for ; Tue, 31 Jul 2018 14:34:07 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id dJ4s_1FMnO_4 for ; Tue, 31 Jul 2018 14:34:07 -0400 (EDT) Received: from smtp53.i.mail.ru (smtp53.i.mail.ru [94.100.177.113]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 70B1B28304 for ; Tue, 31 Jul 2018 14:34:07 -0400 (EDT) From: Serge Petrenko Subject: [tarantool-patches] [PATCH] replication: implement replication_shutdown() Date: Tue, 31 Jul 2018 21:32:42 +0300 Message-Id: <20180731183242.29169-1-sergepetrenko@tarantool.org> Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: georgy@tarantool.org, Serge Petrenko Relay threads keep using tx upon shutdown, which leads to occasional segmentation faults and assertion fails (e.g. in replication test suite). Fix this by implementing replication_shutdown and relay_halt functions. replication_shutdown calls relay_halt to stop every relay thread that is using tx. Closes #3485 --- FYI, I couldn't come up with a test case for this patch. My best effort was to run replication test suite multiple times and make sure that there were less crashes. https://github.com/tarantool/tarantool/issues/3485 https://github.com/tarantool/tarantool/compare/sergepetrenko/gh-3485-replication-shutdown src/box/box.cc | 2 +- src/box/relay.cc | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/box/relay.h | 10 +++++++++ src/box/replication.cc | 30 +++++++++++++++++++++++++ src/box/replication.h | 6 +++++ src/main.cc | 1 + 6 files changed, 108 insertions(+), 1 deletion(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..f212c0fa8 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1597,9 +1597,9 @@ box_free(void) * initialized */ if (is_box_configured) { + replication_shutdown(); #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..0a81ac5db 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -82,10 +82,22 @@ struct relay_gc_msg { struct vclock vclock; }; +/** + * Cbus message sent by tx thread to stop relay on shutdown. + */ +struct relay_halt_msg { + /** Parent. */ + struct cmsg msg; + /** Relay instance. */ + struct relay *relay; +}; + /** State of a replication relay. */ struct relay { /** The thread in which we relay data to the replica. */ struct cord cord; + /** The main fiber in cord to be canceled upon relay halt. */ + struct fiber *main_fiber; /** Replica connection */ struct ev_io io; /** Request sync */ @@ -120,6 +132,11 @@ struct relay { struct cpipe tx_pipe; /** A pipe from 'tx' thread to 'relay' */ struct cpipe relay_pipe; + /** + * A flag indicating that we executed relay_subscribe_f and + * have tx_pipe and relay_pipe ready. + */ + bool tx_in_use; /** Status message */ struct relay_status_msg status_msg; /** @@ -152,6 +169,12 @@ relay_get_state(const struct relay *relay) return relay->state; } +bool +relay_uses_tx(const struct relay *relay) +{ + return relay->tx_in_use; +} + const struct vclock * relay_vclock(const struct relay *relay) { @@ -198,6 +221,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +static void +relay_main_fiber_halt(struct cmsg *msg) +{ + struct relay_halt_msg *m = (struct relay_halt_msg *)msg; + struct relay *relay = m->relay; + + assert(relay->main_fiber != NULL); + fiber_cancel(relay->main_fiber); + relay->main_fiber = NULL; + + free(m); +} + +void +relay_halt(struct relay *relay) +{ + assert(relay->state == RELAY_FOLLOW); + + static const struct cmsg_hop route[] ={ + {relay_main_fiber_halt, NULL} + }; + struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m)); + if (m == NULL) { + /* + * Out of memory during shutdown. Do nothing. + */ + say_warn("failed to allocate relay halt message"); + return; + } + cmsg_init(&m->msg, route); + m->relay = relay; + cpipe_push(&relay->relay_pipe, &m->msg); +} + static void relay_stop(struct relay *relay) { @@ -468,6 +525,8 @@ relay_subscribe_f(va_list ap) fiber_schedule_cb, fiber()); cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); + relay->main_fiber = fiber(); + relay->tx_in_use = true; /* Setup garbage collection trigger. */ struct trigger on_close_log = { RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL @@ -545,6 +604,7 @@ relay_subscribe_f(va_list ap) cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); cbus_endpoint_destroy(&relay->endpoint, cbus_process); + relay->tx_in_use = false; if (!diag_is_empty(&relay->diag)) { /* An error has occurred while reading ACKs of xlog. */ diag_move(&relay->diag, diag_get()); diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..deaba34d4 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Stop a running relay.. Called on shutdown. */ +void +relay_halt(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); @@ -73,6 +77,12 @@ relay_get_diag(struct relay *relay); enum relay_state relay_get_state(const struct relay *relay); +/** + * Return whether relay_subscribe_f was already started + * and pipes between tx and relay were created. + */ +bool +relay_uses_tx(const struct relay *relay); /** * Returns relay's vclock * @param relay relay diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..9b4968777 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event) fiber_cond_signal(&replicaset.applier.cond); } +void +replication_shutdown() +{ + struct replica *replica, *next; + + replica_hash_foreach_safe(&replicaset.hash, replica, next) { + if (replica->id == instance_id) + continue; + if (replica->applier != NULL) { + replica_clear_applier(replica); + /* + * We're exiting, so control won't be passed + * to appliers and we don't need to stop them. + */ + } + if (replica->id != REPLICA_ID_NIL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW && + relay_uses_tx(replica->relay)) { + replica->id = REPLICA_ID_NIL; + relay_halt(replica->relay); + } + } else { + replica_hash_remove(&replicaset.hash, replica); + replica_delete(replica); + } + } + + replication_free(); +} + /** * Update the replica set with new "applier" objects * upon reconfiguration of box.cfg.replication. diff --git a/src/box/replication.h b/src/box/replication.h index e8b391af2..08f9df258 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -333,6 +333,12 @@ replica_on_relay_stop(struct replica *replica); void replica_check_id(uint32_t replica_id); +/* + * Stop replication and delete all replicas and replicaset. + */ +void +replication_shutdown(); + /** * Register the universally unique identifier of a remote replica and * a matching replica-set-local identifier in the _cluster registry. diff --git a/src/main.cc b/src/main.cc index a36a2b0d0..9cb3243b6 100644 --- a/src/main.cc +++ b/src/main.cc @@ -76,6 +76,7 @@ #include "box/lua/init.h" /* box_lua_init() */ #include "box/session.h" #include "systemd.h" +#include "box/replication.h" /* replication_shutdown() */ static pid_t master_pid = getpid(); static struct pidfh *pid_file_handle; -- 2.15.2 (Apple Git-101.1)