From: Serge Petrenko <sergepetrenko@tarantool.org> To: tarantool-patches@freelists.org Cc: georgy@tarantool.org, Serge Petrenko <sergepetrenko@tarantool.org> Subject: [tarantool-patches] [PATCH v2] replication: implement replication_shutdown() Date: Mon, 6 Aug 2018 10:09:17 +0300 [thread overview] Message-ID: <20180806070917.20621-1-sergepetrenko@tarantool.org> (raw) Relay threads keep using tx upon shutdown, which leads to occasional segmentation faults and assertion fails (e.g. in replication test suite). Fix this by implementing replication_shutdown and relay_halt functions. replication_shutdown calls relay_halt to stop every relay thread that is using tx. Closes #3485 --- https://github.com/tarantool/tarantool/issues/3485 https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown Changes in v2: - instead of setting tx_in_use flag in relay and checking it in tx, send a message from relay to tx to set the flag. src/box/box.cc | 2 +- src/box/relay.cc | 85 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/box/relay.h | 10 ++++++ src/box/replication.cc | 30 ++++++++++++++++++ src/box/replication.h | 6 ++++ 5 files changed, 129 insertions(+), 4 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index ae4959d6f..f212c0fa8 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1597,9 +1597,9 @@ box_free(void) * initialized */ if (is_box_configured) { + replication_shutdown(); #if 0 session_free(); - replication_free(); user_cache_free(); schema_free(); module_free(); diff --git a/src/box/relay.cc b/src/box/relay.cc index 4cacbc840..60cb11932 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -63,6 +63,9 @@ struct relay_status_msg { struct relay *relay; /** Replica vclock. */ struct vclock vclock; + /** A flag to notify tx on creation / before removal + * of tx_pipe/relay_pipe. */ + bool tx_in_use; }; /** @@ -82,10 +85,22 @@ struct relay_gc_msg { struct vclock vclock; }; +/** + * Cbus message sent by tx thread to stop relay on shutdown. + */ +struct relay_halt_msg { + /** Parent. */ + struct cmsg msg; + /** Relay instance. */ + struct relay *relay; +}; + /** State of a replication relay. */ struct relay { /** The thread in which we relay data to the replica. */ struct cord cord; + /** The main fiber in cord to be canceled upon relay halt. */ + struct fiber *main_fiber; /** Replica connection */ struct ev_io io; /** Request sync */ @@ -120,6 +135,11 @@ struct relay { struct cpipe tx_pipe; /** A pipe from 'tx' thread to 'relay' */ struct cpipe relay_pipe; + /** + * A flag indicating that we executed relay_subscribe_f and + * have tx_pipe and relay_pipe ready. + */ + bool tx_in_use; /** Status message */ struct relay_status_msg status_msg; /** @@ -152,6 +172,12 @@ relay_get_state(const struct relay *relay) return relay->state; } +bool +relay_uses_tx(const struct relay *relay) +{ + return relay->tx_in_use; +} + const struct vclock * relay_vclock(const struct relay *relay) { @@ -198,6 +224,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync, relay->state = RELAY_FOLLOW; } +static void +relay_main_fiber_halt(struct cmsg *msg) +{ + struct relay_halt_msg *m = (struct relay_halt_msg *)msg; + struct relay *relay = m->relay; + + assert(relay->main_fiber != NULL); + fiber_cancel(relay->main_fiber); + relay->main_fiber = NULL; + + free(m); +} + +void +relay_halt(struct relay *relay) +{ + assert(relay->state == RELAY_FOLLOW); + + static const struct cmsg_hop route[] ={ + {relay_main_fiber_halt, NULL} + }; + struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m)); + if (m == NULL) { + /* + * Out of memory during shutdown. Do nothing. + */ + say_warn("failed to allocate relay halt message"); + return; + } + cmsg_init(&m->msg, route); + m->relay = relay; + cpipe_push(&relay->relay_pipe, &m->msg); +} + static void relay_stop(struct relay *relay) { @@ -311,6 +371,7 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + status->relay->tx_in_use = status->tx_in_use; static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; @@ -468,6 +529,7 @@ relay_subscribe_f(va_list ap) fiber_schedule_cb, fiber()); cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); + relay->main_fiber = fiber(); /* Setup garbage collection trigger. */ struct trigger on_close_log = { RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL @@ -478,6 +540,16 @@ relay_subscribe_f(va_list ap) relay_set_cord_name(relay->io.fd); + static const struct cmsg_hop route[] = { + {tx_status_update, NULL} + }; + /* Notify tx that relay thread is started. */ + cmsg_init(&relay->status_msg.msg, route); + vclock_copy(&relay->status_msg.vclock, &relay->tx.vclock); + relay->status_msg.relay = relay; + relay->status_msg.tx_in_use = true; + cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); + char name[FIBER_NAME_MAX]; snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader"); struct fiber *reader = fiber_new_xc(name, relay_reader_f); @@ -525,12 +597,11 @@ relay_subscribe_f(va_list ap) if (vclock_sum(&relay->status_msg.vclock) == vclock_sum(send_vclock)) continue; - static const struct cmsg_hop route[] = { - {tx_status_update, NULL} - }; + cmsg_init(&relay->status_msg.msg, route); vclock_copy(&relay->status_msg.vclock, send_vclock); relay->status_msg.relay = relay; + relay->status_msg.tx_in_use = true; cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); /* Collect xlog files received by the replica. */ relay_schedule_pending_gc(relay, send_vclock); @@ -542,6 +613,14 @@ relay_subscribe_f(va_list ap) if (!fiber_is_dead(reader)) fiber_cancel(reader); fiber_join(reader); + + /* Notify tx that relay is stopping. */ + cmsg_init(&relay->status_msg.msg, route); + vclock_copy(&relay->status_msg.vclock, &relay->tx.vclock); + relay->status_msg.relay = relay; + relay->status_msg.tx_in_use = false; + cpipe_push(&relay->tx_pipe, &relay->status_msg.msg); + cbus_unpair(&relay->tx_pipe, &relay->relay_pipe, NULL, NULL, cbus_process); cbus_endpoint_destroy(&relay->endpoint, cbus_process); diff --git a/src/box/relay.h b/src/box/relay.h index 2988e6b0d..deaba34d4 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -61,6 +61,10 @@ enum relay_state { struct relay * relay_new(struct replica *replica); +/** Stop a running relay.. Called on shutdown. */ +void +relay_halt(struct relay *relay); + /** Destroy and delete the relay */ void relay_delete(struct relay *relay); @@ -73,6 +77,12 @@ relay_get_diag(struct relay *relay); enum relay_state relay_get_state(const struct relay *relay); +/** + * Return whether relay_subscribe_f was already started + * and pipes between tx and relay were created. + */ +bool +relay_uses_tx(const struct relay *relay); /** * Returns relay's vclock * @param relay relay diff --git a/src/box/replication.cc b/src/box/replication.cc index 48956d2ed..9b4968777 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event) fiber_cond_signal(&replicaset.applier.cond); } +void +replication_shutdown() +{ + struct replica *replica, *next; + + replica_hash_foreach_safe(&replicaset.hash, replica, next) { + if (replica->id == instance_id) + continue; + if (replica->applier != NULL) { + replica_clear_applier(replica); + /* + * We're exiting, so control won't be passed + * to appliers and we don't need to stop them. + */ + } + if (replica->id != REPLICA_ID_NIL) { + if (relay_get_state(replica->relay) == RELAY_FOLLOW && + relay_uses_tx(replica->relay)) { + replica->id = REPLICA_ID_NIL; + relay_halt(replica->relay); + } + } else { + replica_hash_remove(&replicaset.hash, replica); + replica_delete(replica); + } + } + + replication_free(); +} + /** * Update the replica set with new "applier" objects * upon reconfiguration of box.cfg.replication. diff --git a/src/box/replication.h b/src/box/replication.h index e8b391af2..08f9df258 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -333,6 +333,12 @@ replica_on_relay_stop(struct replica *replica); void replica_check_id(uint32_t replica_id); +/* + * Stop replication and delete all replicas and replicaset. + */ +void +replication_shutdown(); + /** * Register the universally unique identifier of a remote replica and * a matching replica-set-local identifier in the _cluster registry. -- 2.15.2 (Apple Git-101.1)
next reply other threads:[~2018-08-06 7:09 UTC|newest] Thread overview: 2+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-08-06 7:09 Serge Petrenko [this message] 2018-08-08 16:21 ` Vladimir Davydov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20180806070917.20621-1-sergepetrenko@tarantool.org \ --to=sergepetrenko@tarantool.org \ --cc=georgy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH v2] replication: implement replication_shutdown()' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox