[Tarantool-patches] [DRAFT v2] replication: track information about replica
Sergey Kaplun
skaplun at tarantool.org
Thu Jul 2 23:21:41 MSK 2020
This is a draft for the patch.
The patch allows tracking changes of the relay state. On every change
of the relay state, the timestamp, the vclock, the new state (and the
error message, if any) are saved to the _cluster space.
The patch adds a trigger list to the relay that is invoked when the
relay changes its state. The trigger that updates the _cluster space is
set when a replica is registered.
---
This is a draft for the patch. Nevertheless I would like to hear as much
criticism as possible. Also it's important for me to hear @kostja's
opinion here.
Originally, the task was split into two parts [1]:
1) Pass extra info from downstreams (box.info.listen) to be saved on
master. Persist it on master.
2) Persist last status change with its timestamp and vclock on master
for each downstream.
This patch is the second part.
As we've discussed with Alexander Turenko, there are several ways to
implement the first part:
1) Use additional bytes inside the greeting to transport the information
to the master. The downside is that the greeting is about authorization,
not about additional information.
2) Add a new request type (e.g. IPROTO_INFO). An instance receiving
such a request should provide information about itself. So far, there is
only one such field. Is it reasonable to add new protocol fields for a
single feature?
Thoughts?
[1]: https://github.com/tarantool/tarantool/issues/3363#issuecomment-622382549
Branch: https://github.com/tarantool/tarantool/tree/skaplun/gh-3363-track-replica-status-change
Issue: https://github.com/tarantool/tarantool/issues/3363
src/box/alter.cc | 90 ++++++++++++++++++++++++++++++++++++++++-
src/box/box.cc | 10 ++++-
src/box/lua/upgrade.lua | 8 ++++
src/box/relay.cc | 45 +++++++++++++++++++--
src/box/relay.h | 12 ++++++
src/box/replication.cc | 25 +++++++++---
src/box/replication.h | 14 +++++++
7 files changed, 194 insertions(+), 10 deletions(-)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index bb4254878..edd180450 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -49,6 +49,8 @@
#include <stdio.h> /* snprintf() */
#include <ctype.h>
#include "replication.h" /* for replica_set_id() */
+#include "relay.h"
+#include "box/box.h"
#include "session.h" /* to fetch the current user. */
#include "vclock.h" /* VCLOCK_MAX */
#include "xrow.h"
@@ -4165,6 +4167,65 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
return 0;
}
+static int
+relay_on_state_change(struct trigger *trigger, void *event)
+{
+ struct relay *relay = (struct relay *)event;
+ (void)trigger;
+ if (relay_get_state(relay) == RELAY_OFF)
+ return 0;
+ struct replica *replica = relay_replica(relay);
+ const struct tt_uuid *uuid = &replica->uuid;
+ assert(replica_by_uuid(uuid) != NULL);
+ assert(replica->id != REPLICA_ID_NIL);
+ struct credentials *orig_credentials = effective_user();
+ fiber_set_user(fiber(), &admin_credentials);
+ int rc;
+ if ((rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID, "[%u]["
+ "[%s%u%lf]" /* last row time */
+ "[%s%u%s]" /* vclock */
+ "[%s%u%s]" /* relay state */
+ "]",
+ (unsigned) replica->id,
+ "=", 3, relay_last_row_time(relay),
+ "=", 4, vclock_to_string(relay_vclock(relay)),
+ "=", 5, relay_get_state_str(relay)
+ )) != 0) {
+ goto restore_cred;
+ }
+ switch (relay_get_state(relay)) {
+ case RELAY_STOPPED: {
+ struct error *e =
+ diag_last_error(relay_get_diag(relay));
+ if (e != NULL)
+ rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
+ "[%u][[%s%u%s]]",
+ (unsigned) replica->id,
+ "=", 6, e->errmsg);
+ break;
+ }
+ case RELAY_FOLLOW:
+ rc = boxk(IPROTO_UPDATE, BOX_CLUSTER_ID,
+ "[%u][[%s%uNIL]]",
+ (unsigned) replica->id, "=", 6);
+ break;
+ default:
+ unreachable();
+ }
+restore_cred:
+ fiber_set_user(fiber(), orig_credentials);
+ if (rc != 0)
+ return -1;
+ return 0;
+}
+
+static inline void
+relay_add_on_state(struct relay *relay, struct trigger *trigger)
+{
+ trigger_create(trigger, relay_on_state_change, NULL, NULL);
+ trigger_add(relay_on_state(relay), trigger);
+}
+
/**
* A record with id of the new instance has been synced to the
* write ahead log. Update the cluster configuration cache
@@ -4183,9 +4244,15 @@ register_replica(struct trigger *trigger, void * /* event */)
struct replica *replica = replica_by_uuid(&uuid);
if (replica != NULL) {
replica_set_id(replica, id);
+ if (id != instance_id)
+ relay_add_on_state(replica->relay,
+ &replica->on_relay_state);
} else {
try {
replica = replicaset_add(id, &uuid);
+ if (id != instance_id)
+ relay_add_on_state(replica->relay,
+ &replica->on_relay_state);
/* Can't throw exceptions from on_commit trigger */
} catch(Exception *e) {
panic("Can't register replica: %s", e->errmsg);
@@ -4205,6 +4272,11 @@ unregister_replica(struct trigger *trigger, void * /* event */)
struct replica *replica = replica_by_uuid(&old_uuid);
assert(replica != NULL);
+ uint32_t replica_id;
+ if (tuple_field_u32(old_tuple, BOX_CLUSTER_FIELD_ID, &replica_id) != 0)
+ return -1;
+ if (replica_id != instance_id)
+ trigger_clear(&replica->on_relay_state);
replica_clear_id(replica);
return 0;
}
@@ -4240,8 +4312,24 @@ on_replace_dd_cluster(struct trigger *trigger, void *event)
uint32_t replica_id;
if (tuple_field_u32(new_tuple, BOX_CLUSTER_FIELD_ID, &replica_id) != 0)
return -1;
- if (replica_check_id(replica_id) != 0)
+ /*
+ * A request that does not originate from this
+ * instance is produced by the applier to replicate
+ * additional information about our replica.
+ */
+ struct request request;
+ xrow_decode_dml_xc(stmt->row, &request,
+ dml_request_key_map(stmt->row->type));
+ uint32_t request_replica_id = request.header->replica_id;
+ if (request_replica_id != instance_id &&
+ request_replica_id != REPLICA_ID_NIL) {
+ if (replica_check_id_format(replica_id) != 0)
return -1;
+ } else {
+ if (replica_check_id(replica_id) != 0)
+ return -1;
+ }
+
tt_uuid replica_uuid;
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID,
&replica_uuid) != 0)
diff --git a/src/box/box.cc b/src/box/box.cc
index 7c8b70b27..684b5f520 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1479,7 +1479,15 @@ box_session_push(const char *data, const char *data_end)
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
- if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "[%u%s]",
+ if (boxk(IPROTO_INSERT, BOX_CLUSTER_ID, "["
+ "%u" /* replica id */
+ "%s" /* uuid */
+ "NIL" /* ip+port */
+ "NIL" /* timestamp */
+ "NIL" /* vclock */
+ "NIL" /* relay status */
+ "NIL" /* err str if exist*/
+ "]",
(unsigned) id, tt_uuid_str(uuid)) != 0)
diag_raise();
assert(replica_by_uuid(uuid)->id == id);
diff --git a/src/box/lua/upgrade.lua b/src/box/lua/upgrade.lua
index 075cc236e..ff748ccb1 100644
--- a/src/box/lua/upgrade.lua
+++ b/src/box/lua/upgrade.lua
@@ -270,6 +270,14 @@ local function initial_1_7_5()
format = {}
format[1] = {name='id', type='unsigned'}
format[2] = {name='uuid', type='string'}
+ -- additional fields have to be nullable because they are useless
+ -- for the instance itself
+ format[3] = {name='host', type='string', is_nullable = true}
+ format[4] = {name='relay_timestamp', type='double', is_nullable = true}
+ format[5] = {name='relay_vclock', type='string', is_nullable = true}
+ format[6] = {name='relay_status', type='string', is_nullable = true}
+ -- set only if the status is RELAY_STOPPED
+ format[7] = {name='relay_err', type='map', is_nullable = true}
_space:insert{_cluster.id, ADMIN, '_cluster', 'memtx', 0, MAP, format}
-- primary key: node id
log.info("create index primary on _cluster")
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 2ad02cb8a..720893509 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -139,6 +139,8 @@ struct relay {
double last_row_time;
/** Relay sync state. */
enum relay_state state;
+ /** Triggers invoked on state change. */
+ struct rlist on_state;
struct {
/* Align to prevent false-sharing with tx thread */
@@ -148,6 +150,41 @@ struct relay {
} tx;
};
+const char *
+relay_get_state_str(const struct relay *relay)
+{
+ switch(relay->state) {
+ case RELAY_OFF:
+ return "off";
+ case RELAY_FOLLOW:
+ return "follow";
+ case RELAY_STOPPED:
+ return "stopped";
+ default:
+ return "<unknown relay state>";
+ }
+}
+
+static inline void
+relay_set_state(struct relay *relay, enum relay_state state)
+{
+ relay->state = state;
+ /* FIXME Is say_error enough? */
+ trigger_run(&relay->on_state, relay);
+}
+
+struct rlist *
+relay_on_state(struct relay *relay)
+{
+ return &relay->on_state;
+}
+
+struct replica *
+relay_replica(struct relay *relay)
+{
+ return relay->replica;
+}
+
struct diag*
relay_get_diag(struct relay *relay)
{
@@ -193,7 +230,8 @@ relay_new(struct replica *replica)
fiber_cond_create(&relay->reader_cond);
diag_create(&relay->diag);
stailq_create(&relay->pending_gc);
- relay->state = RELAY_OFF;
+ rlist_create(&relay->on_state);
+ relay_set_state(relay, RELAY_OFF);
return relay;
}
@@ -210,7 +248,7 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
diag_clear(&relay->diag);
coio_create(&relay->io, fd);
relay->sync = sync;
- relay->state = RELAY_FOLLOW;
+ relay_set_state(relay, RELAY_FOLLOW);
relay->last_row_time = ev_monotonic_now(loop());
}
@@ -257,7 +295,7 @@ relay_stop(struct relay *relay)
if (relay->r != NULL)
recovery_delete(relay->r);
relay->r = NULL;
- relay->state = RELAY_STOPPED;
+ relay_set_state(relay, RELAY_STOPPED);
/*
* Needed to track whether relay thread is running or not
* for relay_cancel(). Id is reset to a positive value
@@ -272,6 +310,7 @@ relay_delete(struct relay *relay)
if (relay->state == RELAY_FOLLOW)
relay_stop(relay);
fiber_cond_destroy(&relay->reader_cond);
+ trigger_destroy(&relay->on_state);
diag_destroy(&relay->diag);
TRASH(relay);
free(relay);
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..ea9567ad2 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -77,6 +77,10 @@ relay_get_diag(struct relay *relay);
enum relay_state
relay_get_state(const struct relay *relay);
+/** Return the current state of relay as a string. */
+const char *
+relay_get_state_str(const struct relay *relay);
+
/**
* Returns relay's vclock
* @param relay relay
@@ -127,4 +131,12 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
struct vclock *replica_vclock, uint32_t replica_version_id,
uint32_t replica_id_filter);
+/** Get pointer to triggers list. */
+struct rlist *
+relay_on_state(struct relay *relay);
+
+/** Return replica associated with this relay. */
+struct replica *
+relay_replica(struct relay *relay);
+
#endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 273a7cb66..4688401e1 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -114,7 +114,7 @@ replication_free(void)
}
int
-replica_check_id(uint32_t replica_id)
+replica_check_id_format(uint32_t replica_id)
{
if (replica_id == REPLICA_ID_NIL) {
diag_set(ClientError, ER_REPLICA_ID_IS_RESERVED,
@@ -126,6 +126,14 @@ replica_check_id(uint32_t replica_id)
(unsigned) replica_id);
return -1;
}
+ return 0;
+}
+
+int
+replica_check_id(uint32_t replica_id)
+{
+ if (replica_check_id_format(replica_id) != 0)
+ return -1;
/*
* It's okay to update the instance id while it is joining to
* a cluster as long as the id is set by the time bootstrap is
@@ -900,13 +908,20 @@ replica_on_relay_stop(struct replica *replica)
* collector then. See also replica_clear_id.
*/
if (replica->id == REPLICA_ID_NIL) {
- if (!replica->anon) {
- gc_consumer_unregister(replica->gc);
- replica->gc = NULL;
- } else {
+ if (replica->anon) {
assert(replica->gc == NULL);
assert(replicaset.anon_count > 0);
replicaset.anon_count--;
+ } else if (replica->gc != NULL) {
+ /*
+ * Since the relay can now run triggers when
+ * its state changes, the running fiber may
+ * yield control to another fiber that
+ * unregisters the gc consumer, so replica->gc
+ * may already be NULL at this point.
+ */
+ gc_consumer_unregister(replica->gc);
+ replica->gc = NULL;
}
}
if (replica_is_orphan(replica)) {
diff --git a/src/box/replication.h b/src/box/replication.h
index 93a25c8a7..731b34029 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -291,6 +291,10 @@ struct replica {
* Trigger invoked when the applier changes its state.
*/
struct trigger on_applier_state;
+ /**
+ * Trigger invoked when the relay changes its state.
+ */
+ struct trigger on_relay_state;
/**
* During initial connect or reconnect we require applier
* to sync with the master before the replica can leave
@@ -372,6 +376,16 @@ replica_on_relay_stop(struct replica *replica);
#if defined(__cplusplus)
} /* extern "C" */
+/**
+ * Check that the replica id is well-formed.
+ */
+int
+replica_check_id_format(uint32_t replica_id);
+
+/**
+ * Check the id format and check that the id is not the same as
+ * the local instance id after joining the master.
+ */
int
replica_check_id(uint32_t replica_id);
--
2.24.1
More information about the Tarantool-patches
mailing list