Tarantool development patches archive
* [Tarantool-patches] [DRAFT v1] replication: track information about replica
@ 2020-06-29 20:31 Sergey Kaplun
  2020-07-02 15:53 ` Cyrill Gorcunov
  0 siblings, 1 reply; 3+ messages in thread
From: Sergey Kaplun @ 2020-06-29 20:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Vladislav Shpilevoy

This is a draft of the patch.
The patch makes it possible to track changes of the relay state. On
every relay state change, the timestamp, vclock, and new state (and the
error message, if any) are saved to the _cluster space.

The patch adds a trigger list to the relay, which is invoked when the
relay changes its state. The trigger that updates the _cluster space is
set when a replica is registered.
---

This is a draft of the patch. Nevertheless, I would like to hear as much
criticism as possible. It's also important for me to hear @kostja's
opinion here.

Originally the task was split into two parts [1]:

1) Pass extra info from downstreams (box.info.listen) to be saved on
master. Persist it on master.
2) Persist last status change with its timestamp and vclock on master
for each downstream.

This patch is the second part. The least clear point for me is how to
distinguish local change attempts from the applier's changes (see FIXME
below).

As we've discussed with Alexander Turenko, there are several ways to
implement the first part:

1) Use additional bytes inside the greeting to transport the information
to the master. The downside is that the greeting is about authorization,
not about additional information.

2) Add a new request type (e.g. IPROTO_INFO). An instance receiving
such a request should provide information about itself. So far, this
would be only one field. Is it reasonable to add new protocol fields for
one feature?

Thoughts?

[1]: https://github.com/tarantool/tarantool/issues/3363#issuecomment-622382549

Branch: https://github.com/tarantool/tarantool/tree/skaplun/gh-3363-track-replica-status-change
Issue: https://github.com/tarantool/tarantool/issues/3363
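
For readers unfamiliar with the mechanism, the trigger list described in
the commit message could look roughly like the sketch below. It is a
simplified, self-contained illustration and not the Tarantool
implementation: `struct state_trigger`, `relay_add_trigger()`,
`relay_change_state()` and `record_state()` are hypothetical names, and a
plain singly linked list stands in for the real `rlist`/`trigger` API.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the relay states named in the patch. */
enum relay_state { RELAY_OFF, RELAY_FOLLOW, RELAY_STOPPED };

struct relay;

/* Hypothetical callback type: returns 0 on success, -1 on error. */
typedef int (*state_trigger_cb)(struct relay *relay, void *data);

/* Hypothetical trigger node; the real code uses struct trigger. */
struct state_trigger {
	state_trigger_cb run;
	void *data;
	struct state_trigger *next;
};

/* Reduced relay: only the state and its trigger list. */
struct relay {
	enum relay_state state;
	struct state_trigger *on_state;
};

/* Register a trigger at the head of the relay's list. */
static void
relay_add_trigger(struct relay *relay, struct state_trigger *t)
{
	assert(relay != NULL && t != NULL);
	t->next = relay->on_state;
	relay->on_state = t;
}

/* Set the new state, then run every trigger; stop at the first failure. */
static int
relay_change_state(struct relay *relay, enum relay_state state)
{
	relay->state = state;
	for (struct state_trigger *t = relay->on_state; t != NULL; t = t->next) {
		if (t->run(relay, t->data) != 0)
			return -1;
	}
	return 0;
}

/* Example trigger: remembers the last state (stands in for the _cluster update). */
static int
record_state(struct relay *relay, void *data)
{
	*(enum relay_state *)data = relay->state;
	return 0;
}
```

In this sketch a caller would fill a `struct state_trigger` with
`record_state` and a pointer to its own `enum relay_state` variable,
register it with `relay_add_trigger()`, and then observe that variable
change on every `relay_change_state()` call.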

 src/box/alter.cc        | 75 ++++++++++++++++++++++++++++++++++++++++-
 src/box/box.cc          | 10 +++++-
 src/box/lua/upgrade.lua |  8 +++++
 src/box/relay.cc        | 44 ++++++++++++++++++++++--
 src/box/relay.h         | 12 +++++++
 src/box/replication.cc  | 10 +++++-
 src/box/replication.h   | 14 ++++++++
 7 files changed, 167 insertions(+), 6 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index bb4254878..ab348f27e 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -49,6 +49,8 @@
 #include <stdio.h> /* snprintf() */
 #include <ctype.h>
 #include "replication.h" /* for replica_set_id() */
+#include "relay.h"
+#include "box/box.h"
 #include "session.h" /* to fetch the current user. */
 #include "vclock.h" /* VCLOCK_MAX */
 #include "xrow.h"
@@ -4165,6 +4167,61 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 	return 0;
 }
 
+static int
+relay_on_state_f(struct trigger *trigger, void *event)
+{
+	struct relay *relay = (struct relay *)event;
+	(void)trigger;
+	if (relay_get_state(relay) != RELAY_OFF) {
+		struct replica *replica = relay_replica(relay);
+		const struct tt_uuid *uuid = &replica->uuid;
+		assert(replica_by_uuid(uuid) != NULL);
+		assert(replica->id != REPLICA_ID_NIL);
+		if (boxk(IPROTO_UPDATE, BOX_CLUSTER_ID, "[%u]["
+		         "[%s%u%lf]" /* last row time */
+		         "[%s%u%s]" /* vclock */
+		         "[%s%u%s]" /* relay state */
+		         "]",
+		         (unsigned) replica->id,
+		         "=", 3, relay_last_row_time(relay),
+		         "=", 4, vclock_to_string(relay_vclock(relay)),
+		         "=", 5, relay_get_state_str(relay)
+		        ) != 0) {
+			diag_raise();
+		}
+		int rc;
+		switch (relay_get_state(relay)) {
+		case RELAY_STOPPED: {
+			struct error *e =
+				diag_last_error(relay_get_diag(relay));
+			assert(e != NULL);
+			rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
+				 "[%u][[%s%u%s]]",
+				 (unsigned) replica->id,
+				 "=", 6, e->errmsg);
+			break;
+		}
+		case RELAY_FOLLOW:
+			rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
+				 "[%u][[%s%uNIL]]",
+				 (unsigned) replica->id, "=", 6);
+			break;
+		default:
+			unreachable();
+		}
+		if (rc != 0)
+			diag_raise();
+	}
+	return 0;
+}
+
+static inline void
+relay_add_on_state(struct relay *relay, struct trigger *trigger)
+{
+	trigger_create(trigger, relay_on_state_f, NULL, NULL);
+	trigger_add(relay_on_state(relay), trigger);
+}
+
 /**
  * A record with id of the new instance has been synced to the
  * write ahead log. Update the cluster configuration cache
@@ -4183,9 +4240,15 @@ register_replica(struct trigger *trigger, void * /* event */)
 	struct replica *replica = replica_by_uuid(&uuid);
 	if (replica != NULL) {
 		replica_set_id(replica, id);
+		if (id != instance_id)
+			relay_add_on_state(replica->relay,
+				&replica->on_relay_state);
 	} else {
 		try {
 			replica = replicaset_add(id, &uuid);
+			if (id != instance_id)
+				relay_add_on_state(replica->relay,
+					&replica->on_relay_state);
 			/* Can't throw exceptions from on_commit trigger */
 		} catch(Exception *e) {
 			panic("Can't register replica: %s", e->errmsg);
@@ -4205,6 +4268,11 @@ unregister_replica(struct trigger *trigger, void * /* event */)
 
 	struct replica *replica = replica_by_uuid(&old_uuid);
 	assert(replica != NULL);
+	uint32_t replica_id;
+	if (tuple_field_u32(old_tuple, BOX_CLUSTER_FIELD_ID, &replica_id) != 0)
+		return -1;
+	if (replica_id != instance_id)
+		trigger_clear(&replica->on_relay_state);
 	replica_clear_id(replica);
 	return 0;
 }
@@ -4240,8 +4308,13 @@ on_replace_dd_cluster(struct trigger *trigger, void *event)
 		uint32_t replica_id;
 		if (tuple_field_u32(new_tuple, BOX_CLUSTER_FIELD_ID, &replica_id) != 0)
 			return -1;
-		if (replica_check_id(replica_id) != 0)
+		/*
+		 * FIXME how to separate local changes
+		 * from applier's changes?
+		 */
+		if (replica_check_id_format(replica_id) != 0)
 			return -1;
+		
 		tt_uuid replica_uuid;
 		if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID,
 				    &replica_uuid) != 0)
diff --git a/src/box/box.cc b/src/box/box.cc
index 7c8b70b27..684b5f520 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1479,7 +1479,15 @@ box_session_push(const char *data, const char *data_end)
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
-	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
+	if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "["
+		 "%u" /* replica id */
+		 "%s" /* uuid */
+		 "NIL" /* ip+port */
+		 "NIL" /* timestamp */
+		 "NIL" /* vclock */
+		 "NIL" /* relay status */
+		 "NIL" /* err str if exist*/
+		 "]",
 		 (unsigned) id, tt_uuid_str(uuid)) != 0)
 		diag_raise();
 	assert(replica_by_uuid(uuid)->id == id);
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index 075cc236e..ff748ccb1 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -270,6 +270,14 @@ local function initial_1_7_5()
     format = {}
     format[1] = {name='id', type='unsigned'}
     format[2] = {name='uuid', type='string'}
+    -- additional fields have to be nullable because they are useless
+    -- for the instance itself
+    format[3] = {name='host', type='string', is_nullable = true}
+    format[4] = {name='relay_timestamp', type='double', is_nullable = true}
+    format[5] = {name='relay_vclock', type='string', is_nullable = true}
+    format[6] = {name='relay_status', type='string', is_nullable = true}
+    -- set only if status is RELAY_STOPPED
+    format[7] = {name='relay_err', type='map', is_nullable = true}
     _space:insert{_cluster.id, ADMIN, '_cluster', 'memtx', 0, MAP, format}
     -- primary key: node id
     log.info("create index primary on _cluster")
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..7f71ad433 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -139,6 +139,8 @@ struct relay {
 	double last_row_time;
 	/** Relay sync state. */
 	enum relay_state state;
+	/** Triggers invoked on state change. */
+	struct rlist on_state;
 
 	struct {
 		/* Align to prevent false-sharing with tx thread */
@@ -148,6 +150,40 @@ struct relay {
 	} tx;
 };
 
+const char *
+relay_get_state_str(const struct relay *relay)
+{
+	switch(relay->state) {
+	case RELAY_OFF:
+		return "off";
+	case RELAY_FOLLOW:
+		return "follow";
+	case RELAY_STOPPED:
+		return "stopped";
+	default:
+		return "<unknown relay state>";
+	}
+}
+
+static inline void
+relay_set_state(struct relay *relay, enum relay_state state)
+{
+	relay->state = state;
+	trigger_run_xc(&relay->on_state, relay);
+}
+
+struct rlist *
+relay_on_state(struct relay *relay)
+{
+	return &relay->on_state;
+}
+
+struct replica *
+relay_replica(struct relay *relay)
+{
+	return relay->replica;
+}
+
 struct diag*
 relay_get_diag(struct relay *relay)
 {
@@ -193,7 +229,8 @@ relay_new(struct replica *replica)
 	fiber_cond_create(&relay->reader_cond);
 	diag_create(&relay->diag);
 	stailq_create(&relay->pending_gc);
-	relay->state = RELAY_OFF;
+	rlist_create(&relay->on_state);
+	relay_set_state(relay, RELAY_OFF);
 	return relay;
 }
 
@@ -210,7 +247,7 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
 	diag_clear(&relay->diag);
 	coio_create(&relay->io, fd);
 	relay->sync = sync;
-	relay->state = RELAY_FOLLOW;
+	relay_set_state(relay, RELAY_FOLLOW);
 	relay->last_row_time = ev_monotonic_now(loop());
 }
 
@@ -257,7 +294,7 @@ relay_stop(struct relay *relay)
 	if (relay->r != NULL)
 		recovery_delete(relay->r);
 	relay->r = NULL;
-	relay->state = RELAY_STOPPED;
+	relay_set_state(relay, RELAY_STOPPED);
 	/*
 	 * Needed to track whether relay thread is running or not
 	 * for relay_cancel(). Id is reset to a positive value
@@ -272,6 +309,7 @@ relay_delete(struct relay *relay)
 	if (relay->state == RELAY_FOLLOW)
 		relay_stop(relay);
 	fiber_cond_destroy(&relay->reader_cond);
+	trigger_destroy(&relay->on_state);
 	diag_destroy(&relay->diag);
 	TRASH(relay);
 	free(relay);
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..ea9567ad2 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -77,6 +77,10 @@ relay_get_diag(struct relay *relay);
 enum relay_state
 relay_get_state(const struct relay *relay);
 
+/** Return the current state of relay as a string. */
+const char *
+relay_get_state_str(const struct relay *relay);
+
 /**
  * Returns relay's vclock
  * @param relay relay
@@ -127,4 +131,12 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 		struct vclock *replica_vclock, uint32_t replica_version_id,
 		uint32_t replica_id_filter);
 
+/** Get pointer to triggers list. */
+struct rlist *
+relay_on_state(struct relay *relay);
+
+/** Return replica associated with this relay. */
+struct replica *
+relay_replica(struct relay *relay);
+
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 273a7cb66..538e6e1b4 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -114,7 +114,7 @@ replication_free(void)
 }
 
 int
-replica_check_id(uint32_t replica_id)
+replica_check_id_format(uint32_t replica_id)
 {
 	if (replica_id == REPLICA_ID_NIL) {
 		diag_set(ClientError, ER_REPLICA_ID_IS_RESERVED,
@@ -126,6 +126,14 @@ replica_check_id(uint32_t replica_id)
 			  (unsigned) replica_id);
 		return -1;
 	}
+	return 0;
+}
+
+int
+replica_check_id(uint32_t replica_id)
+{
+	if (replica_check_id_format(replica_id) != 0)
+		return -1;
 	/*
 	 * It's okay to update the instance id while it is joining to
 	 * a cluster as long as the id is set by the time bootstrap is
diff --git a/src/box/replication.h b/src/box/replication.h
index 93a25c8a7..fedfa3478 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -291,6 +291,10 @@ struct replica {
 	 * Trigger invoked when the applier changes its state.
 	 */
 	struct trigger on_applier_state;
+	/**
+	 * Trigger invoked when the relay changes its state.
+	 */
+	struct trigger on_relay_state;
 	/**
 	 * During initial connect or reconnect we require applier
 	 * to sync with the master before the replica can leave
@@ -372,6 +376,16 @@ replica_on_relay_stop(struct replica *replica);
 #if defined(__cplusplus)
 } /* extern "C" */
 
+/**
+ * Check format id.
+ */
+int
+replica_check_id_format(uint32_t replica_id);
+
+/**
+ * Check format id and check replica is not RO at joining
+ * to master.
+ */
 int
 replica_check_id(uint32_t replica_id);
 
-- 
2.24.1


* Re: [Tarantool-patches] [DRAFT v1] replication: track information about replica
  2020-06-29 20:31 [Tarantool-patches] [DRAFT v1] replication: track information about replica Sergey Kaplun
@ 2020-07-02 15:53 ` Cyrill Gorcunov
  2020-07-02 19:40   ` Sergey Kaplun
  0 siblings, 1 reply; 3+ messages in thread
From: Cyrill Gorcunov @ 2020-07-02 15:53 UTC (permalink / raw)
  To: Sergey Kaplun; +Cc: tarantool-patches, Vladislav Shpilevoy

On Mon, Jun 29, 2020 at 11:31:34PM +0300, Sergey Kaplun wrote:
...
>  
> +static int
> +relay_on_state_f(struct trigger *trigger, void *event)
> +{

Sergey, I'll review the patch more carefully tomorrow; meanwhile,
here are some early notes.

 - please don't name the trigger function with an _f postfix; we usually
   do this for cord thread runners (I know there are a few exceptions,
   but they are simply misnamed)

> +	struct relay *relay = (struct relay *)event;
> +	(void)trigger;

You can easily exit early

	if (relay_get_state(relay) == RELAY_OFF)
		return 0;

shifting the rest of the code left. Still, it's up to you; I do not insist.

> +	if (relay_get_state(relay) != RELAY_OFF) {
> +		struct replica *replica = relay_replica(relay);
> +		const struct tt_uuid *uuid = &replica->uuid;
> +		assert(replica_by_uuid(uuid) != NULL);
> +		assert(replica->id != REPLICA_ID_NIL);
> +		if (boxk(IPROTO_UPDATE, BOX_CLUSTER_ID, "[%u]["
> +		         "[%s%u%lf]" /* last row time */
> +		         "[%s%u%s]" /* vclock */
> +		         "[%s%u%s]" /* relay state */
> +		         "]",
> +		         (unsigned) replica->id,
> +		         "=", 3, relay_last_row_time(relay),
> +		         "=", 4, vclock_to_string(relay_vclock(relay)),
> +		         "=", 5, relay_get_state_str(relay)
> +		        ) != 0) {
> +			diag_raise();
> +		}

Do we really need to call diag_raise() here?

> +		int rc;
> +		switch (relay_get_state(relay)) {
...
> +		}
> +		if (rc != 0)
> +			diag_raise();

And here. IIRC triggers usually just return -1 on error.
The rest of the patch is trimmed for now.
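
For illustration only, a minimal self-contained sketch of the two
suggestions above (exit early on RELAY_OFF, and return -1 instead of
raising). Everything in it is a stand-in: the reduced `struct relay`, the
`fail_update` knob, `update_cluster_row()` and `on_relay_state_change()`
are hypothetical names that only mimic the patch, and the `fprintf()`
call merely takes the place of whatever logging the real code would use.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* Stand-in for the relay states named in the patch. */
enum relay_state { RELAY_OFF, RELAY_FOLLOW, RELAY_STOPPED };

/* Hypothetical, reduced relay: only what the sketch needs. */
struct relay {
	enum relay_state state;
	int fail_update; /* knob for the sketch: make the fake update fail */
};

/* Hypothetical replacement for the boxk() update of _cluster. */
static int
update_cluster_row(const struct relay *relay)
{
	return relay->fail_update ? -1 : 0;
}

/* Trigger body: early return on RELAY_OFF, errors reported via return code. */
static int
on_relay_state_change(struct relay *relay)
{
	assert(relay != NULL);
	if (relay->state == RELAY_OFF)
		return 0;
	if (update_cluster_row(relay) != 0) {
		/* Log and propagate the failure instead of raising. */
		fprintf(stderr, "failed to persist relay state\n");
		return -1;
	}
	return 0;
}
```

With this shape the caller decides what to do with a failed update, and
the body of the function is no longer nested inside the state check.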


* Re: [Tarantool-patches] [DRAFT v1] replication: track information about replica
  2020-07-02 15:53 ` Cyrill Gorcunov
@ 2020-07-02 19:40   ` Sergey Kaplun
  0 siblings, 0 replies; 3+ messages in thread
From: Sergey Kaplun @ 2020-07-02 19:40 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tarantool-patches, Vladislav Shpilevoy

Hi! Thanks for the review!

On 02.07.20, Cyrill Gorcunov wrote:
> On Mon, Jun 29, 2020 at 11:31:34PM +0300, Sergey Kaplun wrote:
> ...
> >  
> > +static int
> > +relay_on_state_f(struct trigger *trigger, void *event)
> > +{
> 
> Sergey, I'll review the patch more carefully tomorrow; meanwhile,
> here are some early notes.
> 
>  - please don't name the trigger function with an _f postfix; we usually
>    do this for cord thread runners (I know there are a few exceptions,
>    but they are simply misnamed)
> 

Yup, thanks!

> > +	struct relay *relay = (struct relay *)event;
> > +	(void)trigger;
> 
> You can easily exit early
> 
> 	if (relay_get_state(relay) == RELAY_OFF)
> 		return 0;
> 
> shifting the rest of the code left. Still, it's up to you; I do not insist.
> 

Nice point! Early return looks much better!

> 
> Do we really need to call diag_raise() here?
> 
> > +		int rc;
> > +		switch (relay_get_state(relay)) {
> ...
> > +		}
> > +		if (rc != 0)
> > +			diag_raise();
> 
> And here. IIRC triggers usually just return -1 on error.
> The rest of the patch is trimmed for now.

I think we should not raise an error inside the relay. Is say_error
enough if something goes wrong?

I will send the new version in a separate email.

-- 
Best regards,
Sergey Kaplun

