[PATCH v3/1] replication: implement replication_shutdown().

Serge Petrenko sergepetrenko at tarantool.org
Tue Aug 14 11:07:07 MSK 2018


Relay threads keep using tx upon shutdown, which leads to occasional
segmentation faults and assertion fails (e.g. in replication test
suite).

Fix this by finishing implementation of replication_free() and introducing
relay_cancel().
replication_free calls relay_cancel to stop every relay thread that is
using tx.
Also add delays to replication/gc.test to fix tests on travis.

Closes #3485
---
https://github.com/tarantool/tarantool/issues/3485
https://github.com/tarantool/tarantool/tree/sergepetrenko/gh-3485-replication-shutdown

This is the first variant of the patch, as discussed yesterday
with Vladimir and Georgy. Here we send a message to relay via
cpipe to make it stop. I made sure the message is flushed and
delivered.

Changes in v3:
  - eliminate tx_in_use flag. Instead
    just check that the pipe between
    relay and tx is created.
  - in relay just cancel fiber(), do not
    store main_fiber, since they're the same
    in that case.
  - ensure that message is correctly flushed
    with cpipe_set_max_input() and
    cpipe_flush_input().
  - wait for the relay thread with cord_join()
    before proceeding with tx destruction.
  - make sure all the replicas are deleted,
    including the ones in replicaset.anon
    list.
  
Changes in v2:
  - instead of setting tx_in_use flag
    in relay and checking it in tx, send a
    message from relay to tx to set the flag.

 src/box/box.cc               |  2 +-
 src/box/relay.cc             | 44 ++++++++++++++++++++++++++++++++++++++++
 src/box/relay.h              |  4 ++++
 src/box/replication.cc       | 48 +++++++++++++++++++++++++++++++++++++-------
 src/box/replication.h        |  3 +++
 src/cbus.c                   | 12 +++++------
 src/cbus.h                   | 10 +++++++++
 test/replication/gc.result   | 12 +++++++++++
 test/replication/gc.test.lua |  4 ++++
 9 files changed, 125 insertions(+), 14 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..c5e05bc15 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1597,9 +1597,9 @@ box_free(void)
 	 * initialized
 	 */
 	if (is_box_configured) {
+	        replication_free();
 #if 0
 		session_free();
-		replication_free();
 		user_cache_free();
 		schema_free();
 		module_free();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..bc1c87236 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -198,6 +198,50 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	relay->state = RELAY_FOLLOW;
 }
 
+static void
+relay_main_fiber_cancel(struct cmsg *msg)
+{
+	(void)msg;
+
+	fiber_cancel(fiber());
+
+}
+
+void
+relay_cancel(struct relay *relay)
+{
+	assert(relay->state == RELAY_FOLLOW);
+	if (!cpipe_is_created(&relay->relay_pipe) ||
+		relay->endpoint.n_pipes == 0)
+		return;
+	static const struct cmsg_hop route[] ={
+		{relay_main_fiber_cancel, NULL}
+	};
+	struct cmsg *msg = (struct cmsg *)malloc(sizeof(*msg));
+	if (msg == NULL) {
+		/*
+		 * Out of memory during shutdown. Do nothing.
+		 */
+		say_warn("failed to allocate relay cancel message");
+		return;
+	}
+	cmsg_init(msg, route);
+	cpipe_set_max_input(&relay->relay_pipe, 1);
+	cpipe_push(&relay->relay_pipe, msg);
+	/*
+	* Relay sends an unpair message to tx upon exit, we have
+	* to process that message first. Ev loop is stopped at
+	* this point, so do this manually.
+	*/
+	struct cbus_endpoint *endpoint = cbus_find_endpoint("tx");
+	while (true) {
+		cbus_process(endpoint);
+		if (relay->endpoint.n_pipes == 0)
+			break;
+	}
+	cord_join(&relay->cord);
+}
+
 static void
 relay_stop(struct relay *relay)
 {
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..53bf68eb8 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
 struct relay *
 relay_new(struct replica *replica);
 
+/** Cancel a running relay. Called on shutdown. */
+void
+relay_cancel(struct relay *relay);
+
 /** Destroy and delete the relay */
 void
 relay_delete(struct relay *relay);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..acf20fc2c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,13 +91,6 @@ replication_init(void)
 	latch_create(&replicaset.applier.order_latch);
 }
 
-void
-replication_free(void)
-{
-	free(replicaset.replica_by_id);
-	fiber_cond_destroy(&replicaset.applier.cond);
-}
-
 void
 replica_check_id(uint32_t replica_id)
 {
@@ -242,6 +235,47 @@ replica_clear_applier(struct replica *replica)
 	trigger_clear(&replica->on_applier_state);
 }
 
