[tarantool-patches] [PATCH 4/5] swim: introduce quit message

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Apr 9 21:12:11 MSK 2019


Quit allows an instance to gracefully leave a cluster. Other
members will not consider it dead, and will drop it much earlier
than failure detection would.

Quit works as follows: a special message is sent to each member.
Members that receive the message mark the source as 'left', and
keep and disseminate that change for one round. In the best case,
after one round the left member is marked as such in the whole
cluster. A 'left' member is not added back, because adding new
'left' members is explicitly prohibited.

Part of #3234
---
 src/lib/swim/swim.c           | 118 +++++++++++++++++++++++++++++++++-
 src/lib/swim/swim.h           |  10 +++
 src/lib/swim/swim_constants.h |   2 +
 src/lib/swim/swim_io.c        |   8 ++-
 src/lib/swim/swim_io.h        |   4 ++
 src/lib/swim/swim_proto.c     |  11 ++++
 src/lib/swim/swim_proto.h     |  30 +++++++++
 test/unit/swim.c              |  83 +++++++++++++++++++++++-
 test/unit/swim.result         |  15 ++++-
 test/unit/swim_test_utils.c   |  11 ++++
 test/unit/swim_test_utils.h   |  12 ++++
 11 files changed, 299 insertions(+), 5 deletions(-)

diff --git a/src/lib/swim/swim.c b/src/lib/swim/swim.c
index d5ecc6622..f0383554d 100644
--- a/src/lib/swim/swim.c
+++ b/src/lib/swim/swim.c
@@ -885,8 +885,11 @@ swim_decrease_event_ttl(struct swim *swim)
 	rlist_foreach_entry_safe(member, &swim->dissemination_queue,
 				 in_dissemination_queue,
 				 tmp) {
-		if (--member->status_ttl == 0)
+		if (--member->status_ttl == 0) {
 			rlist_del_entry(member, in_dissemination_queue);
+			if (member->status == MEMBER_LEFT)
+				swim_delete_member(swim, member);
+		}
 	}
 }
 
@@ -1026,6 +1029,8 @@ swim_check_acks(struct ev_loop *loop, struct ev_timer *t, int events)
 				continue;
 			}
 			break;
+		case MEMBER_LEFT:
+			continue;
 		default:
 			unreachable();
 		}
@@ -1114,7 +1119,9 @@ swim_upsert_member(struct swim *swim, const struct swim_member_def *def,
 {
 	struct swim_member *member = swim_find_member(swim, &def->uuid);
 	if (member == NULL) {
-		if (def->status == MEMBER_DEAD && swim->gc_mode == SWIM_GC_ON) {
+		if (def->status == MEMBER_LEFT ||
+		    (def->status == MEMBER_DEAD &&
+		     swim->gc_mode == SWIM_GC_ON)) {
 			/*
 			 * Do not 'resurrect' dead members to
 			 * prevent 'ghost' members. Ghost member
@@ -1282,6 +1289,47 @@ swim_process_dissemination(struct swim *swim, const char **pos, const char *end)
 	return swim_process_members(swim, prefix, pos, end);
 }
 
+/**
+ * Decode a quit message sent by @a uuid, mark it as left.
+ */
+static int
+swim_process_quit(struct swim *swim, const char **pos, const char *end,
+		  const struct tt_uuid *uuid)
+{
+	say_verbose("SWIM %d: process quit", swim_fd(swim));
+	const char *prefix = "invalid quit message:";
+	uint32_t size;
+	if (swim_decode_map(pos, end, &size, prefix, "root") != 0)
+		return -1;
+	if (size != 1) {
+		diag_set(SwimError, "%s map of size 1 is expected", prefix);
+		return -1;
+	}
+	uint64_t tmp;
+	if (swim_decode_uint(pos, end, &tmp, prefix, "a key") != 0)
+		return -1;
+	if (tmp != SWIM_QUIT_INCARNATION) {
+		diag_set(SwimError, "%s a key should be incarnation", prefix);
+		return -1;
+	}
+	if (swim_decode_uint(pos, end, &tmp, prefix, "incarnation") != 0)
+		return -1;
+	struct swim_member *m = swim_find_member(swim, uuid);
+	if (m == NULL)
+		return 0;
+	/*
+	 * This instance could take the UUID of a quit one. Then
+	 * refute the quit instead of marking self as left.
+	 */
+	if (m != swim->self) {
+		swim_update_member_inc_status(swim, m, MEMBER_LEFT, tmp);
+	} else if (tmp >= m->incarnation) {
+		m->incarnation++;
+		swim_on_member_update(swim, m);
+	}
+	return 0;
+}
+
 /** Process a new message. */
 static void
 swim_on_input(struct swim_scheduler *scheduler, const char *pos,
@@ -1325,6 +1373,10 @@ swim_on_input(struct swim_scheduler *scheduler, const char *pos,
 			if (swim_process_dissemination(swim, &pos, end) != 0)
 				goto error;
 			break;
+		case SWIM_QUIT:
+			if (swim_process_quit(swim, &pos, end, &uuid) != 0)
+				goto error;
+			break;
 		default:
 			diag_set(SwimError, "%s unexpected key", prefix);
 			goto error;
@@ -1590,6 +1642,68 @@ swim_delete(struct swim *swim)
 	free(swim->shuffled);
 }
 
+/**
+ * Quit message is broadcast step by step like round messages,
+ * but without delays between steps. When the round queue is
+ * empty, the SWIM instance is deleted.
+ */
+static void
+swim_quit_step_complete(struct swim_task *task,
+			struct swim_scheduler *scheduler, int rc)
+{
+	(void) rc;
+	struct swim *swim = swim_by_scheduler(scheduler);
+	if (rlist_empty(&swim->round_queue)) {
+		swim_delete(swim);
+		return;
+	}
+	struct swim_member *m =
+		rlist_shift_entry(&swim->round_queue, struct swim_member,
+				  in_round_queue);
+	swim_task_send(task, &m->addr, scheduler);
+}
+
+/**
+ * Encode 'quit' section with the current self incarnation.
+ * @retval Number of key-values added to the packet's root map.
+ */
+static inline int
+swim_encode_quit(struct swim *swim, struct swim_packet *packet)
+{
+	struct swim_quit_bin bin;
+	char *pos = swim_packet_alloc(packet, sizeof(bin));
+	if (pos == NULL)
+		return 0;
+	swim_quit_bin_create(&bin, swim->self->incarnation);
+	memcpy(pos, &bin, sizeof(bin));
+	return 1;
+}
+
+void
+swim_quit(struct swim *swim)
+{
+	assert(swim_is_configured(swim));
+	swim_ev_timer_stop(loop(), &swim->round_tick);
+	swim_ev_timer_stop(loop(), &swim->wait_ack_tick);
+	swim_scheduler_stop_input(&swim->scheduler);
+	/* Start the last round - quitting. */
+	if (swim_new_round(swim) != 0) {
+		diag_log();
+		swim_delete(swim);
+		return;
+	}
+	struct swim_task *task = &swim->round_step_task;
+	swim_task_destroy(task);
+	swim_task_create(task, swim_quit_step_complete, swim_task_delete_cb,
+			 "quit");
+	char *header = swim_packet_alloc(&task->packet, 1);
+	int rc = swim_encode_src_uuid(swim, &task->packet) +
+		 swim_encode_quit(swim, &task->packet);
+	assert(rc == 2);
+	mp_encode_map(header, rc);
+	swim_quit_step_complete(task, &swim->scheduler, 0);
+}
+
 const struct swim_member *
 swim_self(struct swim *swim)
 {
diff --git a/src/lib/swim/swim.h b/src/lib/swim/swim.h
index ec924f36f..f8dfdde87 100644
--- a/src/lib/swim/swim.h
+++ b/src/lib/swim/swim.h
@@ -144,6 +144,16 @@ swim_info(struct swim *swim, struct info_handler *info);
 int
 swim_size(const struct swim *swim);
 
+/**
+ * Gracefully leave the cluster, broadcast a notification.
+ * Members that receive it mark this instance as left, not
+ * dead, and drop it once that status is disseminated. The
+ * @a swim object can not be used after quit and should be
+ * treated as deleted.
+ */
+void
+swim_quit(struct swim *swim);
+
 /** Get a SWIM member, describing this instance. */
 const struct swim_member *
 swim_self(struct swim *swim);
diff --git a/src/lib/swim/swim_constants.h b/src/lib/swim/swim_constants.h
index 104f09f47..7869ddf3e 100644
--- a/src/lib/swim/swim_constants.h
+++ b/src/lib/swim/swim_constants.h
@@ -42,6 +42,8 @@ enum swim_member_status {
 	 * the membership after some unacknowledged pings.
 	 */
 	MEMBER_DEAD,
+	/** The member has voluntarily left the cluster. */
+	MEMBER_LEFT,
 	swim_member_status_MAX,
 };
 
diff --git a/src/lib/swim/swim_io.c b/src/lib/swim/swim_io.c
index 504f64f32..56c2facc8 100644
--- a/src/lib/swim/swim_io.c
+++ b/src/lib/swim/swim_io.c
@@ -157,6 +157,12 @@ swim_scheduler_bind(struct swim_scheduler *scheduler,
 	return 0;
 }
 
+void
+swim_scheduler_stop_input(struct swim_scheduler *scheduler)
+{
+	swim_ev_io_stop(loop(), &scheduler->input); /* Output is untouched. */
+}
+
 void
 swim_scheduler_destroy(struct swim_scheduler *scheduler)
 {
@@ -172,7 +178,7 @@ swim_scheduler_destroy(struct swim_scheduler *scheduler)
 	}
 	swim_transport_destroy(&scheduler->transport);
 	swim_ev_io_stop(loop(), &scheduler->output);
-	swim_ev_io_stop(loop(), &scheduler->input);
+	swim_scheduler_stop_input(scheduler);
 }
 
 static void
diff --git a/src/lib/swim/swim_io.h b/src/lib/swim/swim_io.h
index 25c6ea2e9..a6032127d 100644
--- a/src/lib/swim/swim_io.h
+++ b/src/lib/swim/swim_io.h
@@ -171,6 +171,10 @@ int
 swim_scheduler_bind(struct swim_scheduler *scheduler,
 		    const struct sockaddr_in *addr);
 
+/** Stop accepting new packets. Sending is not affected. */
+void
+swim_scheduler_stop_input(struct swim_scheduler *scheduler);
+
 /** Destroy scheduler, its queues, close the socket. */
 void
 swim_scheduler_destroy(struct swim_scheduler *scheduler);
diff --git a/src/lib/swim/swim_proto.c b/src/lib/swim/swim_proto.c
index 6b3197790..fa02b61c4 100644
--- a/src/lib/swim/swim_proto.c
+++ b/src/lib/swim/swim_proto.c
@@ -38,6 +38,7 @@
 const char *swim_member_status_strs[] = {
 	"alive",
 	"dead",
+	"left",
 };
 
 const char *swim_fd_msg_type_strs[] = {
@@ -451,3 +452,13 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 	}
 	return 0;
 }
+
+void
+swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation)
+{
+	header->k_quit = SWIM_QUIT;
+	header->m_quit = 0x81; /* MsgPack fixmap of size 1. */
+	header->k_incarnation = SWIM_QUIT_INCARNATION;
+	header->m_incarnation = 0xcf; /* MsgPack uint 64. */
+	header->v_incarnation = mp_bswap_u64(incarnation);
+}
diff --git a/src/lib/swim/swim_proto.h b/src/lib/swim/swim_proto.h
index 826443a3b..6ae4475c0 100644
--- a/src/lib/swim/swim_proto.h
+++ b/src/lib/swim/swim_proto.h
@@ -84,6 +84,12 @@
  * |         },                                                  |
  * |         ...                                                 |
  * |     ],                                                      |
+ * |                                                             |
+ * |               OR/AND                                        |
+ * |                                                             |
+ * |     SWIM_QUIT: {                                            |
+ * |         SWIM_QUIT_INCARNATION: uint                         |
+ * |     }                                                       |
  * | }                                                           |
  * +-------------------------------------------------------------+
  */
@@ -128,6 +134,7 @@ enum swim_body_key {
 	SWIM_ANTI_ENTROPY,
 	SWIM_FAILURE_DETECTION,
 	SWIM_DISSEMINATION,
+	SWIM_QUIT,
 };
 
 /**
@@ -450,6 +457,29 @@ swim_meta_def_decode(struct swim_meta_def *def, const char **pos,
 
 /** }}}                     Meta component                      */
 
+enum swim_quit_key {
+	/** Sender's incarnation, to ignore old quit messages. */
+	SWIM_QUIT_INCARNATION = 0,
+};
+
+/** Quit section. Describes voluntary quit from the cluster. */
+struct PACKED swim_quit_bin {
+	/** mp_encode_uint(SWIM_QUIT) */
+	uint8_t k_quit;
+	/** mp_encode_map(1) */
+	uint8_t m_quit;
+
+	/** mp_encode_uint(SWIM_QUIT_INCARNATION) */
+	uint8_t k_incarnation;
+	/** mp_encode_uint(64bit incarnation) */
+	uint8_t m_incarnation;
+	uint64_t v_incarnation;
+};
+
+/** Initialize quit section with the sender's @a incarnation. */
+void
+swim_quit_bin_create(struct swim_quit_bin *header, uint64_t incarnation);
+
 /**
  * Helpers to decode some values - map, array, etc with
  * appropriate checks. All of them set diagnostics on an error
diff --git a/test/unit/swim.c b/test/unit/swim.c
index 84ca01ac3..c2d94fb57 100644
--- a/test/unit/swim.c
+++ b/test/unit/swim.c
@@ -467,10 +467,90 @@ swim_test_undead(void)
 	swim_finish_test();
 }
 
+static void
+swim_test_quit(void)
+{
+	swim_start_test(9);
+	int size = 3;
+	struct swim_cluster *cluster = swim_cluster_new(size);
+	for (int i = 0; i < size; ++i) {
+		for (int j = 0; j < size; ++j)
+			swim_cluster_add_link(cluster, i, j);
+	}
+	swim_cluster_quit_node(cluster, 0);
+	is(swim_cluster_wait_status_everywhere(cluster, 0, MEMBER_LEFT, 0),
+	   0, "'quit' is sent to all the members without delays between "\
+	   "dispatches");
+	/*
+	 * Return the instance back and check that it refutes the
+	 * old LEFT status.
+	 */
+	swim_cluster_restart_node(cluster, 0);
+	is(swim_cluster_wait_incarnation(cluster, 0, 0, 1, 2), 0,
+	   "quited member S1 has returned and refuted the old status");
+	fail_if(swim_cluster_wait_fullmesh(cluster, 2) != 0);
+	/*
+	 * Not trivial test. A member can receive its own 'quit'
+	 * message. It can be reproduced if a member has quit.
+	 * Then another member took the spare UUID, and then
+	 * received the 'quit' message with the same UUID. Of
+	 * course, it should be refuted.
+	 */
+	struct swim *s0 = swim_cluster_node(cluster, 0);
+	struct tt_uuid s0_uuid = *swim_member_uuid(swim_self(s0));
+	struct swim *s1 = swim_cluster_node(cluster, 1);
+	swim_remove_member(s1, &s0_uuid);
+	struct swim *s2 = swim_cluster_node(cluster, 2);
+	swim_remove_member(s2, &s0_uuid);
+	swim_cluster_quit_node(cluster, 0);
+
+	/* Steal UUID of the quit node. */
+	swim_cluster_block_io(cluster, 1);
+	is(swim_cluster_update_uuid(cluster, 1, &s0_uuid), 0, "another "\
+	   "member S2 has taken the quited UUID");
+
+	/* Ensure that S1 is not added back to S3 on quit. */
+	swim_run_for(1);
+	is(swim_cluster_member_status(cluster, 2, 0), swim_member_status_MAX,
+	   "S3 did not add S1 back when received its 'quit'");
+
+	/* Now allow S2 to get the 'self-quit' message. */
+	swim_cluster_unblock_io(cluster, 1);
+	is(swim_cluster_wait_incarnation(cluster, 1, 1, 1, 0), 0,
+	   "S2 finally got 'quit' message from S1, but with its 'own' UUID - "\
+	   "refute it");
+	swim_cluster_delete(cluster);
+
+	/*
+	 * Test that if a new member has arrived with LEFT status
+	 * via dissemination or anti-entropy - it is not added.
+	 * Even if GC is off.
+	 */
+	cluster = swim_cluster_new(3);
+	swim_cluster_set_gc(cluster, SWIM_GC_OFF);
+	swim_cluster_interconnect(cluster, 0, 2);
+	swim_cluster_interconnect(cluster, 1, 2);
+
+	swim_cluster_quit_node(cluster, 0);
+	swim_run_for(2);
+	is(swim_cluster_member_status(cluster, 2, 0), MEMBER_LEFT,
+	   "S3 sees S1 as left");
+	is(swim_cluster_member_status(cluster, 1, 0), swim_member_status_MAX,
+	   "S2 does not see S1 at all");
+	swim_run_for(2);
+	is(swim_cluster_member_status(cluster, 2, 0), swim_member_status_MAX,
+	   "after more time S1 is dropped from S3");
+	is(swim_cluster_member_status(cluster, 1, 0), swim_member_status_MAX,
+	   "and still is not added to S2 - left members can not be added");
+
+	swim_cluster_delete(cluster);
+	swim_finish_test();
+}
+
 static int
 main_f(va_list ap)
 {
-	swim_start_test(12);
+	swim_start_test(13);
 
 	(void) ap;
 	swim_test_ev_init();
@@ -488,6 +568,7 @@ main_f(va_list ap)
 	swim_test_too_big_packet();
 	swim_test_undead();
 	swim_test_packet_loss();
+	swim_test_quit();
 
 	swim_test_transport_free();
 	swim_test_ev_free();
diff --git a/test/unit/swim.result b/test/unit/swim.result
index 615327e27..bd157feff 100644
--- a/test/unit/swim.result
+++ b/test/unit/swim.result
@@ -1,5 +1,5 @@
 	*** main_f ***
-1..12
+1..13
 	*** swim_test_one_link ***
     1..6
     ok 1 - no rounds - no fullmesh
@@ -116,4 +116,17 @@ ok 11 - subtests
     ok 5 - drop rate = 90.00, but the failure is disseminated
 ok 12 - subtests
 	*** swim_test_packet_loss: done ***
+	*** swim_test_quit ***
+    1..9
+    ok 1 - 'quit' is sent to all the members without delays between dispatches
+    ok 2 - quited member S1 has returned and refuted the old status
+    ok 3 - another member S2 has taken the quited UUID
+    ok 4 - S3 did not add S1 back when received its 'quit'
+    ok 5 - S2 finally got 'quit' message from S1, but with its 'own' UUID - refute it
+    ok 6 - S3 sees S1 as left
+    ok 7 - S2 does not see S1 at all
+    ok 8 - after more time S1 is dropped from S3
+    ok 9 - and still is not added to S2 - left members can not be added
+ok 13 - subtests
+	*** swim_test_quit: done ***
 	*** main_f: done ***
diff --git a/test/unit/swim_test_utils.c b/test/unit/swim_test_utils.c
index 8964e345c..da8dd4386 100644
--- a/test/unit/swim_test_utils.c
+++ b/test/unit/swim_test_utils.c
@@ -197,6 +197,17 @@ swim_cluster_node(struct swim_cluster *cluster, int i)
 	return cluster->node[i].swim;
 }
 
