From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Date: Wed, 8 Aug 2018 19:21:09 +0300 From: Vladimir Davydov Subject: Re: [tarantool-patches] [PATCH v2] replication: implement replication_shutdown() Message-ID: <20180808162109.l3wwxo36izmu4p5c@esperanza> References: <20180806070917.20621-1-sergepetrenko@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <20180806070917.20621-1-sergepetrenko@tarantool.org> To: Serge Petrenko Cc: tarantool-patches@freelists.org, georgy@tarantool.org List-ID: On Mon, Aug 06, 2018 at 10:09:17AM +0300, Serge Petrenko wrote: > Relay threads keep using tx upon shutdown, which leads to occasional > segmentation faults and assertion fails (e.g. in replication test > suite). > > Fix this by implementing replication_shutdown and relay_halt functions. > replication_shutdown calls relay_halt to stop every relay thread that is > using tx. > > Closes #3485 > --- > https://github.com/tarantool/tarantool/issues/3485 > https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown > > Changes in v2: > - instead of setting tx_in_use flag > in relay and checking it in tx, send a > message from relay to tx to set the flag. > > src/box/box.cc | 2 +- > src/box/relay.cc | 85 ++++++++++++++++++++++++++++++++++++++++++++++++-- > src/box/relay.h | 10 ++++++ > src/box/replication.cc | 30 ++++++++++++++++++ > src/box/replication.h | 6 ++++ > 5 files changed, 129 insertions(+), 4 deletions(-) I see some discrepancy between the patch submitted for review and the code pushed to the branch: vlad@esperanza tarantool$ date Wed Aug 8 19:12:29 MSK 2018 vlad@esperanza tarantool$ git remote update origin Fetching origin vlad@esperanza tarantool$ git status On branch sergepetrenko/gh-3485-replication-shutdown Your branch is up-to-date with 'origin/sergepetrenko/gh-3485-replication-shutdown'. nothing to commit, working tree clean vlad@esperanza tarantool$ git show --oneline --stat 920dc83a replication: implement replication_shutdown() src/box/box.cc | 2 +- src/box/relay.cc | 85 ++++++++++++++++++++++++++++++++++++++++++-- src/box/relay.h | 10 ++++++ src/box/replication.cc | 30 ++++++++++++++++ src/box/replication.h | 6 ++++ test/replication/gc.result | 12 +++++++ test/replication/gc.test.lua | 4 +++ 7 files changed, 145 insertions(+), 4 deletions(-) Are you trying to conceal what you intend to do to the tests? > diff --git a/src/box/relay.cc b/src/box/relay.cc > index 4cacbc840..60cb11932 100644 > --- a/src/box/relay.cc > +++ b/src/box/relay.cc > @@ -63,6 +63,9 @@ struct relay_status_msg { > struct relay *relay; > /** Replica vclock. */ > struct vclock vclock; > + /** A flag to notify tx on creation / before removal > + * of tx_pipe/relay_pipe. */ Malformed comment style. > + bool tx_in_use; > }; > > /** > @@ -82,10 +85,22 @@ struct relay_gc_msg { > struct vclock vclock; > }; > > +/** > + * Cbus message sent by tx thread to stop relay on shutdown. > + */ > +struct relay_halt_msg { > + /** Parent. */ > + struct cmsg msg; > + /** Relay instance. */ > + struct relay *relay; > +}; > + > /** State of a replication relay. */ > struct relay { > /** The thread in which we relay data to the replica. */ > struct cord cord; > + /** The main fiber in cord to be canceled upon relay halt. */ > + struct fiber *main_fiber; > /** Replica connection */ > struct ev_io io; > /** Request sync */ > @@ -120,6 +135,11 @@ struct relay { > struct cpipe tx_pipe; > /** A pipe from 'tx' thread to 'relay' */ > struct cpipe relay_pipe; > + /** > + * A flag indicating that we executed relay_subscribe_f and > + * have tx_pipe and relay_pipe ready. > + */ > + bool tx_in_use; I rather dislike this flag. I think it should be a part of cbus subsystem. Say, bool cpipe_is_created(cpipe) { return cpipe->endpoint != NULL } or something like that. > /** Status message */ > struct relay_status_msg status_msg; > /** > @@ -152,6 +172,12 @@ relay_get_state(const struct relay *relay) > return relay->state; > } > > +bool > +relay_uses_tx(const struct relay *relay) > +{ > + return relay->tx_in_use; > +} > + No point in exporting this function. You can check that relay_pipe is available right in relay_halt. > const struct vclock * > relay_vclock(const struct relay *relay) > { > @@ -198,6 +224,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync, > relay->state = RELAY_FOLLOW; > } > > +static void > +relay_main_fiber_halt(struct cmsg *msg) > +{ > + struct relay_halt_msg *m = (struct relay_halt_msg *)msg; > + struct relay *relay = m->relay; > + > + assert(relay->main_fiber != NULL); > + fiber_cancel(relay->main_fiber); > + relay->main_fiber = NULL; Can't you simply use fiber() here? AFAIU cbus messages are processed by the "main" relay fiber anyway, see relay_subscribe_f. BTW this code isn't covered by any test: https://coveralls.io/builds/18341862/source?filename=src/box/relay.cc#L228 Please make sure it is. > + > + free(m); > +} > + > +void > +relay_halt(struct relay *relay) 'halt' is an exact synonym of 'stop', and we already have relay_stop(). I think this one should be called relay_cancel() or relay_abort() or something like that. Please come up with a better name to avoid confusion. > +{ > + assert(relay->state == RELAY_FOLLOW); > + > + static const struct cmsg_hop route[] ={ > + {relay_main_fiber_halt, NULL} > + }; > + struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m)); > + if (m == NULL) { > + /* > + * Out of memory during shutdown. Do nothing. > + */ > + say_warn("failed to allocate relay halt message"); > + return; > + } > + cmsg_init(&m->msg, route); > + m->relay = relay; > + cpipe_push(&relay->relay_pipe, &m->msg); > +} AFAIR cpipe_push() doesn't necessarily flushes the input. cpipe_flush_input(), may be? Also, I think you should wait for the relay thread to exit (see cord_join), otherwise you may proceed to tx destruction while it can still access tx data. > diff --git a/src/box/replication.cc b/src/box/replication.cc > index 48956d2ed..9b4968777 100644 > --- a/src/box/replication.cc > +++ b/src/box/replication.cc > @@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event) > fiber_cond_signal(&replicaset.applier.cond); > } > > +void > +replication_shutdown() So now we have replication_shutdown and replication_free, both public, and replication_free is only used in replication_shutdown. Not good. We typically call subsys constructor something_init() and destructor something_free(). That said, I guess this code should be a part of replication_free() and replication_shutdown() shouldn't exist. > +{ > + struct replica *replica, *next; > + > + replica_hash_foreach_safe(&replicaset.hash, replica, next) { > + if (replica->id == instance_id) > + continue; Why? Don't we want to delete all replicas, including ourselves? > + if (replica->applier != NULL) { > + replica_clear_applier(replica); > + /* > + * We're exiting, so control won't be passed > + * to appliers and we don't need to stop them. > + */ > + } > + if (replica->id != REPLICA_ID_NIL) { > + if (relay_get_state(replica->relay) == RELAY_FOLLOW && > + relay_uses_tx(replica->relay)) { > + replica->id = REPLICA_ID_NIL; > + relay_halt(replica->relay); > + } > + } else { > + replica_hash_remove(&replicaset.hash, replica); > + replica_delete(replica); Don't we want to delete all replicas here? Can we? What about replicas on the replicaset.anon list. Shouldn't we delete them, too? > + } > + } > + > + replication_free(); > +}