+void
+replication_free(void)
+{
+	struct replica *replica, *next;
+
+	replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+		if (replica->id == instance_id) {
+			replica_hash_remove(&replicaset.hash, replica);
+			/*
+			 * Local replica doesn't have neither applier
+			 * nor relay, so free it right away.
+			 */
+			TRASH(replica);
+			free(replica);
+			continue;
+		}
+		if (replica->applier != NULL) {
+			replica_clear_applier(replica);
+			/*
+			 * We're exiting, so control won't be passed
+			 * to appliers and we don't need to stop them.
+			 */
+		}
+		if (replica->id != REPLICA_ID_NIL) {
+			if (relay_get_state(replica->relay) == RELAY_FOLLOW) {
+				replica->id = REPLICA_ID_NIL;
+				relay_cancel(replica->relay);
+			}
+		} else {
+			replica_hash_remove(&replicaset.hash, replica);
+			replica_delete(replica);
+		}
+	}
+	rlist_foreach_entry_safe(replica, &replicaset.anon, in_anon, next) {
+		replica_clear_applier(replica);
+		replica_delete(replica);
+	}
+	free(replicaset.replica_by_id);
+	fiber_cond_destroy(&replicaset.applier.cond);
+}
+
 static void
 replica_on_applier_sync(struct replica *replica)
 {
diff --git a/src/box/replication.h b/src/box/replication.h
index e8b391af2..a8baafd12 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -154,6 +154,9 @@ replication_disconnect_timeout(void)
 void
 replication_init(void);
 
+/**
+ * Stop replication and delete all replicas and replicaset.
+ */
 void
 replication_free(void);
 
diff --git a/src/cbus.c b/src/cbus.c
index b3b1280e7..a7d331250 100644
--- a/src/cbus.c
+++ b/src/cbus.c
@@ -74,12 +74,12 @@ cbus_find_endpoint_locked(struct cbus *bus, const char *name)
 	return NULL;
 }
 
-static struct cbus_endpoint *
-cbus_find_endpoint(struct cbus *bus, const char *name)
+struct cbus_endpoint *
+cbus_find_endpoint(const char *name)
 {
-	tt_pthread_mutex_lock(&bus->mutex);
-	struct cbus_endpoint *endpoint = cbus_find_endpoint_locked(bus, name);
-	tt_pthread_mutex_unlock(&bus->mutex);
+	tt_pthread_mutex_lock(&cbus.mutex);
+	struct cbus_endpoint *endpoint = cbus_find_endpoint_locked(&cbus, name);
+	tt_pthread_mutex_unlock(&cbus.mutex);
 	return endpoint;
 }
 
@@ -527,7 +527,7 @@ cbus_pair(const char *dest_name, const char *src_name,
 	msg.src_pipe = src_pipe;
 	fiber_cond_create(&msg.cond);
 
-	struct cbus_endpoint *endpoint = cbus_find_endpoint(&cbus, src_name);
+	struct cbus_endpoint *endpoint = cbus_find_endpoint(src_name);
 	assert(endpoint != NULL);
 
 	cpipe_create(dest_pipe, dest_name);
diff --git a/src/cbus.h b/src/cbus.h
index 4a12d9e79..ce3a35af3 100644
--- a/src/cbus.h
+++ b/src/cbus.h
@@ -158,6 +158,12 @@ cpipe_create(struct cpipe *pipe, const char *consumer);
 void
 cpipe_destroy(struct cpipe *pipe);
 
+static inline bool
+cpipe_is_created(struct cpipe *pipe)
+{
+	return pipe->endpoint != NULL;
+}
+
 /**
  * Set pipe max size of staged push area. The default is infinity.
  * If staged push cap is set, the pushed messages are flushed
@@ -284,6 +290,10 @@ cbus_init();
 void
 cbus_free();
 
+/** Find an endpoint by name in the global singleton bus. */
+struct cbus_endpoint *
+cbus_find_endpoint(const char *name);
+
 /**
  * Connect the cord to cbus as a named reciever.
  * @param name a destination name
diff --git a/test/replication/gc.result b/test/replication/gc.result
index 3f9db26ce..999378e9e 100644
--- a/test/replication/gc.result
+++ b/test/replication/gc.result
@@ -152,6 +152,9 @@ box.snapshot()
 ---
 - ok
 ...
+wait_gc(1)
+---
+...
 #box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
@@ -311,6 +314,9 @@ box.snapshot()
 ---
 - ok
 ...
+wait_gc(1)
+---
+...
 #box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
@@ -330,6 +336,9 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
 test_run:cleanup_cluster()
 ---
 ...
+fiber.sleep(0.1)
+---
+...
 #box.info.gc().checkpoints == 1 or box.info.gc()
 ---
 - true
@@ -425,6 +434,9 @@ box.snapshot()
 ---
 - ok
 ...
+fiber.sleep(0.1)
+---
+...
 #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
 ---
 - true
diff --git a/test/replication/gc.test.lua b/test/replication/gc.test.lua
index 96f11f8d4..6f0cb942b 100644
--- a/test/replication/gc.test.lua
+++ b/test/replication/gc.test.lua
@@ -78,6 +78,7 @@ box.snapshot()
 -- Invoke garbage collection. Check that it doesn't remove
 -- xlogs needed by the replica.
 box.snapshot()
+wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
 #fio.glob('./master/*.xlog') == 2 or fio.listdir('./master')
 
@@ -144,6 +145,7 @@ _ = s:auto_increment{}
 box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
+wait_gc(1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
 xlog_count = #fio.glob('./master/*.xlog')
 -- the replica may have managed to download all data
@@ -154,6 +156,7 @@ xlog_count == 3 or xlog_count == 2 or fio.listdir('./master')
 -- The xlog should only be deleted after the replica
 -- is unregistered.
 test_run:cleanup_cluster()
+fiber.sleep(0.1)
 #box.info.gc().checkpoints == 1 or box.info.gc()
 #fio.glob('./master/*.xlog') == 1 or fio.listdir('./master')
 --
@@ -195,6 +198,7 @@ _ = s:auto_increment{}
 box.snapshot()
 _ = s:auto_increment{}
 box.snapshot()
+fiber.sleep(0.1)
 #fio.glob('./master/*.xlog') == 3 or fio.listdir('./master')
 
 -- Delete the replica from the cluster table and check that
-- 
2.15.2 (Apple Git-101.1)




More information about the Tarantool-patches mailing list