Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v2] replication: implement replication_shutdown()
@ 2018-08-06  7:09 Serge Petrenko
  2018-08-08 16:21 ` Vladimir Davydov
  0 siblings, 1 reply; 2+ messages in thread
From: Serge Petrenko @ 2018-08-06  7:09 UTC (permalink / raw)
  To: tarantool-patches; +Cc: georgy, Serge Petrenko

Relay threads keep using tx upon shutdown, which leads to occasional
segmentation faults and assertion failures (e.g. in the replication
test suite).

Fix this by implementing the replication_shutdown() and relay_halt()
functions. replication_shutdown() calls relay_halt() to stop every
relay thread that is using tx.

Closes #3485
---
https://github.com/tarantool/tarantool/issues/3485
https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown

Changes in v2:
  - instead of setting tx_in_use flag
    in relay and checking it in tx, send a
    message from relay to tx to set the flag.

 src/box/box.cc         |  2 +-
 src/box/relay.cc       | 85 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/box/relay.h        | 10 ++++++
 src/box/replication.cc | 30 ++++++++++++++++++
 src/box/replication.h  |  6 ++++
 5 files changed, 129 insertions(+), 4 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..f212c0fa8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1597,9 +1597,9 @@ box_free(void)
 	 * initialized
 	 */
 	if (is_box_configured) {
+	        replication_shutdown();
 #if 0
 		session_free();
-		replication_free();
 		user_cache_free();
 		schema_free();
 		module_free();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..60cb11932 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -63,6 +63,9 @@ struct relay_status_msg {
 	struct relay *relay;
 	/** Replica vclock. */
 	struct vclock vclock;
+	/** A flag to notify tx on creation / before removal
+	 * of tx_pipe/relay_pipe. */
+	bool tx_in_use;
 };
 
 /**
@@ -82,10 +85,22 @@ struct relay_gc_msg {
 	struct vclock vclock;
 };
 
+/**
+ * Cbus message sent by tx thread to stop relay on shutdown.
+ */
+struct relay_halt_msg {
+	/** Parent. */
+	struct cmsg msg;
+	/** Relay instance. */
+	struct relay *relay;
+};
+
 /** State of a replication relay. */
 struct relay {
 	/** The thread in which we relay data to the replica. */
 	struct cord cord;
+	/** The main fiber in cord to be canceled upon relay halt. */
+	struct fiber *main_fiber;
 	/** Replica connection */
 	struct ev_io io;
 	/** Request sync */
@@ -120,6 +135,11 @@ struct relay {
 	struct cpipe tx_pipe;
 	/** A pipe from 'tx' thread to 'relay' */
 	struct cpipe relay_pipe;
+	/**
+	 * A flag indicating that we executed relay_subscribe_f and
+	 * have tx_pipe and relay_pipe ready.
+	 */
+	bool tx_in_use;
 	/** Status message */
 	struct relay_status_msg status_msg;
 	/**
@@ -152,6 +172,12 @@ relay_get_state(const struct relay *relay)
 	return relay->state;
 }
 
+bool
+relay_uses_tx(const struct relay *relay)
+{
+	return relay->tx_in_use;
+}
+
 const struct vclock *
 relay_vclock(const struct relay *relay)
 {
@@ -198,6 +224,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 }
 
+static void
+relay_main_fiber_halt(struct cmsg *msg)
+{
+	struct relay_halt_msg *m = (struct relay_halt_msg *)msg;
+	struct relay *relay = m->relay;
+
+	assert(relay->main_fiber != NULL);
+	fiber_cancel(relay->main_fiber);
+	relay->main_fiber = NULL;
+
+	free(m);
+}
+
+void
+relay_halt(struct relay *relay)
+{
+	assert(relay->state == RELAY_FOLLOW);
+
+	static const struct cmsg_hop route[] ={
+		{relay_main_fiber_halt, NULL}
+	};
+	struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m));
+	if (m == NULL) {
+		/*
+		 * Out of memory during shutdown. Do nothing.
+		 */
+		say_warn("failed to allocate relay halt message");
+		return;
+	}
+	cmsg_init(&m->msg, route);
+	m->relay = relay;
+	cpipe_push(&relay->relay_pipe, &m->msg);
+}
+
 static void
 relay_stop(struct relay *relay)
 {
@@ -311,6 +371,7 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	status->relay->tx_in_use = status->tx_in_use;
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
@@ -468,6 +529,7 @@ relay_subscribe_f(va_list ap)
 			     fiber_schedule_cb, fiber());
 	cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
 		  NULL, NULL, cbus_process);
+	relay->main_fiber = fiber();
 	/* Setup garbage collection trigger. */
 	struct trigger on_close_log = {
 		RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
@@ -478,6 +540,16 @@ relay_subscribe_f(va_list ap)
 
 	relay_set_cord_name(relay->io.fd);
 
+	static const struct cmsg_hop route[] = {
+		{tx_status_update, NULL}
+	};
+	/* Notify tx that relay thread is started. */
+	cmsg_init(&relay->status_msg.msg, route);
+	vclock_copy(&relay->status_msg.vclock, &relay->tx.vclock);
+	relay->status_msg.relay = relay;
+	relay->status_msg.tx_in_use = true;
+	cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
+
 	char name[FIBER_NAME_MAX];
 	snprintf(name, sizeof(name), "%s:%s", fiber()->name, "reader");
 	struct fiber *reader = fiber_new_xc(name, relay_reader_f);
@@ -525,12 +597,11 @@ relay_subscribe_f(va_list ap)
 		if (vclock_sum(&relay->status_msg.vclock) ==
 		    vclock_sum(send_vclock))
 			continue;
-		static const struct cmsg_hop route[] = {
-			{tx_status_update, NULL}
-		};
+
 		cmsg_init(&relay->status_msg.msg, route);
 		vclock_copy(&relay->status_msg.vclock, send_vclock);
 		relay->status_msg.relay = relay;
+		relay->status_msg.tx_in_use = true;
 		cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
 		/* Collect xlog files received by the replica. */
 		relay_schedule_pending_gc(relay, send_vclock);
@@ -542,6 +613,14 @@ relay_subscribe_f(va_list ap)
 	if (!fiber_is_dead(reader))
 		fiber_cancel(reader);
 	fiber_join(reader);
+
+	/* Notify tx that relay is stopping. */
+	cmsg_init(&relay->status_msg.msg, route);
+	vclock_copy(&relay->status_msg.vclock, &relay->tx.vclock);
+	relay->status_msg.relay = relay;
+	relay->status_msg.tx_in_use = false;
+	cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
+
 	cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
 		    NULL, NULL, cbus_process);
 	cbus_endpoint_destroy(&relay->endpoint, cbus_process);
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..deaba34d4 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
 struct relay *
 relay_new(struct replica *replica);
 
+/** Stop a running relay. Called on shutdown. */
+void
+relay_halt(struct relay *relay);
+
 /** Destroy and delete the relay */
 void
 relay_delete(struct relay *relay);
@@ -73,6 +77,12 @@ relay_get_diag(struct relay *relay);
 enum relay_state
 relay_get_state(const struct relay *relay);
 
+/**
+ * Return whether relay_subscribe_f was already started
+ * and pipes between tx and relay were created.
+ */
+bool
+relay_uses_tx(const struct relay *relay);
 /**
  * Returns relay's vclock
  * @param relay relay
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..9b4968777 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
 	fiber_cond_signal(&replicaset.applier.cond);
 }
 
+void
+replication_shutdown()
+{
+	struct replica *replica, *next;
+
+	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+		if (replica->id == instance_id)
+			continue;
+		if (replica->applier != NULL) {
+			replica_clear_applier(replica);
+			/*
+			 * We're exiting, so control won't be passed
+			 * to appliers and we don't need to stop them.
+			 */
+		}
+		if (replica->id != REPLICA_ID_NIL) {
+			if (relay_get_state(replica->relay) == RELAY_FOLLOW &&
+			    relay_uses_tx(replica->relay)) {
+				replica->id = REPLICA_ID_NIL;
+				relay_halt(replica->relay);
+			}
+		} else {
+			replica_hash_remove(&replicaset.hash, replica);
+			replica_delete(replica);
+		}
+	}
+
+	replication_free();
+}
+
 /**
  * Update the replica set with new "applier" objects
  * upon reconfiguration of box.cfg.replication.
diff --git a/src/box/replication.h b/src/box/replication.h
index e8b391af2..08f9df258 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -333,6 +333,12 @@ replica_on_relay_stop(struct replica *replica);
 void
 replica_check_id(uint32_t replica_id);
 
+/**
+ * Stop replication and delete all replicas and the replicaset.
+ */
+void
+replication_shutdown();
+
 /**
  * Register the universally unique identifier of a remote replica and
  * a matching replica-set-local identifier in the  _cluster registry.
-- 
2.15.2 (Apple Git-101.1)

^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2018-08-08 16:21 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-08-06  7:09 [tarantool-patches] [PATCH v2] replication: implement replication_shutdown() Serge Petrenko
2018-08-08 16:21 ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox