[Tarantool-patches] [DRAFT v1] replication: track information about replica

Sergey Kaplun skaplun at tarantool.org
Mon Jun 29 23:31:34 MSK 2020


This is a draft of the patch.
The patch makes it possible to track relay state changes. On every
relay state change, the timestamp, vclock, new state (and the error
message, if any) are saved to the _cluster space.

The patch adds a list of triggers to the relay, which is run whenever
the relay changes its state. The trigger that updates the _cluster space
is set when a replica is registered.
---

This is a draft of the patch. Nevertheless, I would like to hear as much
criticism as possible. It is also important for me to hear @kostja's
opinion here.

Originally, the task was split into two parts [1]:

1) Pass extra info from downstreams (box.info.listen) to the master and
persist it there.
2) Persist the last status change, with its timestamp and vclock, on the
master for each downstream.

This patch implements the second part. The most unclear point for me is
how to separate local change attempts from the applier's changes (see
the FIXME below).

As discussed with Alexander Turenko, there are several ways to implement
the first part:

1) Use additional bytes inside the greeting to transport the information
to the master. The downside is that the greeting is meant for
authentication, not for carrying additional information.

2) Add a new request type (e.g. IPROTO_INFO). An instance receiving
such a request should provide information about itself. So far, there
is only one such field. Is it reasonable to add new protocol fields for
a single feature?

Thoughts?

[1]: https://github.com/tarantool/tarantool/issues/3363#issuecomment-622382549

Branch: https://github.com/tarantool/tarantool/tree/skaplun/gh-3363-track-replica-status-change
Issue: https://github.com/tarantool/tarantool/issues/3363

 src/box/alter.cc        | 75 ++++++++++++++++++++++++++++++++++++++++-
 src/box/box.cc          | 10 +++++-
 src/box/lua/upgrade.lua |  8 +++++
 src/box/relay.cc        | 44 ++++++++++++++++++++++--
 src/box/relay.h         | 12 +++++++
 src/box/replication.cc  | 10 +++++-
 src/box/replication.h   | 14 ++++++++
 7 files changed, 167 insertions(+), 6 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index bb4254878..ab348f27e 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -49,6 +49,8 @@
 #include <stdio.h> /* snprintf() */
 #include <ctype.h>
 #include "replication.h" /* for replica_set_id() */
+#include "relay.h"
+#include "box/box.h"
 #include "session.h" /* to fetch the current user. */
 #include "vclock.h" /* VCLOCK_MAX */
 #include "xrow.h"
@@ -4165,6 +4167,61 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 	return 0;
 }
 