+void
+swim_cluster_quit_node(struct swim_cluster *cluster, int i)
+{
+	assert(i >= 0 && i < cluster->size);
+	struct swim_node *node = cluster->node + i;
+	assert(tt_uuid_is_equal(swim_member_uuid(swim_self(node->swim)),
+				&node->uuid));
+	swim_quit(node->swim);
+	node->swim = NULL;
+}
+
 void
 swim_cluster_restart_node(struct swim_cluster *cluster, int i)
 {
diff --git a/test/unit/swim_test_utils.h b/test/unit/swim_test_utils.h
index 6e172fb85..5e1c192a1 100644
--- a/test/unit/swim_test_utils.h
+++ b/test/unit/swim_test_utils.h
@@ -72,6 +72,10 @@ swim_error_check_match(const char *msg);
 struct swim *
 swim_cluster_node(struct swim_cluster *cluster, int i);
 
+/** Quit member @a i and reset its SWIM pointer to NULL. */
+void
+swim_cluster_quit_node(struct swim_cluster *cluster, int i);
+
 /** Drop and create again a SWIM instance with id @a i. */
 void
 swim_cluster_restart_node(struct swim_cluster *cluster, int i);
@@ -94,6 +98,14 @@ swim_cluster_set_drop(struct swim_cluster *cluster, int i, double value);
 int
 swim_cluster_add_link(struct swim_cluster *cluster, int to_id, int from_id);
 
+/** Add links in both directions, ignoring add_link() errors. */
+static inline void
+swim_cluster_interconnect(struct swim_cluster *cluster, int to_id, int from_id)
+{
+	swim_cluster_add_link(cluster, to_id, from_id);
+	swim_cluster_add_link(cluster, from_id, to_id);
+}
+
 enum swim_member_status
 swim_cluster_member_status(struct swim_cluster *cluster, int node_id,
 			   int member_id);
-- 
2.17.2 (Apple Git-113)





More information about the Tarantool-patches mailing list