[tarantool-patches] [PATCH v2] replication: implement replication_shutdown()
From: Serge Petrenko @ 2018-08-06 7:09 UTC
To: tarantool-patches; +Cc: georgy, Serge Petrenko
Relay threads keep using tx upon shutdown, which leads to occasional
segmentation faults and assertion failures (e.g. in the replication test
suite).

Fix this by implementing the replication_shutdown() and relay_halt()
functions: replication_shutdown() calls relay_halt() to stop every relay
thread that is using tx.
Closes #3485
---
https://github.com/tarantool/tarantool/issues/3485
https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown
Changes in v2:
- instead of setting tx_in_use flag
in relay and checking it in tx, send a
message from relay to tx to set the flag.
src/box/box.cc | 2 +-
src/box/relay.cc | 85 ++++++++++++++++++++++++++++++++++++++++++++++++--
src/box/relay.h | 10 ++++++
src/box/replication.cc | 30 ++++++++++++++++++
src/box/replication.h | 6 ++++
5 files changed, 129 insertions(+), 4 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..f212c0fa8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1597,9 +1597,9 @@ box_free(void)
* initialized
*/
if (is_box_configured) {
+ replication_shutdown();
#if 0
session_free();
- replication_free();
user_cache_free();
schema_free();
module_free();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..60cb11932 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -63,6 +63,9 @@ struct relay_status_msg {
struct relay *relay;
/** Replica vclock. */
struct vclock vclock;
+ /** A flag to notify tx on creation / before removal
+ * of tx_pipe/relay_pipe. */
+ bool tx_in_use;
};
/**
@@ -82,10 +85,22 @@ struct relay_gc_msg {
struct vclock vclock;
};
+/**
+ * Cbus message sent by tx thread to stop relay on shutdown.
+ */
+struct relay_halt_msg {
+ /** Parent. */
+ struct cmsg msg;
+ /** Relay instance. */
+ struct relay *relay;
+};
+
/** State of a replication relay. */
struct relay {
/** The thread in which we relay data to the replica. */
struct cord cord;
+ /** The main fiber in cord to be canceled upon relay halt. */
+ struct fiber *main_fiber;
/** Replica connection */
struct ev_io io;
/** Request sync */
@@ -120,6 +135,11 @@ struct relay {
struct cpipe tx_pipe;
/** A pipe from 'tx' thread to 'relay' */
struct cpipe relay_pipe;
+ /**
+ * A flag indicating that we executed relay_subscribe_f and
+ * have tx_pipe and relay_pipe ready.
+ */
+ bool tx_in_use;
/** Status message */
struct relay_status_msg status_msg;
/**
@@ -152,6 +172,12 @@ relay_get_state(const struct relay *relay)
return relay->state;
}
+bool
+relay_uses_tx(const struct relay *relay)
+{
+ return relay->tx_in_use;
+}
+
const struct vclock *
relay_vclock(const struct relay *relay)
{
@@ -198,6 +224,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
relay->state = RELAY_FOLLOW;
}
+static void
+relay_main_fiber_halt(struct cmsg *msg)
+{
+ struct relay_halt_msg *m = (struct relay_halt_msg *)msg;
+ struct relay *relay = m->relay;
+
+ assert(relay->main_fiber != NULL);
+ fiber_cancel(relay->main_fiber);
+ relay->main_fiber = NULL;
+
+ free(m);
+}
+
+void
+relay_halt(struct relay *relay)
+{
+ assert(relay->state == RELAY_FOLLOW);
+
+ static const struct cmsg_hop route[] ={
+ {relay_main_fiber_halt, NULL}
+ };
+ struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m));
+ if (m == NULL) {
+ /*
+ * Out of memory during shutdown. Do nothing.
+ */
+ say_warn("failed to allocate relay halt message");
+ return;
+ }
+ cmsg_init(&m->msg, route);
+ m->relay = relay;
+ cpipe_push(&relay->relay_pipe, &m->msg);
+}
+
static void
relay_stop(struct relay *relay)
{
@@ -311,6 +371,7 @@ tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
vclock_copy(&status->relay->tx.vclock, &status->vclock);
+ status->relay->tx_in_use = status->tx_in_use;
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
@@ -468,6 +529,7 @@ relay_subscribe_f(va_list ap)
fiber_schedule_cb, fiber());
cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
NULL, NULL, cbus_process);
+ relay->main_fiber = fiber();
/* Setup garbage collection trigger. */
struct trigger on_close_log = {
RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
@@ -478,6 +540,16 @@ relay_subscribe_f(va_list ap)
relay_set_cord_name(relay->io.fd);
+ static const struct cmsg_hop route[] = {
+ {tx_status_update, NULL}
+ };
+ /* Notify tx that relay thread is started. */
+ cmsg_init(&relay->status_msg.msg, route);
+ vclock_copy(&relay->status_msg.vclock, &relay->tx.vclock);
+ relay->status_msg.relay = relay;
+ relay->status_msg.tx_in_use = true;
+ cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
+
char name[FIBER_NAME_MAX];
snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
struct fiber *reader = fiber_new_xc(name, relay_reader_f);
@@ -525,12 +597,11 @@ relay_subscribe_f(va_list ap)
if (vclock_sum(&relay->status_msg.vclock) ==
vclock_sum(send_vclock))
continue;
- static const struct cmsg_hop route[] = {
- {tx_status_update, NULL}
- };
+
cmsg_init(&relay->status_msg.msg, route);
vclock_copy(&relay->status_msg.vclock, send_vclock);
relay->status_msg.relay = relay;
+ relay->status_msg.tx_in_use = true;
cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
/* Collect xlog files received by the replica. */
relay_schedule_pending_gc(relay, send_vclock);
@@ -542,6 +613,14 @@ relay_subscribe_f(va_list ap)
if (!fiber_is_dead(reader))
fiber_cancel(reader);
fiber_join(reader);
+
+ /* Notify tx that relay is stopping. */
+ cmsg_init(&relay->status_msg.msg, route);
+ vclock_copy(&relay->status_msg.vclock, &relay->tx.vclock);
+ relay->status_msg.relay = relay;
+ relay->status_msg.tx_in_use = false;
+ cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
+
cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
NULL, NULL, cbus_process);
cbus_endpoint_destroy(&relay->endpoint, cbus_process);
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..deaba34d4 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
struct relay *
relay_new(struct replica *replica);
+/** Stop a running relay. Called on shutdown. */
+void
+relay_halt(struct relay *relay);
+
/** Destroy and delete the relay */
void
relay_delete(struct relay *relay);
@@ -73,6 +77,12 @@ relay_get_diag(struct relay *relay);
enum relay_state
relay_get_state(const struct relay *relay);
+/**
+ * Return whether relay_subscribe_f was already started
+ * and pipes between tx and relay were created.
+ */
+bool
+relay_uses_tx(const struct relay *relay);
/**
* Returns relay's vclock
* @param relay relay
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..9b4968777 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
fiber_cond_signal(&replicaset.applier.cond);
}
+void
+replication_shutdown()
+{
+ struct replica *replica, *next;
+
+ replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+ if (replica->id == instance_id)
+ continue;
+ if (replica->applier != NULL) {
+ replica_clear_applier(replica);
+ /*
+ * We're exiting, so control won't be passed
+ * to appliers and we don't need to stop them.
+ */
+ }
+ if (replica->id != REPLICA_ID_NIL) {
+ if (relay_get_state(replica->relay) == RELAY_FOLLOW &&
+ relay_uses_tx(replica->relay)) {
+ replica->id = REPLICA_ID_NIL;
+ relay_halt(replica->relay);
+ }
+ } else {
+ replica_hash_remove(&replicaset.hash, replica);
+ replica_delete(replica);
+ }
+ }
+
+ replication_free();
+}
+
/**
* Update the replica set with new "applier" objects
* upon reconfiguration of box.cfg.replication.
diff --git a/src/box/replication.h b/src/box/replication.h
index e8b391af2..08f9df258 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -333,6 +333,12 @@ replica_on_relay_stop(struct replica *replica);
void
replica_check_id(uint32_t replica_id);
+/*
+ * Stop replication and delete all replicas and replicaset.
+ */
+void
+replication_shutdown();
+
/**
* Register the universally unique identifier of a remote replica and
* a matching replica-set-local identifier in the _cluster registry.
--
2.15.2 (Apple Git-101.1)
Re: [tarantool-patches] [PATCH v2] replication: implement replication_shutdown()
From: Vladimir Davydov @ 2018-08-08 16:21 UTC
To: Serge Petrenko; +Cc: tarantool-patches, georgy
On Mon, Aug 06, 2018 at 10:09:17AM +0300, Serge Petrenko wrote:
> Relay threads keep using tx upon shutdown, which leads to occasional
> segmentation faults and assertion failures (e.g. in the replication test
> suite).
>
> Fix this by implementing the replication_shutdown() and relay_halt()
> functions: replication_shutdown() calls relay_halt() to stop every relay
> thread that is using tx.
>
> Closes #3485
> ---
> https://github.com/tarantool/tarantool/issues/3485
> https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown
>
> Changes in v2:
> - instead of setting tx_in_use flag
> in relay and checking it in tx, send a
> message from relay to tx to set the flag.
>
> src/box/box.cc | 2 +-
> src/box/relay.cc | 85 ++++++++++++++++++++++++++++++++++++++++++++++++--
> src/box/relay.h | 10 ++++++
> src/box/replication.cc | 30 ++++++++++++++++++
> src/box/replication.h | 6 ++++
> 5 files changed, 129 insertions(+), 4 deletions(-)
I see some discrepancy between the patch submitted for review and the
code pushed to the branch:
vlad@esperanza tarantool$ date
Wed Aug 8 19:12:29 MSK 2018
vlad@esperanza tarantool$ git remote update origin
Fetching origin
vlad@esperanza tarantool$ git status
On branch sergepetrenko/gh-3485-replication-shutdown
Your branch is up-to-date with 'origin/sergepetrenko/gh-3485-replication-shutdown'.
nothing to commit, working tree clean
vlad@esperanza tarantool$ git show --oneline --stat
920dc83a replication: implement replication_shutdown()
src/box/box.cc | 2 +-
src/box/relay.cc | 85 ++++++++++++++++++++++++++++++++++++++++++--
src/box/relay.h | 10 ++++++
src/box/replication.cc | 30 ++++++++++++++++
src/box/replication.h | 6 ++++
test/replication/gc.result | 12 +++++++
test/replication/gc.test.lua | 4 +++
7 files changed, 145 insertions(+), 4 deletions(-)
Are you trying to conceal what you intend to do to the tests?
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 4cacbc840..60cb11932 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -63,6 +63,9 @@ struct relay_status_msg {
> struct relay *relay;
> /** Replica vclock. */
> struct vclock vclock;
> + /** A flag to notify tx on creation / before removal
> + * of tx_pipe/relay_pipe. */
Malformed comment style.
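For the record, a multi-line comment here would normally be formatted as
(style illustration only, not a suggested wording):

/**
 * A flag to notify tx on creation / before removal
 * of tx_pipe/relay_pipe.
 */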
> + bool tx_in_use;
> };
>
> /**
> @@ -82,10 +85,22 @@ struct relay_gc_msg {
> struct vclock vclock;
> };
>
> +/**
> + * Cbus message sent by tx thread to stop relay on shutdown.
> + */
> +struct relay_halt_msg {
> + /** Parent. */
> + struct cmsg msg;
> + /** Relay instance. */
> + struct relay *relay;
> +};
> +
> /** State of a replication relay. */
> struct relay {
> /** The thread in which we relay data to the replica. */
> struct cord cord;
> + /** The main fiber in cord to be canceled upon relay halt. */
> + struct fiber *main_fiber;
> /** Replica connection */
> struct ev_io io;
> /** Request sync */
> @@ -120,6 +135,11 @@ struct relay {
> struct cpipe tx_pipe;
> /** A pipe from 'tx' thread to 'relay' */
> struct cpipe relay_pipe;
> + /**
> + * A flag indicating that we executed relay_subscribe_f and
> + * have tx_pipe and relay_pipe ready.
> + */
> + bool tx_in_use;
I rather dislike this flag. I think it should be a part of the cbus
subsystem. Say,
bool cpipe_is_created(cpipe) { return cpipe->endpoint != NULL }
or something like that.
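Roughly like this in cbus.h (an untested sketch; the name is a
placeholder, and it assumes struct cpipe keeps the endpoint pointer it
is bound to, as used above):

static inline bool
cpipe_is_created(const struct cpipe *pipe)
{
	/* The endpoint is only set once the pipe has been created/paired. */
	return pipe->endpoint != NULL;
}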
> /** Status message */
> struct relay_status_msg status_msg;
> /**
> @@ -152,6 +172,12 @@ relay_get_state(const struct relay *relay)
> return relay->state;
> }
>
> +bool
> +relay_uses_tx(const struct relay *relay)
> +{
> + return relay->tx_in_use;
> +}
> +
No point in exporting this function. You can check that relay_pipe is
available right in relay_halt.
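I.e. something like this at the top of relay_halt (fragment only,
assuming the cpipe helper sketched above):

	if (relay->state != RELAY_FOLLOW ||
	    !cpipe_is_created(&relay->relay_pipe))
		return;

Then relay_uses_tx() doesn't have to be exported at all.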
> const struct vclock *
> relay_vclock(const struct relay *relay)
> {
> @@ -198,6 +224,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
> relay->state = RELAY_FOLLOW;
> }
>
> +static void
> +relay_main_fiber_halt(struct cmsg *msg)
> +{
> + struct relay_halt_msg *m = (struct relay_halt_msg *)msg;
> + struct relay *relay = m->relay;
> +
> + assert(relay->main_fiber != NULL);
> + fiber_cancel(relay->main_fiber);
> + relay->main_fiber = NULL;
Can't you simply use fiber() here? AFAIU cbus messages are processed by
the "main" relay fiber anyway, see relay_subscribe_f.
BTW this code isn't covered by any test:
https://coveralls.io/builds/18341862/source?filename=src/box/relay.cc#L228
Please make sure it is.
> +
> + free(m);
> +}
> +
> +void
> +relay_halt(struct relay *relay)
'halt' is an exact synonym of 'stop', and we already have relay_stop().
I think this one should be called relay_cancel() or relay_abort() or
something like that. Please come up with a better name to avoid
confusion.
> +{
> + assert(relay->state == RELAY_FOLLOW);
> +
> + static const struct cmsg_hop route[] ={
> + {relay_main_fiber_halt, NULL}
> + };
> + struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m));
> + if (m == NULL) {
> + /*
> + * Out of memory during shutdown. Do nothing.
> + */
> + say_warn("failed to allocate relay halt message");
> + return;
> + }
> + cmsg_init(&m->msg, route);
> + m->relay = relay;
> + cpipe_push(&relay->relay_pipe, &m->msg);
> +}
AFAIR cpipe_push() doesn't necessarily flush the input.
cpipe_flush_input(), maybe?
Also, I think you should wait for the relay thread to exit (see
cord_join), otherwise you may proceed to tx destruction while the relay
can still access tx data.
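Put together, I'd expect something along these lines (untested sketch;
relay_cancel is just the placeholder name suggested above):

void
relay_cancel(struct relay *relay)
{
	if (relay->state != RELAY_FOLLOW ||
	    !cpipe_is_created(&relay->relay_pipe))
		return;
	static const struct cmsg_hop route[] = {
		{relay_main_fiber_halt, NULL}
	};
	struct relay_halt_msg *m =
		(struct relay_halt_msg *)malloc(sizeof(*m));
	if (m == NULL) {
		say_warn("failed to allocate relay halt message");
		return;
	}
	cmsg_init(&m->msg, route);
	m->relay = relay;
	cpipe_push(&relay->relay_pipe, &m->msg);
	/* Make sure the message actually reaches the relay thread. */
	cpipe_flush_input(&relay->relay_pipe);
	/*
	 * Wait for the relay thread to exit so that it can't touch
	 * tx data once tx destruction proceeds.
	 */
	cord_join(&relay->cord);
}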
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 48956d2ed..9b4968777 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
> fiber_cond_signal(&replicaset.applier.cond);
> }
>
> +void
> +replication_shutdown()
So now we have replication_shutdown and replication_free, both public,
and replication_free is only used in replication_shutdown. Not good.
We typically call a subsystem's constructor something_init() and its
destructor something_free(). That is, I guess this code should be a part
of replication_free() and replication_shutdown() shouldn't exist.
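Roughly the shape I mean (a sketch only, keeping this patch's loop body;
the questions below still apply):

void
replication_free(void)
{
	struct replica *replica, *next;

	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
		if (replica->id == instance_id)
			continue;
		if (replica->applier != NULL)
			replica_clear_applier(replica);
		if (replica->id != REPLICA_ID_NIL) {
			if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
				replica->id = REPLICA_ID_NIL;
				relay_cancel(replica->relay);
			}
		} else {
			replica_hash_remove(&replicaset.hash, replica);
			replica_delete(replica);
		}
	}
	/* ... plus the cleanup replication_free() already does ... */
}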
> +{
> + struct replica *replica, *next;
> +
> + replica_hash_foreach_safe(&replicaset.hash, replica, next) {
> + if (replica->id == instance_id)
> + continue;
Why? Don't we want to delete all replicas, including ourselves?
> + if (replica->applier != NULL) {
> + replica_clear_applier(replica);
> + /*
> + * We're exiting, so control won't be passed
> + * to appliers and we don't need to stop them.
> + */
> + }
> + if (replica->id != REPLICA_ID_NIL) {
> + if (relay_get_state(replica->relay) == RELAY_FOLLOW &&
> + relay_uses_tx(replica->relay)) {
> + replica->id = REPLICA_ID_NIL;
> + relay_halt(replica->relay);
> + }
> + } else {
> + replica_hash_remove(&replicaset.hash, replica);
> + replica_delete(replica);
Don't we want to delete all replicas here? Can we?
What about replicas on the replicaset.anon list? Shouldn't we delete
them, too?
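Something along these lines, perhaps (sketch; I'm writing the anon list
link member from memory, so treat 'in_anon' as a placeholder):

	struct replica *replica, *next;
	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
		if (replica->applier != NULL)
			replica_clear_applier(replica);
		rlist_del_entry(replica, in_anon);
		replica_delete(replica);
	}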
> + }
> + }
> +
> + replication_free();
> +}