+static int
+relay_on_state_f(struct trigger *trigger, void *event)
+{
+	(void)trigger;
+	struct relay *relay = (struct relay *)event;
+	enum relay_state state = relay_get_state(relay);
+	/* Nothing to persist until the relay has been started. */
+	if (state == RELAY_OFF)
+		return 0;
+	struct replica *replica = relay_replica(relay);
+	assert(replica_by_uuid(&replica->uuid) != NULL);
+	assert(replica->id != REPLICA_ID_NIL);
+	/* NOTE(review): assumes a malloc()-ed result; confirm in vclock.h. */
+	char *vclock_str = vclock_to_string(relay_vclock(relay));
+	if (vclock_str == NULL) {
+		diag_set(OutOfMemory, 0, "malloc", "vclock_str");
+		return -1;
+	}
+	/*
+	 * _cluster fields: 3 - timestamp, 4 - vclock, 5 - state,
+	 * 6 - error message, reset to nil unless the relay has
+	 * stopped with an error.
+	 */
+	struct error *e = state == RELAY_STOPPED ?
+			  diag_last_error(relay_get_diag(relay)) : NULL;
+	int rc;
+	if (e != NULL) {
+		rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
+			  "[%u][[%s%u%lf][%s%u%s][%s%u%s][%s%u%s]]",
+			  (unsigned) replica->id,
+			  "=", 3, relay_last_row_time(relay),
+			  "=", 4, vclock_str,
+			  "=", 5, relay_get_state_str(relay),
+			  "=", 6, e->errmsg);
+	} else {
+		rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
+			  "[%u][[%s%u%lf][%s%u%s][%s%u%s][%s%uNIL]]",
+			  (unsigned) replica->id,
+			  "=", 3, relay_last_row_time(relay),
+			  "=", 4, vclock_str,
+			  "=", 5, relay_get_state_str(relay),
+			  "=", 6);
+	}
+	free(vclock_str);
+	return rc;
+}
+
+/** Attach the _cluster-updating trigger to @a relay. */
+static inline void
+relay_add_on_state(struct relay *relay, struct trigger *trigger)
+{
+	trigger_create(trigger, relay_on_state_f, NULL, NULL);
+	trigger_add(relay_on_state(relay), trigger);
+}
+
 /**
  * A record with id of the new instance has been synced to the
  * write ahead log. Update the cluster configuration cache
@@ -4183,9 +4240,15 @@ register_replica(struct trigger *trigger, void * /* event */)
 	struct replica *replica = replica_by_uuid(&uuid);
 	if (replica != NULL) {
 		replica_set_id(replica, id);
+		if (id != instance_id)
+			relay_add_on_state(replica->relay,
+				&replica->on_relay_state);
 	} else {
 		try {
 			replica = replicaset_add(id, &uuid);
+			if (id != instance_id)
+				relay_add_on_state(replica->relay,
+					&replica->on_relay_state);
 			/* Can't throw exceptions from on_commit trigger */
 		} catch(Exception *e) {
 			panic("Can't register replica: %s", e->errmsg);
@@ -4205,6 +4268,11 @@ unregister_replica(struct trigger *trigger, void * /* event */)
 
 	struct replica *replica = replica_by_uuid(&old_uuid);
 	assert(replica != NULL);
+	/*
+	 * Detach the relay trigger that register_replica() attached.
+	 */
+	if (replica->id != instance_id)
+		trigger_clear(&replica->on_relay_state);
 	replica_clear_id(replica);
 	return 0;
 }
@@ -4240,8 +4308,13 @@ on_replace_dd_cluster(struct trigger *trigger, void *event)
 		uint32_t replica_id;
 		if (tuple_field_u32(new_tuple, BOX_CLUSTER_FIELD_ID, &replica_id) != 0)
 			return -1;
-		if (replica_check_id(replica_id) != 0)
+		/*
+		 * FIXME: only the format is checked so that relay status
+		 * updates of our own row from the master are accepted.
+		 * How to separate local changes from applier's changes?
+		 */
+		if (replica_check_id_format(replica_id) != 0)
 			return -1;
 		tt_uuid replica_uuid;
 		if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID,
 				    &replica_uuid) != 0)
diff --git a/src/box/box.cc b/src/box/box.cc
index 7c8b70b27..684b5f520 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1479,7 +1479,15 @@ box_session_push(const char *data, const char *data_end)
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
-	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
+	/* Fields 3-6 are later set by '=' updates on relay state change. */
+	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "["
+		 "%u%s" /* id, uuid */
+		 "NIL" /* host */
+		 "NIL" /* relay_timestamp */
+		 "NIL" /* relay_vclock */
+		 "NIL" /* relay_status */
+		 "NIL" /* relay_err */
+		 "]",
 		 (unsigned) id, tt_uuid_str(uuid)) != 0)
 		diag_raise();
 	assert(replica_by_uuid(uuid)->id == id);
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index 075cc236e..ff748ccb1 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -270,6 +270,14 @@ local function initial_1_7_5()
     format = {}
     format[1] = {name='id', type='unsigned'}
     format[2] = {name='uuid', type='string'}
+    -- The additional fields are nullable: they stay unset in
+    -- the row of the instance itself.
+    format[3] = {name='host', type='string', is_nullable = true}
+    format[4] = {name='relay_timestamp', type='double', is_nullable = true}
+    format[5] = {name='relay_vclock', type='string', is_nullable = true}
+    format[6] = {name='relay_status', type='string', is_nullable = true}
+    -- Set to the error message when the relay stops with an error.
+    format[7] = {name='relay_err', type='string', is_nullable = true}
     _space:insert{_cluster.id, ADMIN, '_cluster', 'memtx', 0, MAP, format}
     -- primary key: node id
     log.info("create index primary on _cluster")
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..7f71ad433 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -139,6 +139,8 @@ struct relay {
 	double last_row_time;
 	/** Relay sync state. */
 	enum relay_state state;
+	/** Triggers invoked on state change. */
+	struct rlist on_state;
 
 	struct {
 		/* Align to prevent false-sharing with tx thread */
@@ -148,6 +150,40 @@ struct relay {
 	} tx;
 };
 
+const char *
+relay_get_state_str(const struct relay *relay)
+{
+	switch (relay->state) {
+	case RELAY_OFF:
+		return "off";
+	case RELAY_FOLLOW:
+		return "follow";
+	case RELAY_STOPPED:
+		return "stopped";
+	}
+	return "<unknown relay state>";
+}
+
+static inline void
+relay_set_state(struct relay *relay, enum relay_state state)
+{
+	relay->state = state;
+	if (trigger_run(&relay->on_state, relay) != 0)
+		diag_log(); /* Best effort: relay_stop() must not throw. */
+}
+
+struct rlist *
+relay_on_state(struct relay *relay)
+{
+	return &relay->on_state;
+}
+
+struct replica *
+relay_replica(struct relay *relay)
+{
+	return relay->replica;
+}
+
 struct diag*
 relay_get_diag(struct relay *relay)
 {
@@ -193,7 +229,8 @@ relay_new(struct replica *replica)
 	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
-	relay->state = RELAY_OFF;
+	rlist_create(&relay->on_state);
+	relay_set_state(relay, RELAY_OFF);
 	return relay;
 }
 
@@ -210,7 +247,7 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	diag_clear(&relay->diag);
 	coio_create(&relay->io, fd);
 	relay->sync = sync;
-	relay->state = RELAY_FOLLOW;
-	relay->last_row_time = ev_monotonic_now(loop());
+	relay->last_row_time = ev_monotonic_now(loop());
+	relay_set_state(relay, RELAY_FOLLOW);
 }
 
@@ -257,7 +294,7 @@ relay_stop(struct relay *relay)
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
-	relay->state = RELAY_STOPPED;
+	relay_set_state(relay, RELAY_STOPPED);
 	/*
 	 * Needed to track whether relay thread is running or not
 	 * for relay_cancel(). Id is reset to a positive value
@@ -272,6 +309,7 @@ relay_delete(struct relay *relay)
 	if (relay->state == RELAY_FOLLOW)
 		relay_stop(relay);
 	fiber_cond_destroy(&relay->reader_cond);
+	trigger_destroy(&relay->on_state);
 	diag_destroy(&relay->diag);
 	TRASH(relay);
 	free(relay);
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..ea9567ad2 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -77,6 +77,10 @@ relay_get_diag(struct relay *relay);
 enum relay_state
 relay_get_state(const struct relay *relay);
 
+/** Return the name of the current relay state, e.g. "follow". */
+const char *
+relay_get_state_str(const struct relay *relay);
+
 /**
  * Returns relay's vclock
  * @param relay relay
@@ -127,4 +131,12 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
 		uint32_t replica_id_filter);
 
+/** Triggers run on every relay state change; event is the relay. */
+struct rlist *
+relay_on_state(struct relay *relay);
+
+/** Return the replica this relay was created for. */
+struct replica *
+relay_replica(struct relay *relay);
+
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 273a7cb66..538e6e1b4 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -114,7 +114,7 @@ replication_free(void)
 }
 
 int
-replica_check_id(uint32_t replica_id)
+replica_check_id_format(uint32_t replica_id)
 {
 	if (replica_id == REPLICA_ID_NIL) {
 		diag_set(ClientError, ER_REPLICA_ID_IS_RESERVED,
@@ -126,6 +126,14 @@ replica_check_id(uint32_t replica_id)
 			  (unsigned) replica_id);
 		return -1;
 	}
+	return 0;
+}
+
+int
+replica_check_id(uint32_t replica_id)
+{
+	if (replica_check_id_format(replica_id) != 0)
+		return -1;
 	/*
 	 * It's okay to update the instance id while it is joining to
 	 * a cluster as long as the id is set by the time bootstrap is
diff --git a/src/box/replication.h b/src/box/replication.h
index 93a25c8a7..fedfa3478 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -291,6 +291,10 @@ struct replica {
 	 * Trigger invoked when the applier changes its state.
 	 */
 	struct trigger on_applier_state;
+	/**
+	 * Trigger invoked when the relay changes its state.
+	 */
+	struct trigger on_relay_state;
 	/**
 	 * During initial connect or reconnect we require applier
 	 * to sync with the master before the replica can leave
@@ -372,6 +376,16 @@ replica_on_relay_stop(struct replica *replica);
 #if defined(__cplusplus)
 } /* extern "C" */
 
+/**
+ * Check @a replica_id itself, e.g. that it is not reserved.
+ */
+int
+replica_check_id_format(uint32_t replica_id);
+
+/**
+ * Do replica_check_id_format() checks and also refuse to change
+ * the local instance id unless the instance is joining.
+ */
 int
 replica_check_id(uint32_t replica_id);
 
-- 
2.24.1



More information about the Tarantool-patches mailing list