Tarantool development patches archive
* [Tarantool-patches] [PATCH v2 00/10] dRaft
@ 2020-09-03 22:51 Vladislav Shpilevoy
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier Vladislav Shpilevoy
                   ` (9 more replies)
  0 siblings, 10 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

Raft leader election implementation. This version of the patchset passes the
existing tests, since Raft is disabled by default. New tests have not been added yet.

Changes in v2:
- Fixed a bug where bootstrap couldn't finish if the first leader restarted
  before a quorum of replicas had finished bootstrap too;
- Added box.info.raft;
- A lot of new comments, fixed typos, and squashes between the commits.

Branch: http://github.com/tarantool/tarantool/tree/gh-1146-raft
Issue: https://github.com/tarantool/tarantool/issues/1146

Vladislav Shpilevoy (9):
  applier: store instance_id in struct applier
  box: introduce summary RO flag
  wal: don't touch box.cfg.wal_dir more than once
  replication: track registered replica count
  raft: introduce persistent raft state
  raft: introduce box.cfg.raft_* options
  [tosquash] raft: pass source instance_id to raft_process_msg()
  raft: introduce state machine
  raft: introduce box.info.raft

sergepetrenko (1):
  raft: relay status updates to followers

 src/box/CMakeLists.txt          |   1 +
 src/box/applier.cc              |  66 ++-
 src/box/applier.h               |   2 +
 src/box/box.cc                  | 176 ++++++-
 src/box/box.h                   |   9 +
 src/box/iproto_constants.h      |  15 +
 src/box/lua/cfg.cc              |  27 ++
 src/box/lua/info.c              |  17 +
 src/box/lua/load_cfg.lua        |  15 +
 src/box/lua/misc.cc             |   1 +
 src/box/memtx_engine.c          |  36 ++
 src/box/raft.c                  | 830 ++++++++++++++++++++++++++++++++
 src/box/raft.h                  | 253 ++++++++++
 src/box/relay.cc                |  88 +++-
 src/box/relay.h                 |   7 +
 src/box/replication.cc          |   3 +
 src/box/replication.h           |   7 +
 src/box/wal.c                   |   6 +
 src/box/wal.h                   |   7 +
 src/box/xrow.c                  | 113 +++++
 src/box/xrow.h                  |  15 +
 test/app-tap/init_script.result |   3 +
 test/box/admin.result           |   6 +
 test/box/cfg.result             |  12 +
 test/box/info.result            |   1 +
 25 files changed, 1682 insertions(+), 34 deletions(-)
 create mode 100644 src/box/raft.c
 create mode 100644 src/box/raft.h

-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  8:13   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft Vladislav Shpilevoy
                   ` (8 subsequent siblings)
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

The applier is going to need the numeric ID of its remote
instance in order to tell the future Raft module who the sender
of a Raft message is. An alternative would be to add a sender ID
to each Raft message, but this looks like a crutch. Moreover, the
applier still needs to know that numeric ID in order to notify
Raft about heartbeats from the peer node.

Needed for #1146
---
 src/box/applier.cc | 19 +++++++++++++++++++
 src/box/applier.h  |  2 ++
 2 files changed, 21 insertions(+)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index c1d07ca54..699b5a683 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -67,6 +67,23 @@ applier_set_state(struct applier *applier, enum applier_state state)
 	trigger_run_xc(&applier->on_state, applier);
 }
 
+static inline void
+applier_assign_instance_id(struct applier *applier)
+{
+	/*
+	 * After final join, the applier already received latest
+	 * records from _cluster, including the record about
+	 * source instance. It can be absent in case the source is
+	 * an anonymous replica.
+	 */
+	assert(applier->state == APPLIER_JOINED);
+	struct replica *replica = replica_by_uuid(&applier->uuid);
+	if (replica != NULL)
+		applier->instance_id = replica->id;
+	else
+		assert(applier->instance_id == 0);
+}
+
 /**
  * Write a nice error message to log file on SocketError or ClientError
  * in applier_f().
@@ -603,6 +620,7 @@ applier_join(struct applier *applier)
 	say_info("final data received");
 
 	applier_set_state(applier, APPLIER_JOINED);
+	applier_assign_instance_id(applier);
 	applier_set_state(applier, APPLIER_READY);
 }
 
@@ -1207,6 +1225,7 @@ applier_subscribe(struct applier *applier)
 		    instance_id != REPLICA_ID_NIL) {
 			say_info("final data received");
 			applier_set_state(applier, APPLIER_JOINED);
+			applier_assign_instance_id(applier);
 			applier_set_state(applier, APPLIER_READY);
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
diff --git a/src/box/applier.h b/src/box/applier.h
index 6e979a806..15ca1fcfd 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -95,6 +95,8 @@ struct applier {
 	ev_tstamp lag;
 	/** The last box_error_code() logged to avoid log flooding */
 	uint32_t last_logged_errcode;
+	/** Remote instance ID. */
+	uint32_t instance_id;
 	/** Remote instance UUID */
 	struct tt_uuid uuid;
 	/** Remote URI (string) */
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04 11:38   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag Vladislav Shpilevoy
                   ` (7 subsequent siblings)
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

box.info.raft returns a table of the form:

    {
        state: <string>,
        term: <number>,
        vote: <instance ID>,
        leader: <instance ID>
    }

The fields correspond one to one to the Raft concepts of the same
names. This info dump is meant first of all to help with the
tests, and also with investigating problems in a real cluster.

Part of #1146
---
 src/box/lua/info.c   | 17 +++++++++++++++++
 test/box/info.result |  1 +
 2 files changed, 18 insertions(+)

diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 1c131caec..8e1dbd497 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -49,6 +49,7 @@
 #include "main.h"
 #include "version.h"
 #include "box/box.h"
+#include "box/raft.h"
 #include "lua/utils.h"
 #include "fiber.h"
 #include "tt_static.h"
@@ -577,6 +578,21 @@ lbox_info_listen(struct lua_State *L)
 	return 1;
 }
 
+static int
+lbox_info_raft(struct lua_State *L)
+{
+	lua_createtable(L, 0, 4);
+	lua_pushstring(L, raft_state_strs[raft.state]);
+	lua_setfield(L, -2, "state");
+	luaL_pushuint64(L, raft.volatile_term);
+	lua_setfield(L, -2, "term");
+	lua_pushinteger(L, raft.volatile_vote);
+	lua_setfield(L, -2, "vote");
+	lua_pushinteger(L, raft.leader);
+	lua_setfield(L, -2, "leader");
+	return 1;
+}
+
 static const struct luaL_Reg lbox_info_dynamic_meta[] = {
 	{"id", lbox_info_id},
 	{"uuid", lbox_info_uuid},
@@ -595,6 +611,7 @@ static const struct luaL_Reg lbox_info_dynamic_meta[] = {
 	{"vinyl", lbox_info_vinyl},
 	{"sql", lbox_info_sql},
 	{"listen", lbox_info_listen},
+	{"raft", lbox_info_raft},
 	{NULL, NULL}
 };
 
diff --git a/test/box/info.result b/test/box/info.result
index 40eeae069..d0abb634a 100644
--- a/test/box/info.result
+++ b/test/box/info.result
@@ -82,6 +82,7 @@ t
   - memory
   - package
   - pid
+  - raft
   - replication
   - replication_anon
   - ro
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier Vladislav Shpilevoy
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  8:17   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy
                   ` (6 subsequent siblings)
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

An instance is writable if box.cfg.read_only is false and it is
not an orphan. An update of the final read-only state of the
instance needs to fire the read-only update triggers and notify
the engines. These 2 flags were easy and cheap to check on each
operation, and the triggers were easy to use, since both flags
are stored and updated inside box.cc.

That is going to change when Raft is introduced. Raft will add 2
more checks:

  - A flag if Raft is enabled on the node. If it is not, then Raft
    state won't affect whether the instance is writable;

  - When Raft is enabled, it will allow writes on a leader only.

It means a check for being read-only would look like this:

    is_ro || is_orphan || (raft_is_enabled() && !raft_is_leader())

This is significantly slower. Besides, Raft somehow needs to
access the read-only triggers and engine API - this looks wrong.

The patch introduces a new flag, is_ro_summary, which combines
all the read-only conditions into one flag. When a subsystem may
have changed the read-only state of the instance, it needs to
call box_update_ro_summary(), and the function takes care of
updating the summary flag, running the triggers, and notifying
the engines.

Raft will use this function when its state or config changes.

Needed for #1146
---
 src/box/box.cc | 44 +++++++++++++++++++++++++++-----------------
 src/box/box.h  |  6 ++++++
 2 files changed, 33 insertions(+), 17 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index faffd5769..0813603c0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -129,6 +129,14 @@ static bool is_local_recovery = false;
  */
 static bool is_orphan;
 
+/**
+ * Summary flag incorporating all the instance attributes,
+ * affecting ability to write. Currently these are:
+ * - is_ro;
+ * - is_orphan;
+ */
+static bool is_ro_summary = true;
+
 /**
  * The pool of fibers in the transaction processor thread
  * working on incoming messages from net, wal and other
@@ -144,11 +152,24 @@ static struct fiber_pool tx_fiber_pool;
  */
 static struct cbus_endpoint tx_prio_endpoint;
 
+void
+box_update_ro_summary(void)
+{
+	bool old_is_ro_summary = is_ro_summary;
+	is_ro_summary = is_ro || is_orphan;
+	/* In 99% nothing changes. Filter this out first. */
+	if (is_ro_summary == old_is_ro_summary)
+		return;
+
+	if (is_ro_summary)
+		engine_switch_to_ro();
+	fiber_cond_broadcast(&ro_cond);
+}
+
 static int
 box_check_writable(void)
 {
-	/* box is only writable if box.cfg.read_only == false and */
-	if (is_ro || is_orphan) {
+	if (is_ro_summary) {
 		diag_set(ClientError, ER_READONLY);
 		diag_log();
 		return -1;
@@ -253,20 +274,14 @@ box_check_ro(void);
 void
 box_set_ro(void)
 {
-	bool ro = box_check_ro();
-	if (ro == is_ro)
-		return; /* nothing to do */
-	if (ro)
-		engine_switch_to_ro();
-
-	is_ro = ro;
-	fiber_cond_broadcast(&ro_cond);
+	is_ro = box_check_ro();
+	box_update_ro_summary();
 }
 
 bool
 box_is_ro(void)
 {
-	return is_ro || is_orphan;
+	return is_ro_summary;
 }
 
 bool
@@ -293,13 +308,8 @@ box_wait_ro(bool ro, double timeout)
 void
 box_do_set_orphan(bool orphan)
 {
-	if (is_orphan == orphan)
-		return; /* nothing to do */
-	if (orphan)
-		engine_switch_to_ro();
-
 	is_orphan = orphan;
-	fiber_cond_broadcast(&ro_cond);
+	box_update_ro_summary();
 }
 
 void
diff --git a/src/box/box.h b/src/box/box.h
index f9bd8b98d..5988264a5 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -137,6 +137,12 @@ box_set_orphan(bool orphan);
 void
 box_do_set_orphan(bool orphan);
 
+/**
+ * Update the final RO flag based on the instance flags and state.
+ */
+void
+box_update_ro_summary(void);
+
 /**
  * Iterate over all spaces and save them to the
  * snapshot file.
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (2 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  8:20   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count Vladislav Shpilevoy
                   ` (5 subsequent siblings)
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

relay.cc and box.cc obtained the box.cfg.wal_dir value using a
cfg_gets() call in order to initialize the WAL and create struct
recovery objects.

That is not only a bit dangerous (cfg_gets() uses Lua API and can
throw a Lua error) and slow, but also not necessary - wal_dir
parameter is constant, it can't be changed after instance start.

It means, the value can be stored somewhere one time and then used
without Lua.

The main motivation is that the WAL directory path will be needed
inside relay threads to restart their recovery iterators in the
Raft patch. They can't use cfg_gets(), because Lua lives in the TX
thread, but they can access a constant global variable through
the getter introduced in this patch (the variable existed before,
but now has a method to get it).

Needed for #1146
---
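
Reviewer note, not part of the commit: a minimal standalone sketch
of the "store once, read without Lua" pattern the message relies
on. The names stored_wal_dir, wal_dir_store() and wal_dir_get() are
hypothetical; in the patch the value lives in wal_writer_singleton
and is exposed by wal_dir(). The sketch assumes the store happens
once, before any reader thread is started, which is what makes the
plain read safe.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical holder for a setting that is fixed after startup. */
static char stored_wal_dir[4096];
static int wal_dir_is_set = 0;

/*
 * Called once during configuration, before reader threads exist.
 * Returns 0 on success, -1 if the path is too long or was already
 * stored.
 */
static int
wal_dir_store(const char *path)
{
	size_t len = strlen(path);
	if (wal_dir_is_set || len >= sizeof(stored_wal_dir))
		return -1;
	memcpy(stored_wal_dir, path, len + 1);
	wal_dir_is_set = 1;
	return 0;
}

/* Plain read of the stored value. No config lookup is involved. */
static const char *
wal_dir_get(void)
{
	assert(wal_dir_is_set);
	return stored_wal_dir;
}
```
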
 src/box/box.cc   | 9 ++++-----
 src/box/relay.cc | 7 ++-----
 src/box/wal.c    | 6 ++++++
 src/box/wal.h    | 7 +++++++
 4 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 0813603c0..eeb00d5e2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2395,8 +2395,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	wal_stream_create(&wal_stream);
 
 	struct recovery *recovery;
-	recovery = recovery_new(cfg_gets("wal_dir"),
-				cfg_geti("force_recovery"),
+	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
 				checkpoint_vclock);
 
 	/*
@@ -2469,7 +2468,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
 				      cfg_getd("wal_dir_rescan_delay"));
 		while (true) {
-			if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock))
+			if (path_lock(wal_dir(), &wal_dir_lock))
 				diag_raise();
 			if (wal_dir_lock >= 0)
 				break;
@@ -2616,7 +2615,7 @@ box_cfg_xc(void)
 	 * Lock the write ahead log directory to avoid multiple
 	 * instances running in the same dir.
 	 */
-	if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock) < 0)
+	if (path_lock(wal_dir(), &wal_dir_lock) < 0)
 		diag_raise();
 	if (wal_dir_lock < 0) {
 		/**
@@ -2625,7 +2624,7 @@ box_cfg_xc(void)
 		 * WAL dir must contain at least one xlog.
 		 */
 		if (!cfg_geti("hot_standby") || checkpoint == NULL)
-			tnt_raise(ClientError, ER_ALREADY_RUNNING, cfg_gets("wal_dir"));
+			tnt_raise(ClientError, ER_ALREADY_RUNNING, wal_dir());
 	}
 
 	struct journal bootstrap_journal;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a7843a8c2..124b0f52f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -34,7 +34,6 @@
 #include "tt_static.h"
 #include "scoped_guard.h"
 #include "cbus.h"
-#include "cfg.h"
 #include "errinj.h"
 #include "fiber.h"
 #include "say.h"
@@ -369,8 +368,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
 		relay_delete(relay);
 	});
 
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			       start_vclock);
+	relay->r = recovery_new(wal_dir(), false, start_vclock);
 	vclock_copy(&relay->stop_vclock, stop_vclock);
 
 	int rc = cord_costart(&relay->cord, "final_join",
@@ -731,8 +729,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	});
 
 	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
-	relay->r = recovery_new(cfg_gets("wal_dir"), false,
-			        replica_clock);
+	relay->r = recovery_new(wal_dir(), false, replica_clock);
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 045006b60..e181e58d9 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -201,6 +201,12 @@ wal_mode(void)
 	return wal_writer_singleton.wal_mode;
 }
 
+const char *
+wal_dir(void)
+{
+	return wal_writer_singleton.wal_dir.dirname;
+}
+
 static void
 wal_write_to_disk(struct cmsg *msg);
 
diff --git a/src/box/wal.h b/src/box/wal.h
index 9d0cada46..581306fe9 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -98,6 +98,13 @@ wal_enable(void);
 void
 wal_free(void);
 
+/**
+ * Get WAL directory path. The value never changes after box is
+ * configured first time. Safe to use from multiple threads.
+ */
+const char *
+wal_dir(void);
+
 struct wal_watcher_msg {
 	struct cmsg cmsg;
 	struct wal_watcher *watcher;
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (3 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  8:24   ` Serge Petrenko
  2020-09-07 15:45   ` Sergey Ostanevich
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state Vladislav Shpilevoy
                   ` (4 subsequent siblings)
  9 siblings, 2 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

Struct replicaset didn't store the number of registered replicas,
only an array, which had to be scanned in full each time the
count was needed.

The count is going to be needed in Raft to calculate the election
quorum. The patch tracks the count so that it can be found in
constant time by simply reading an integer.

Needed for #1146
---
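
Reviewer note, not part of the commit: a minimal standalone sketch
of keeping a counter next to an ID-indexed array, and of how a
quorum could be derived from it. The names struct registry,
registry_set(), registry_clear() and registry_quorum() are
hypothetical, and the simple-majority formula is an assumption:
this patch only adds the counter and does not define the quorum.

```c
#include <assert.h>
#include <stddef.h>

enum { REGISTRY_MAX = 32 }; /* hypothetical capacity */

struct registry {
	/* Registered entries indexed by ID, NULL if the ID is free. */
	const void *by_id[REGISTRY_MAX];
	/* Number of non-NULL entries, kept in sync on each change. */
	int size;
};

static void
registry_set(struct registry *r, int id, const void *replica)
{
	assert(id >= 0 && id < REGISTRY_MAX);
	assert(replica != NULL && r->by_id[id] == NULL);
	r->by_id[id] = replica;
	++r->size;
}

static void
registry_clear(struct registry *r, int id)
{
	assert(id >= 0 && id < REGISTRY_MAX);
	assert(r->by_id[id] != NULL && r->size > 0);
	r->by_id[id] = NULL;
	--r->size;
}

/* Simple majority of the registered entries. Assumed formula. */
static int
registry_quorum(const struct registry *r)
{
	return r->size / 2 + 1;
}
```

With three registered entries the sketch gives a quorum of 2, and
it stays 2 after one entry is removed, without scanning the array
in either case.
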
 src/box/replication.cc | 3 +++
 src/box/replication.h  | 7 +++++++
 2 files changed, 10 insertions(+)

diff --git a/src/box/replication.cc b/src/box/replication.cc
index ef0e2411d..20f16206a 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
 						   tt_uuid_str(&replica->uuid));
 	}
 	replicaset.replica_by_id[replica_id] = replica;
+	++replicaset.size;
 
 	say_info("assigned id %d to replica %s",
 		 replica->id, tt_uuid_str(&replica->uuid));
@@ -267,6 +268,8 @@ replica_clear_id(struct replica *replica)
 	 * replication.
 	 */
 	replicaset.replica_by_id[replica->id] = NULL;
+	assert(replicaset.size > 0);
+	--replicaset.size;
 	if (replica->id == instance_id) {
 		/* See replica_check_id(). */
 		assert(replicaset.is_joining);
diff --git a/src/box/replication.h b/src/box/replication.h
index ddc2bddf4..69cc820c9 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -217,6 +217,13 @@ struct replicaset {
 	bool is_joining;
 	/* A number of anonymous replicas following this instance. */
 	int anon_count;
+	/**
+	 * Number of registered replicas. That includes all of them - connected,
+	 * disconnected, connected not directly, just present in _cluster. If an
+	 * instance has an ID, has the same replicaset UUID, then it is
+	 * accounted here.
+	 */
+	int size;
 	/** Applier state. */
 	struct {
 		/**
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (4 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  8:59   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy
                   ` (3 subsequent siblings)
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

box.internal.raft_*() helper functions were introduced to test
the persistence. Any state change is saved to the WAL and to the
snapshot.

Part of #1146
---
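
Reviewer note, not part of the commit: a minimal standalone sketch
of how persisted records could be replayed into the in-memory
state on recovery. It mirrors the rule used by
raft_process_recovery() in the diff below, where a zero field is
treated as "not present" and leaves the current value untouched.
The names struct state, struct record and state_replay() are
hypothetical, and real rows go through xrow_decode_raft() first.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical in-memory state: the two persisted fields. */
struct state {
	uint64_t term;
	uint32_t vote;
};

/* Hypothetical decoded record. A zero field means "absent". */
struct record {
	uint64_t term;
	uint32_t vote;
};

/* Apply records in log order. Later non-zero values win. */
static void
state_replay(struct state *s, const struct record *recs, size_t count)
{
	assert(s != NULL && (recs != NULL || count == 0));
	for (size_t i = 0; i < count; ++i) {
		if (recs[i].term != 0)
			s->term = recs[i].term;
		if (recs[i].vote != 0)
			s->vote = recs[i].vote;
	}
}
```
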
 src/box/CMakeLists.txt     |  1 +
 src/box/box.cc             |  8 +++++
 src/box/iproto_constants.h | 13 ++++++++
 src/box/lua/misc.cc        |  1 +
 src/box/memtx_engine.c     | 35 ++++++++++++++++++++
 src/box/raft.c             | 65 ++++++++++++++++++++++++++++++++++++
 src/box/raft.h             | 67 ++++++++++++++++++++++++++++++++++++++
 src/box/xrow.c             | 56 +++++++++++++++++++++++++++++++
 src/box/xrow.h             | 12 +++++++
 9 files changed, 258 insertions(+)
 create mode 100644 src/box/raft.c
 create mode 100644 src/box/raft.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index b8b2689d2..29c3bfe79 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -170,6 +170,7 @@ add_library(box STATIC
     port.c
     txn.c
     txn_limbo.c
+    raft.c
     box.cc
     gc.c
     checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index eeb00d5e2..281917af2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
 #include "sequence.h"
 #include "sql_stmt_cache.h"
 #include "msgpack.h"
+#include "raft.h"
 #include "trivia/util.h"
 
 static char status[64] = "unknown";
@@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 			diag_raise();
 		return;
 	}
+	if (iproto_type_is_raft_request(row->type)) {
+		struct raft_request raft_req;
+		if (xrow_decode_raft(row, &raft_req) != 0)
+			diag_raise();
+		raft_process_recovery(&raft_req);
+		return;
+	}
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 4f5a2b195..8a11626b3 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,8 @@ enum iproto_type {
 	/** The maximum typecode used for box.stat() */
 	IPROTO_TYPE_STAT_MAX,
 
+	IPROTO_RAFT = 30,
+
 	/** A confirmation message for synchronous transactions. */
 	IPROTO_CONFIRM = 40,
 	/** A rollback message for synchronous transactions. */
@@ -258,6 +260,11 @@ enum iproto_type {
 /** IPROTO type name by code */
 extern const char *iproto_type_strs[];
 
+enum iproto_raft_keys {
+	IPROTO_RAFT_TERM = 0,
+	IPROTO_RAFT_VOTE = 1,
+};
+
 /**
  * Returns IPROTO type name by @a type code.
  * @param type IPROTO type.
@@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
 	return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
 }
 
+static inline bool
+iproto_type_is_raft_request(uint32_t type)
+{
+	return type == IPROTO_RAFT;
+}
+
 /** This is an error. */
 static inline bool
 iproto_type_is_error(uint32_t type)
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 5da84b35a..e356f2d4b 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,6 +40,7 @@
 #include "box/tuple.h"
 #include "box/tuple_format.h"
 #include "box/lua/tuple.h"
+#include "box/xrow.h"
 #include "mpstream/mpstream.h"
 
 static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index dfd6fce6e..7ebed7aa8 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
 #include "replication.h"
 #include "schema.h"
 #include "gc.h"
+#include "raft.h"
 
 /* sync snapshot every 16MB */
 #define SNAP_SYNC_INTERVAL	(1 << 24)
@@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
 	return 0;
 }
 
+static int
+memtx_engine_recover_raft(const struct xrow_header *row)
+{
+	assert(row->type == IPROTO_RAFT);
+	struct raft_request req;
+	if (xrow_decode_raft(row, &req) != 0)
+		return -1;
+	raft_process_recovery(&req);
+	return 0;
+}
+
 static int
 memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 				  struct xrow_header *row)
 {
 	assert(row->bodycnt == 1); /* always 1 for read */
 	if (row->type != IPROTO_INSERT) {
+		if (row->type == IPROTO_RAFT)
+			return memtx_engine_recover_raft(row);
 		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
 			 (uint32_t) row->type);
 		return -1;
@@ -477,6 +491,7 @@ struct checkpoint {
 	/** The vclock of the snapshot file. */
 	struct vclock vclock;
 	struct xdir dir;
+	struct raft_request raft;
 	/**
 	 * Do nothing, just touch the snapshot file - the
 	 * checkpoint already exists.
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
 	opts.free_cache = true;
 	xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
 	vclock_create(&ckpt->vclock);
+	raft_serialize_for_disk(&ckpt->raft);
 	ckpt->touch = false;
 	return ckpt;
 }
@@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
 	return 0;
 };
 
+static int
+checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
+{
+	struct xrow_header row;
+	struct region *region = &fiber()->gc;
+	uint32_t svp = region_used(region);
+	int rc = -1;
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto finish;
+	if (checkpoint_write_row(l, &row) != 0)
+		goto finish;
+	rc = 0;
+finish:
+	region_truncate(region, svp);
+	return rc;
+}
+
 static int
 checkpoint_f(va_list ap)
 {
@@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
 		if (rc != 0)
 			goto fail;
 	}
+	if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
+		goto fail;
 	if (xlog_flush(&snap) < 0)
 		goto fail;
 
diff --git a/src/box/raft.c b/src/box/raft.c
new file mode 100644
index 000000000..511fe42f5
--- /dev/null
+++ b/src/box/raft.c
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "raft.h"
+
+#include "error.h"
+#include "journal.h"
+#include "xrow.h"
+#include "small/region.h"
+
+/** Raft state of this instance. */
+struct raft raft = {
+	.term = 1,
+	.vote = 0,
+};
+
+void
+raft_process_recovery(const struct raft_request *req)
+{
+	if (req->term != 0)
+		raft.term = req->term;
+	if (req->vote != 0)
+		raft.vote = req->vote;
+}
+
+void
+raft_serialize_for_network(struct raft_request *req)
+{
+	req->term = raft.term;
+	req->vote = raft.vote;
+}
+
+void
+raft_serialize_for_disk(struct raft_request *req)
+{
+	req->term = raft.term;
+	req->vote = raft.vote;
+}
diff --git a/src/box/raft.h b/src/box/raft.h
new file mode 100644
index 000000000..31f7becdb
--- /dev/null
+++ b/src/box/raft.h
@@ -0,0 +1,67 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct raft_request;
+
+struct raft {
+	uint64_t term;
+	uint32_t vote;
+};
+
+extern struct raft raft;
+
+/** Process a raft entry stored in WAL/snapshot. */
+void
+raft_process_recovery(const struct raft_request *req);
+
+/**
+ * Save complete Raft state into a request to be sent to other instances of the
+ * cluster. It is allowed to save anything here, not only persistent state.
+ */
+void
+raft_serialize_for_network(struct raft_request *req);
+
+/**
+ * Save complete Raft state into a request to be persisted on disk. Only term
+ * and vote are being persisted.
+ */
+void
+raft_serialize_for_disk(struct raft_request *req);
+
+#if defined(__cplusplus)
+}
+#endif
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 95ddb1fe7..1923bacfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
 	return 0;
 }
 
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+		 const struct raft_request *r)
+{
+	size_t size = mp_sizeof_map(2) +
+		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		      mp_sizeof_uint(r->term) +
+		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+		      mp_sizeof_uint(r->vote);
+	char *buf = region_alloc(region, size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, size, "region_alloc", "buf");
+		return -1;
+	}
+	memset(row, 0, sizeof(*row));
+	row->type = IPROTO_RAFT;
+	row->body[0].iov_base = buf;
+	row->body[0].iov_len = size;
+	row->group_id = GROUP_LOCAL;
+	row->bodycnt = 1;
+	buf = mp_encode_map(buf, 2);
+	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
+	buf = mp_encode_uint(buf, r->term);
+	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+	buf = mp_encode_uint(buf, r->vote);
+	return 0;
+}
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+{
+	/* TODO: handle bad format. */
+	assert(row->type == IPROTO_RAFT);
+	assert(row->bodycnt == 1);
+	assert(row->group_id == GROUP_LOCAL);
+	memset(r, 0, sizeof(*r));
+	const char *pos = row->body[0].iov_base;
+	uint32_t map_size = mp_decode_map(&pos);
+	for (uint32_t i = 0; i < map_size; ++i)
+	{
+		uint64_t key = mp_decode_uint(&pos);
+		switch (key) {
+		case IPROTO_RAFT_TERM:
+			r->term = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VOTE:
+			r->vote = mp_decode_uint(&pos);
+			break;
+		default:
+			mp_next(&pos);
+			break;
+		}
+	}
+	return 0;
+}
+
 int
 xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 58d47b12d..c234f6f88 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
 int
 xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 
+struct raft_request {
+	uint64_t term;
+	uint32_t vote;
+};
+
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+		 const struct raft_request *r);
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+
 /**
  * CALL/EVAL request.
  */
-- 
2.21.1 (Apple Git-122.3)

* [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (5 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  9:07   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 07/10] raft: relay status updates to followers Vladislav Shpilevoy
                   ` (2 subsequent siblings)
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

The new options are:

- raft_is_enabled - enable/disable Raft. When disabled, the node
  is supposed to work as if Raft does not exist, as it did before;

- raft_is_candidate - a flag showing whether the instance can try
  to become a leader. Note that it can vote for other nodes
  regardless of the value of this option;

- raft_election_timeout - how long to wait until the election
  ends, in seconds.

The options don't do anything yet. They are added separately in
order to keep such mundane changes out of the main Raft commit and
to simplify its review.

Part of #1146
---
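
For reviewers, the check/apply split of the new options can be sketched in
isolation. This is a minimal stand-alone sketch and not the patch code:
struct raft_opts, struct raft_cfg, raft_opts_check() and raft_opts_apply() are
hypothetical names, and it assumes the election timeout must be strictly
positive, as the "must be a positive number" error message suggests.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the values read from box.cfg. */
struct raft_opts {
	bool is_enabled;
	bool is_candidate;
	double election_timeout;
};

/* Hypothetical stand-in for the configurable part of struct raft. */
struct raft_cfg {
	bool is_enabled;
	bool is_candidate;
	double election_timeout;
};

/** Return 0 if the options are valid, -1 otherwise. */
static int
raft_opts_check(const struct raft_opts *opts)
{
	/* Assumption: the timeout must be strictly positive. */
	if (!(opts->election_timeout > 0))
		return -1;
	return 0;
}

/**
 * Apply the options in the same order as box_cfg_xc() does in this
 * patch: the parameters first, the enable flag last.
 */
static int
raft_opts_apply(struct raft_cfg *cfg, const struct raft_opts *opts)
{
	if (raft_opts_check(opts) != 0)
		return -1;
	cfg->is_candidate = opts->is_candidate;
	cfg->election_timeout = opts->election_timeout;
	cfg->is_enabled = opts->is_enabled;
	return 0;
}
```

Nothing is applied when the check fails, so a bad value leaves the previous
configuration untouched.
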
 src/box/box.cc                  | 91 +++++++++++++++++++++++++++++++++
 src/box/box.h                   |  3 ++
 src/box/lua/cfg.cc              | 27 ++++++++++
 src/box/lua/load_cfg.lua        | 15 ++++++
 src/box/raft.c                  | 30 +++++++++++
 src/box/raft.h                  | 35 +++++++++++++
 test/app-tap/init_script.result |  3 ++
 test/box/admin.result           |  6 +++
 test/box/cfg.result             | 12 +++++
 9 files changed, 222 insertions(+)

diff --git a/src/box/box.cc b/src/box/box.cc
index 281917af2..5f04a1a78 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -472,6 +472,40 @@ box_check_uri(const char *source, const char *option_name)
 	}
 }
 
+static int
+box_check_raft_is_enabled(void)
+{
+	int b = cfg_getb("raft_is_enabled");
+	if (b < 0) {
+		diag_set(ClientError, ER_CFG, "raft_is_enabled",
+			 "the value must be a boolean");
+	}
+	return b;
+}
+
+static int
+box_check_raft_is_candidate(void)
+{
+	int b = cfg_getb("raft_is_candidate");
+	if (b < 0) {
+		diag_set(ClientError, ER_CFG, "raft_is_candidate",
+			 "the value must be a boolean");
+	}
+	return b;
+}
+
+static double
+box_check_raft_election_timeout(void)
+{
+	double d = cfg_getd("raft_election_timeout");
+	if (d <= 0) {
+		diag_set(ClientError, ER_CFG, "raft_election_timeout",
+			 "the value must be a positive number");
+		return -1;
+	}
+	return d;
+}
+
 static void
 box_check_replication(void)
 {
@@ -729,6 +763,12 @@ box_check_config(void)
 	box_check_uri(cfg_gets("listen"), "listen");
 	box_check_instance_uuid(&uuid);
 	box_check_replicaset_uuid(&uuid);
+	if (box_check_raft_is_enabled() < 0)
+		diag_raise();
+	if (box_check_raft_is_candidate() < 0)
+		diag_raise();
+	if (box_check_raft_election_timeout() < 0)
+		diag_raise();
 	box_check_replication();
 	box_check_replication_timeout();
 	box_check_replication_connect_timeout();
@@ -751,6 +791,36 @@ box_check_config(void)
 		diag_raise();
 }
 
+int
+box_set_raft_is_enabled(void)
+{
+	int b = box_check_raft_is_enabled();
+	if (b < 0)
+		return -1;
+	raft_cfg_is_enabled(b);
+	return 0;
+}
+
+int
+box_set_raft_is_candidate(void)
+{
+	int b = box_check_raft_is_candidate();
+	if (b < 0)
+		return -1;
+	raft_cfg_is_candidate(b);
+	return 0;
+}
+
+int
+box_set_raft_election_timeout(void)
+{
+	double d = box_check_raft_election_timeout();
+	if (d < 0)
+		return -1;
+	raft_cfg_election_timeout(d);
+	return 0;
+}
+
 /*
  * Parse box.cfg.replication and create appliers.
  */
@@ -835,6 +905,7 @@ void
 box_set_replication_timeout(void)
 {
 	replication_timeout = box_check_replication_timeout();
+	raft_cfg_death_timeout();
 }
 
 void
@@ -865,6 +936,7 @@ box_set_replication_synchro_quorum(void)
 		return -1;
 	replication_synchro_quorum = value;
 	txn_limbo_on_parameters_change(&txn_limbo);
+	raft_cfg_election_quorum();
 	return 0;
 }
 
@@ -2680,6 +2752,25 @@ box_cfg_xc(void)
 
 	fiber_gc();
 	is_box_configured = true;
+	/*
+	 * Fill in Raft parameters after bootstrap. It is not possible to do
+	 * that earlier - there may be Raft data to recover from WAL and
+	 * snapshot. Also, until recovery is done, it is not possible to write
+	 * new records into WAL. It is also totally safe, because relaying is
+	 * not started until the box is configured. So it can't happen that
+	 * this Raft node tries to relay to another node that does not have
+	 * Raft enabled, which would lead to a disconnect.
+	 */
+	if (box_set_raft_is_candidate() != 0)
+		diag_raise();
+	if (box_set_raft_election_timeout() != 0)
+		diag_raise();
+	/*
+	 * Raft is enabled last, so that all the parameters are installed by
+	 * that time.
+	 */
+	if (box_set_raft_is_enabled() != 0)
+		diag_raise();
 
 	title("running");
 	say_info("ready to accept requests");
diff --git a/src/box/box.h b/src/box/box.h
index 5988264a5..637d10dd3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -245,6 +245,9 @@ void box_set_vinyl_memory(void);
 void box_set_vinyl_max_tuple_size(void);
 void box_set_vinyl_cache(void);
 void box_set_vinyl_timeout(void);
+int box_set_raft_is_enabled(void);
+int box_set_raft_is_candidate(void);
+int box_set_raft_election_timeout(void);
 void box_set_replication_timeout(void);
 void box_set_replication_connect_timeout(void);
 void box_set_replication_connect_quorum(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index d481155cd..339b85f9d 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -269,6 +269,30 @@ lbox_cfg_set_worker_pool_threads(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_raft_is_enabled(struct lua_State *L)
+{
+	if (box_set_raft_is_enabled() != 0)
+		luaT_error(L);
+	return 0;
+}
+
+static int
+lbox_cfg_set_raft_is_candidate(struct lua_State *L)
+{
+	if (box_set_raft_is_candidate() != 0)
+		luaT_error(L);
+	return 0;
+}
+
+static int
+lbox_cfg_set_raft_election_timeout(struct lua_State *L)
+{
+	if (box_set_raft_election_timeout() != 0)
+		luaT_error(L);
+	return 0;
+}
+
 static int
 lbox_cfg_set_replication_timeout(struct lua_State *L)
 {
@@ -382,6 +406,9 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_vinyl_max_tuple_size", lbox_cfg_set_vinyl_max_tuple_size},
 		{"cfg_set_vinyl_cache", lbox_cfg_set_vinyl_cache},
 		{"cfg_set_vinyl_timeout", lbox_cfg_set_vinyl_timeout},
+		{"cfg_set_raft_is_enabled", lbox_cfg_set_raft_is_enabled},
+		{"cfg_set_raft_is_candidate", lbox_cfg_set_raft_is_candidate},
+		{"cfg_set_raft_election_timeout", lbox_cfg_set_raft_election_timeout},
 		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
 		{"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum},
 		{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 53f572895..2c98fd837 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -86,6 +86,9 @@ local default_cfg = {
     checkpoint_wal_threshold = 1e18,
     checkpoint_count    = 2,
     worker_pool_threads = 4,
+    raft_is_enabled       = false,
+    raft_is_candidate     = true,
+    raft_election_timeout = 5,
     replication_timeout = 1,
     replication_sync_lag = 10,
     replication_sync_timeout = 300,
@@ -163,6 +166,9 @@ local template_cfg = {
     read_only           = 'boolean',
     hot_standby         = 'boolean',
     worker_pool_threads = 'number',
+    raft_is_enabled       = 'boolean',
+    raft_is_candidate     = 'boolean',
+    raft_election_timeout = 'number',
     replication_timeout = 'number',
     replication_sync_lag = 'number',
     replication_sync_timeout = 'number',
@@ -279,6 +285,9 @@ local dynamic_cfg = {
         require('title').update(box.cfg.custom_proc_title)
     end,
     force_recovery          = function() end,
+    raft_is_enabled         = private.cfg_set_raft_is_enabled,
+    raft_is_candidate       = private.cfg_set_raft_is_candidate,
+    raft_election_timeout   = private.cfg_set_raft_election_timeout,
     replication_timeout     = private.cfg_set_replication_timeout,
     replication_connect_timeout = private.cfg_set_replication_connect_timeout,
     replication_connect_quorum = private.cfg_set_replication_connect_quorum,
@@ -333,6 +342,9 @@ local dynamic_cfg_order = {
     -- the new one. This should be fixed when box.cfg is able to
     -- apply some parameters together and atomically.
     replication_anon        = 250,
+    raft_is_enabled         = 300,
+    raft_is_candidate       = 310,
+    raft_election_timeout   = 320,
 }
 
 local function sort_cfg_cb(l, r)
@@ -350,6 +362,9 @@ local dynamic_cfg_skip_at_load = {
     vinyl_cache             = true,
     vinyl_timeout           = true,
     too_long_threshold      = true,
+    raft_is_enabled         = true,
+    raft_is_candidate       = true,
+    raft_election_timeout   = true,
     replication             = true,
     replication_timeout     = true,
     replication_connect_timeout = true,
diff --git a/src/box/raft.c b/src/box/raft.c
index 511fe42f5..ee54d02b7 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -37,6 +37,8 @@
 
 /** Raft state of this instance. */
 struct raft raft = {
+	.is_enabled = false,
+	.is_candidate = false,
 	.term = 1,
 	.vote = 0,
 };
@@ -63,3 +65,31 @@ raft_serialize_for_disk(struct raft_request *req)
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
+
+void
+raft_cfg_is_enabled(bool is_enabled)
+{
+	raft.is_enabled = is_enabled;
+}
+
+void
+raft_cfg_is_candidate(bool is_candidate)
+{
+	raft.is_candidate = is_candidate;
+}
+
+void
+raft_cfg_election_timeout(double timeout)
+{
+	raft.election_timeout = timeout;
+}
+
+void
+raft_cfg_election_quorum(void)
+{
+}
+
+void
+raft_cfg_death_timeout(void)
+{
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index 31f7becdb..f27222752 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -30,6 +30,7 @@
  * SUCH DAMAGE.
  */
 #include <stdint.h>
+#include <stdbool.h>
 
 #if defined(__cplusplus)
 extern "C" {
@@ -38,8 +39,11 @@ extern "C" {
 struct raft_request;
 
 struct raft {
+	bool is_enabled;
+	bool is_candidate;
 	uint64_t term;
 	uint32_t vote;
+	double election_timeout;
 };
 
 extern struct raft raft;
@@ -48,6 +52,37 @@ extern struct raft raft;
 void
 raft_process_recovery(const struct raft_request *req);
 
+/** Configure whether Raft is enabled. */
+void
+raft_cfg_is_enabled(bool is_enabled);
+
+/**
+ * Configure whether the instance can be elected as Raft leader. Even if false,
+ * the node still can vote, when Raft is enabled.
+ */
+void
+raft_cfg_is_candidate(bool is_candidate);
+
+/** Configure Raft leader election timeout. */
+void
+raft_cfg_election_timeout(double timeout);
+
+/**
+ * Configure Raft leader election quorum. There is no separate option.
+ * Instead, the synchronous replication quorum is used, since Raft is tightly
+ * bound to synchronous replication.
+ */
+void
+raft_cfg_election_quorum(void);
+
+/**
+ * Configure Raft leader death timeout, i.e. the number of seconds without
+ * heartbeats from the leader after which it is considered dead. There is no
+ * separate option. Raft uses the replication timeout for that.
+ */
+void
+raft_cfg_death_timeout(void);
+
 /**
  * Save complete Raft state into a request to be sent to other instances of the
  * cluster. It is allowed to save anything here, not only persistent state.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 857f0c95f..1d191987a 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -23,6 +23,9 @@ memtx_memory:107374182
 memtx_min_tuple_size:16
 net_msg_max:768
 pid_file:box.pid
+raft_election_timeout:5
+raft_is_candidate:true
+raft_is_enabled:false
 read_only:false
 readahead:16320
 replication_anon:false
diff --git a/test/box/admin.result b/test/box/admin.result
index ab3e80a97..13536a318 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -67,6 +67,12 @@ cfg_filter(box.cfg)
     - 768
   - - pid_file
     - <hidden>
+  - - raft_election_timeout
+    - 5
+  - - raft_is_candidate
+    - true
+  - - raft_is_enabled
+    - false
   - - read_only
     - false
   - - readahead
diff --git a/test/box/cfg.result b/test/box/cfg.result
index bdd210b09..11358b2cd 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -55,6 +55,12 @@ cfg_filter(box.cfg)
  |     - 768
  |   - - pid_file
  |     - <hidden>
+ |   - - raft_election_timeout
+ |     - 5
+ |   - - raft_is_candidate
+ |     - true
+ |   - - raft_is_enabled
+ |     - false
  |   - - read_only
  |     - false
  |   - - readahead
@@ -162,6 +168,12 @@ cfg_filter(box.cfg)
  |     - 768
  |   - - pid_file
  |     - <hidden>
+ |   - - raft_election_timeout
+ |     - 5
+ |   - - raft_is_candidate
+ |     - true
+ |   - - raft_is_enabled
+ |     - false
  |   - - read_only
  |     - false
  |   - - readahead
-- 
2.21.1 (Apple Git-122.3)

* [Tarantool-patches] [PATCH v2 07/10] raft: relay status updates to followers
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (6 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine Vladislav Shpilevoy
  9 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

From: sergepetrenko <sergepetrenko@tarantool.org>

The patch introduces a new type of system message used to notify the
followers of the instance's Raft status updates. It is the relay's
responsibility to deliver the new system rows to its peers. The
notification system reuses and extends the row type used to persist
Raft state in WAL and snapshot.

Part of #1146
Part of #5204
---
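
The wire format idea of this patch (term is always sent, the other keys only
when they are set) can be sketched without msgpuck. This is a simplified
illustration, not the patch code: struct raft_req_sketch and
raft_req_encode_sketch() are hypothetical, the vclock is left out, and all
values are assumed to fit into a MsgPack positive fixint (below 128). The key
numbers match enum iproto_raft_keys.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Key values as in enum iproto_raft_keys from this patch. */
enum { KEY_TERM = 0, KEY_VOTE = 1, KEY_STATE = 2 };

/* Simplified request: no vclock, all values assumed below 128. */
struct raft_req_sketch {
	uint8_t term;
	uint8_t vote;
	uint8_t state;
};

/**
 * Encode the request as a MsgPack fixmap of positive fixints.
 * Return the number of bytes written, or 0 if the buffer is too
 * small or a value does not fit into a positive fixint.
 */
static size_t
raft_req_encode_sketch(const struct raft_req_sketch *r, uint8_t *buf,
		       size_t cap)
{
	if (r->term > 0x7f || r->vote > 0x7f || r->state > 0x7f)
		return 0;
	/* Term is always present, the rest only when non-zero. */
	uint8_t map_size = 1;
	if (r->vote != 0)
		++map_size;
	if (r->state != 0)
		++map_size;
	/* One header byte plus a key byte and a value byte per pair. */
	size_t size = 1 + 2 * (size_t)map_size;
	if (size > cap)
		return 0;
	uint8_t *pos = buf;
	*pos++ = (uint8_t)(0x80 | map_size);	/* fixmap header */
	*pos++ = KEY_TERM;
	*pos++ = r->term;
	if (r->vote != 0) {
		*pos++ = KEY_VOTE;
		*pos++ = r->vote;
	}
	if (r->state != 0) {
		*pos++ = KEY_STATE;
		*pos++ = r->state;
	}
	return (size_t)(pos - buf);
}
```

A decoder of such a map has to treat a missing key as "not set", which is why
xrow_decode_raft() zeroes the request before parsing.
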
 src/box/applier.cc         | 32 +++++++++++++--
 src/box/box.cc             | 21 +++++++++-
 src/box/iproto_constants.h |  2 +
 src/box/memtx_engine.c     |  3 +-
 src/box/raft.c             | 71 +++++++++++++++++++++++++++++++-
 src/box/raft.h             | 31 +++++++++++++-
 src/box/relay.cc           | 62 +++++++++++++++++++++++++++-
 src/box/relay.h            |  7 ++++
 src/box/xrow.c             | 83 ++++++++++++++++++++++++++++++++------
 src/box/xrow.h             |  5 ++-
 10 files changed, 293 insertions(+), 24 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 699b5a683..53db97d6d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
 #include "scoped_guard.h"
 #include "txn_limbo.h"
 #include "journal.h"
+#include "raft.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -315,6 +316,8 @@ apply_final_join_row(struct xrow_header *row)
 	 */
 	if (iproto_type_is_synchro_request(row->type))
 		return 0;
+	if (iproto_type_is_raft_request(row->type))
+		return 0;
 	struct txn *txn = txn_begin();
 	if (txn == NULL)
 		return -1;
@@ -894,6 +897,21 @@ err:
 	return -1;
 }
 
+static int
+apply_raft_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_raft_request(row->type));
+
+	struct raft_request req;
+	struct vclock candidate_clock;
+	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
+		return -1;
+
+	raft_process_msg(&req);
+
+	return 0;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1238,11 +1256,19 @@ applier_subscribe(struct applier *applier)
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		struct xrow_header *first_row =
+			&stailq_first_entry(&rows, struct applier_tx_row,
+					    next)->row;
+		if (first_row->lsn == 0) {
+			if (unlikely(iproto_type_is_raft_request(
+							first_row->type))) {
+				if (apply_raft_row(first_row) != 0)
+					diag_raise();
+			}
 			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index 5f04a1a78..6f85e734a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 	}
 	if (iproto_type_is_raft_request(row->type)) {
 		struct raft_request raft_req;
-		if (xrow_decode_raft(row, &raft_req) != 0)
+		/* Vclock is never persisted in WAL by Raft. */
+		if (xrow_decode_raft(row, &raft_req, NULL) != 0)
 			diag_raise();
 		raft_process_recovery(&raft_req);
 		return;
@@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 		 tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
 	say_info("remote vclock %s local vclock %s",
 		 vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+	if (raft_is_enabled()) {
+		/*
+		 * Send out the current raft state of the instance. Don't do
+		 * that if Raft is disabled. It can be that a part of the
+		 * cluster still contains old versions, which can't handle Raft
+		 * messages. So when it is disabled, its network footprint
+		 * should be 0.
+		 */
+		struct raft_request req;
+		/*
+		 * Omit the candidate vclock, since we've just sent it in
+		 * subscribe response.
+		 */
+		raft_serialize_for_network(&req, NULL);
+		xrow_encode_raft(&row, &fiber()->gc, &req);
+		coio_write_xrow(io, &row);
+	}
 	/*
 	 * Replica clock is used in gc state and recovery
 	 * initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8a11626b3..3ec397d3c 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,8 @@ extern const char *iproto_type_strs[];
 enum iproto_raft_keys {
 	IPROTO_RAFT_TERM = 0,
 	IPROTO_RAFT_VOTE = 1,
+	IPROTO_RAFT_STATE = 2,
+	IPROTO_RAFT_VCLOCK = 3,
 };
 
 /**
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 7ebed7aa8..fe7ae9f63 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
 {
 	assert(row->type == IPROTO_RAFT);
 	struct raft_request req;
-	if (xrow_decode_raft(row, &req) != 0)
+	/* Vclock is never persisted in WAL by Raft. */
+	if (xrow_decode_raft(row, &req, NULL) != 0)
 		return -1;
 	raft_process_recovery(&req);
 	return 0;
diff --git a/src/box/raft.c b/src/box/raft.c
index ee54d02b7..7697809ee 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,9 +34,20 @@
 #include "journal.h"
 #include "xrow.h"
 #include "small/region.h"
+#include "replication.h"
+#include "relay.h"
+
+const char *raft_state_strs[] = {
+	NULL,
+	"follower",
+	"candidate",
+	"leader",
+};
 
 /** Raft state of this instance. */
 struct raft raft = {
+	.leader = 0,
+	.state = RAFT_STATE_FOLLOWER,
 	.is_enabled = false,
 	.is_candidate = false,
 	.term = 1,
@@ -50,18 +61,65 @@ raft_process_recovery(const struct raft_request *req)
 		raft.term = req->term;
 	if (req->vote != 0)
 		raft.vote = req->vote;
+	/*
+	 * Role is never persisted. If recovery is happening, the
+	 * node was restarted, and the former role may no longer be
+	 * valid anyway.
+	 */
+	assert(req->state == 0);
+	/*
+	 * Vclock is always persisted by some other subsystem - WAL, snapshot.
+	 * It is used only to decide to whom to give the vote during election,
+	 * as a part of the volatile state.
+	 */
+	assert(req->vclock == NULL);
+	/* Raft is not enabled until recovery is finished. */
+	assert(!raft_is_enabled());
 }
 
 void
-raft_serialize_for_network(struct raft_request *req)
+raft_process_msg(const struct raft_request *req)
 {
+	if (req->term > raft.term) {
+		// Update term.
+		// The logic will be similar, but the code
+		// below is for testing purposes.
+		raft.term = req->term;
+	}
+	if (req->vote > 0) {
+		// Check whether the vote's for us.
+	}
+	switch (req->state) {
+	case RAFT_STATE_FOLLOWER:
+		break;
+	case RAFT_STATE_CANDIDATE:
+		// Perform voting logic.
+		break;
+	case RAFT_STATE_LEADER:
+		// Switch to a new leader.
+		break;
+	default:
+		break;
+	}
+}
+
+void
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
+{
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
+	req->state = raft.state;
+	/*
+	 * Raft does not own vclock, so it always expects it passed externally.
+	 */
+	req->vclock = vclock;
 }
 
 void
 raft_serialize_for_disk(struct raft_request *req)
 {
+	memset(req, 0, sizeof(*req));
 	req->term = raft.term;
 	req->vote = raft.vote;
 }
@@ -93,3 +151,14 @@ void
 raft_cfg_death_timeout(void)
 {
 }
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+	replicaset_foreach(replica) {
+		if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
+		    relay_get_state(replica->relay) == RELAY_FOLLOW) {
+			relay_push_raft(replica->relay, req);
+		}
+	}
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index f27222752..e3261454b 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,8 +37,19 @@ extern "C" {
 #endif
 
 struct raft_request;
+struct vclock;
+
+enum raft_state {
+	RAFT_STATE_FOLLOWER = 1,
+	RAFT_STATE_CANDIDATE = 2,
+	RAFT_STATE_LEADER = 3,
+};
+
+extern const char *raft_state_strs[];
 
 struct raft {
+	uint32_t leader;
+	enum raft_state state;
 	bool is_enabled;
 	bool is_candidate;
 	uint64_t term;
@@ -48,10 +59,21 @@ struct raft {
 
 extern struct raft raft;
 
+/** Check if Raft is enabled. */
+static inline bool
+raft_is_enabled(void)
+{
+	return raft.is_enabled;
+}
+
 /** Process a raft entry stored in WAL/snapshot. */
 void
 raft_process_recovery(const struct raft_request *req);
 
+/** Process a raft status message coming from the network. */
+void
+raft_process_msg(const struct raft_request *req);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -88,7 +110,7 @@ raft_cfg_death_timeout(void);
  * cluster. It is allowed to save anything here, not only persistent state.
  */
 void
-raft_serialize_for_network(struct raft_request *req);
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
 
 /**
  * Save complete Raft state into a request to be persisted on disk. Only term
@@ -97,6 +119,13 @@ raft_serialize_for_network(struct raft_request *req);
 void
 raft_serialize_for_disk(struct raft_request *req);
 
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 124b0f52f..74581db9c 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
 #include "xstream.h"
 #include "wal.h"
 #include "txn_limbo.h"
+#include "raft.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+struct relay_raft_msg {
+	struct cmsg base;
+	struct cmsg_hop route;
+	struct raft_request req;
+	struct vclock vclock;
+	struct relay *relay;
+};
+
+static void
+relay_raft_msg_push(struct cmsg *base)
+{
+	struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+	struct xrow_header row;
+	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+	try {
+		relay_send(msg->relay, &row);
+	} catch (Exception *e) {
+		relay_set_error(msg->relay, e);
+		fiber_cancel(fiber());
+	}
+	free(msg);
+}
+
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req)
+{
+	/*
+	 * XXX: the message should be preallocated. It should
+	 * work like Kharon in IProto. Relay should have 2 raft
+	 * messages rotating. When one is sent, the other can be
+	 * updated and a flag is set. When the first message is
+	 * sent, the control returns to TX thread, sees the set
+	 * flag, rotates the buffers, and sends it again. And so
+	 * on. This is how it can work in future, with 0 heap
+	 * allocations. Current solution with alloc-per-update is
+	 * good enough as a start. Another option - wait until all
+	 * is moved to WAL thread, where this will all happen
+	 * in one thread and will be much simpler.
+	 */
+	struct relay_raft_msg *msg =
+		(struct relay_raft_msg *)malloc(sizeof(*msg));
+	if (msg == NULL) {
+		panic("Couldn't allocate raft message");
+		return;
+	}
+	msg->req = *req;
+	if (req->vclock != NULL) {
+		msg->req.vclock = &msg->vclock;
+		vclock_copy(&msg->vclock, req->vclock);
+	}
+	msg->route.f = relay_raft_msg_push;
+	msg->route.pipe = NULL;
+	cmsg_init(&msg->base, &msg->route);
+	msg->relay = relay;
+	cpipe_push(&relay->relay_pipe, &msg->base);
+}
+
 /** Send a single row to the client. */
 static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
-	assert(iproto_type_is_dml(packet->type) ||
-	       iproto_type_is_synchro_request(packet->type));
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
 		 * We do not relay replica-local rows to other
@@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	assert(iproto_type_is_dml(packet->type) ||
+	       iproto_type_is_synchro_request(packet->type));
 	/* Check if the rows from the instance are filtered. */
 	if ((1 << packet->replica_id & relay->id_filter) != 0)
 		return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..b32e2ea2a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
 double
 relay_last_row_time(const struct relay *relay);
 
+/**
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
+ */
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 1923bacfc..11fdacc0d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -958,11 +958,30 @@ int
 xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r)
 {
-	size_t size = mp_sizeof_map(2) +
-		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
-		      mp_sizeof_uint(r->term) +
-		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
-		      mp_sizeof_uint(r->vote);
+	/*
+	 * Term is always encoded. Sometimes the rest can even be ignored if
+	 * the term is too old.
+	 */
+	int map_size = 1;
+	size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		      mp_sizeof_uint(r->term);
+	if (r->vote != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+			mp_sizeof_uint(r->vote);
+	}
+	if (r->state != 0) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+			mp_sizeof_uint(r->state);
+	}
+	if (r->vclock != NULL) {
+		++map_size;
+		size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
+			mp_sizeof_vclock_ignore0(r->vclock);
+	}
+	size += mp_sizeof_map(map_size);
+
 	char *buf = region_alloc(region, size);
 	if (buf == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 	row->body[0].iov_len = size;
 	row->group_id = GROUP_LOCAL;
 	row->bodycnt = 1;
-	buf = mp_encode_map(buf, 2);
+	buf = mp_encode_map(buf, map_size);
 	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
 	buf = mp_encode_uint(buf, r->term);
-	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
-	buf = mp_encode_uint(buf, r->vote);
+	if (r->vote != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+		buf = mp_encode_uint(buf, r->vote);
+	}
+	if (r->state != 0) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+		buf = mp_encode_uint(buf, r->state);
+	}
+	if (r->vclock != NULL) {
+		buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
+		buf = mp_encode_vclock_ignore0(buf, r->vclock);
+	}
 	return 0;
 }
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock)
 {
-	/* TODO: handle bad format. */
 	assert(row->type == IPROTO_RAFT);
-	assert(row->bodycnt == 1);
-	assert(row->group_id == GROUP_LOCAL);
+	if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+		diag_set(ClientError, ER_INVALID_MSGPACK,
+			 "malformed raft request");
+		return -1;
+	}
 	memset(r, 0, sizeof(*r));
-	const char *pos = row->body[0].iov_base;
+	r->vclock = vclock;
+
+	const char *begin = row->body[0].iov_base;
+	const char *end = begin + row->body[0].iov_len;
+	const char *pos = begin;
 	uint32_t map_size = mp_decode_map(&pos);
 	for (uint32_t i = 0; i < map_size; ++i)
 	{
+		if (mp_typeof(*pos) != MP_UINT)
+			goto bad_msgpack;
 		uint64_t key = mp_decode_uint(&pos);
 		switch (key) {
 		case IPROTO_RAFT_TERM:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->term = mp_decode_uint(&pos);
 			break;
 		case IPROTO_RAFT_VOTE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
 			r->vote = mp_decode_uint(&pos);
 			break;
+		case IPROTO_RAFT_STATE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
+			r->state = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VCLOCK:
+			if (r->vclock == NULL)
+				mp_next(&pos);
+			else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+				goto bad_msgpack;
+			break;
 		default:
 			mp_next(&pos);
 			break;
 		}
 	}
 	return 0;
+
+bad_msgpack:
+	xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+	return -1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c234f6f88..c627102dd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 struct raft_request {
 	uint64_t term;
 	uint32_t vote;
+	uint32_t state;
+	struct vclock *vclock;
 };
 
 int
@@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
 		 const struct raft_request *r);
 
 int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+		 struct vclock *vclock);
 
 /**
  * CALL/EVAL request.
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg()
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (7 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 07/10] raft: relay status updates to followers Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04  9:22   ` Serge Petrenko
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine Vladislav Shpilevoy
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

The instance ID of the sender is needed in order to

- be able to vote for it;
- be able to remember its ID as the leader ID when it is a leader.
---
 src/box/applier.cc | 9 ++++-----
 src/box/raft.c     | 3 ++-
 src/box/raft.h     | 8 ++++++--
 3 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 53db97d6d..5459a1dc1 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -898,7 +898,7 @@ err:
 }
 
 static int
-apply_raft_row(struct xrow_header *row)
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
 {
 	assert(iproto_type_is_raft_request(row->type));
 
@@ -906,9 +906,7 @@ apply_raft_row(struct xrow_header *row)
 	struct vclock candidate_clock;
 	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
 		return -1;
-
-	raft_process_msg(&req);
-
+	raft_process_msg(&req, applier->instance_id);
 	return 0;
 }
 
@@ -1262,7 +1260,8 @@ applier_subscribe(struct applier *applier)
 		if (first_row->lsn == 0) {
 			if (unlikely(iproto_type_is_raft_request(
 							first_row->type))) {
-				if (apply_raft_row(first_row) != 0)
+				if (applier_handle_raft(applier,
+							first_row) != 0)
 					diag_raise();
 			}
 			applier_signal_ack(applier);
diff --git a/src/box/raft.c b/src/box/raft.c
index 7697809ee..4d3d07c48 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -78,8 +78,9 @@ raft_process_recovery(const struct raft_request *req)
 }
 
 void
-raft_process_msg(const struct raft_request *req)
+raft_process_msg(const struct raft_request *req, uint32_t source)
 {
+	(void)source;
 	if (req->term > raft.term) {
 		// Update term.
 		// The logic will be similar, but the code
diff --git a/src/box/raft.h b/src/box/raft.h
index e3261454b..db64cf933 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -70,9 +70,13 @@ raft_is_enabled(void)
 void
 raft_process_recovery(const struct raft_request *req);
 
-/** Process a raft status message coming from the network. */
+/**
+ * Process a raft status message coming from the network.
+ * @param req Raft request.
+ * @param source Instance ID of the message sender.
+ */
 void
-raft_process_msg(const struct raft_request *req);
+raft_process_msg(const struct raft_request *req, uint32_t source);
 
 /** Configure whether Raft is enabled. */
 void
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine
  2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
                   ` (8 preceding siblings ...)
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
  2020-09-04 11:36   ` Serge Petrenko
  9 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
  To: tarantool-patches, sergepetrenko, gorcunov

This commit is the core part of the Raft implementation. It
introduces the Raft state machine and its integration into the
instance's life cycle.

The implementation follows the protocol to the letter except for
a few important details.

Firstly, the original Raft assumes that all nodes share the same
log record numbers. In Tarantool they are called LSNs. But in
Tarantool each node has its own LSN in its own component of
vclock. That makes the election messages a bit heavier, because
the nodes need to send and compare complete vclocks of each other
instead of a single number like in the original Raft. But the
logic becomes simpler, because in the original Raft there is a
problem of uncertainty about what to do with records of an old
leader right after a new leader is elected. They could be rolled
back or confirmed depending on circumstances. The issue
disappears when vclock is used.
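
To illustrate the comparison described above, here is a minimal
standalone sketch. It is not the code from this patch:
sketch_vclock and the sketch_* helpers are hypothetical
simplifications of struct vclock and vclock_compare_ignore0(),
with a fixed number of components and an assumed return value of
2 for incomparable clocks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical fixed size, chosen only for this sketch. */
enum { SKETCH_VCLOCK_MAX = 4 };

/* Simplified vclock: one LSN per replica ID. */
struct sketch_vclock {
	int64_t lsn[SKETCH_VCLOCK_MAX];
};

/*
 * Compare two vclocks component-wise, skipping component 0,
 * which holds local rows. Returns 0 when equal, 1 when a is
 * ahead of b, -1 when a is behind b, and 2 when the clocks are
 * incomparable (each one is ahead in some component).
 */
static int
sketch_vclock_compare_ignore0(const struct sketch_vclock *a,
			      const struct sketch_vclock *b)
{
	assert(a != NULL && b != NULL);
	bool a_le_b = true;
	bool a_ge_b = true;
	for (int i = 1; i < SKETCH_VCLOCK_MAX; i++) {
		if (a->lsn[i] < b->lsn[i])
			a_ge_b = false;
		if (a->lsn[i] > b->lsn[i])
			a_le_b = false;
	}
	if (a_le_b && a_ge_b)
		return 0;
	if (a_ge_b)
		return 1;
	if (a_le_b)
		return -1;
	return 2;
}

/*
 * A node may vote for a candidate only if the candidate's
 * vclock is >= the node's own vclock in every component.
 */
static bool
sketch_can_vote_for(const struct sketch_vclock *candidate,
		    const struct sketch_vclock *self)
{
	if (candidate == NULL)
		return false;
	int cmp = sketch_vclock_compare_ignore0(candidate, self);
	return cmp == 0 || cmp == 1;
}
```

With these definitions a candidate whose clock is behind in any
single component is rejected, even if it is ahead in the others.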

Secondly, leader election works differently during cluster
bootstrap, until the number of bootstrapped replicas becomes >=
the election quorum. That arises from the specifics of replica
bootstrap and the order of subsystem initialization. In short:
during bootstrap a leader election may use a smaller election
quorum than the configured one. See more details in the code.
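
The quorum truncation can be sketched as follows. This is only
an illustration under assumptions: sketch_election_quorum() and
sketch_has_won() are hypothetical names, and the two integer
parameters stand in for the configured synchronous quorum and
the number of registered replicas.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * During bootstrap fewer replicas may be registered than the
 * configured quorum requires. The election quorum is truncated
 * to the registered count so that the first leader can still be
 * elected.
 */
static int
sketch_election_quorum(int configured_quorum, int registered_count)
{
	assert(configured_quorum > 0 && registered_count > 0);
	return configured_quorum < registered_count ?
	       configured_quorum : registered_count;
}

/* A candidate wins once its vote count reaches the quorum. */
static bool
sketch_has_won(int vote_count, int configured_quorum,
	       int registered_count)
{
	return vote_count >= sketch_election_quorum(configured_quorum,
						    registered_count);
}
```

For example, with a configured quorum of 2 and a single
registered replica, the first node wins with its own vote alone.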

Part of #1146
---
 src/box/applier.cc |  18 +-
 src/box/box.cc     |   7 +-
 src/box/raft.c     | 711 +++++++++++++++++++++++++++++++++++++++++++--
 src/box/raft.h     | 118 ++++++++
 src/box/relay.cc   |  19 ++
 5 files changed, 847 insertions(+), 26 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 5459a1dc1..c7c486ee4 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
  * Return 0 for success or -1 in case of an error.
  */
 static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
 {
+	/*
+	 * Rows received not directly from a leader are ignored. That is a
+	 * protection against the case when an old leader keeps sending data
+	 * around not knowing yet that it is not a leader anymore.
+	 *
+	 * XXX: it may be fine to apply leader transactions by checking that
+	 * their replica_id field is equal to the leader id. That can be
+	 * investigated as an 'optimization', even though it may not give
+	 * anything, because it won't change the total number of rows sent over
+	 * the network anyway.
+	 */
+	if (!raft_is_source_allowed(applier->instance_id))
+		return 0;
 	struct xrow_header *first_row = &stailq_first_entry(rows,
 					struct applier_tx_row, next)->row;
 	struct xrow_header *last_row;
@@ -1257,6 +1270,7 @@ applier_subscribe(struct applier *applier)
 		struct xrow_header *first_row =
 			&stailq_first_entry(&rows, struct applier_tx_row,
 					    next)->row;
+		raft_process_heartbeat(applier->instance_id);
 		if (first_row->lsn == 0) {
 			if (unlikely(iproto_type_is_raft_request(
 							first_row->type))) {
@@ -1265,7 +1279,7 @@ applier_subscribe(struct applier *applier)
 					diag_raise();
 			}
 			applier_signal_ack(applier);
-		} else if (applier_apply_tx(&rows) != 0) {
+		} else if (applier_apply_tx(applier, &rows) != 0) {
 			diag_raise();
 		}
 
diff --git a/src/box/box.cc b/src/box/box.cc
index 6f85e734a..9d0782fff 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
 box_update_ro_summary(void)
 {
 	bool old_is_ro_summary = is_ro_summary;
-	is_ro_summary = is_ro || is_orphan;
+	is_ro_summary = is_ro || is_orphan || raft_is_ro();
 	/* In 99% nothing changes. Filter this out first. */
 	if (is_ro_summary == old_is_ro_summary)
 		return;
@@ -171,6 +171,10 @@ static int
 box_check_writable(void)
 {
 	if (is_ro_summary) {
+		/*
+		 * XXX: return a special error when the node is not a leader to
+		 * reroute to the leader node.
+		 */
 		diag_set(ClientError, ER_READONLY);
 		diag_log();
 		return -1;
@@ -2646,6 +2650,7 @@ box_init(void)
 
 	txn_limbo_init();
 	sequence_init();
+	raft_init();
 }
 
 bool
diff --git a/src/box/raft.c b/src/box/raft.c
index 4d3d07c48..1c4275cd5 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,12 @@
 #include "small/region.h"
 #include "replication.h"
 #include "relay.h"
+#include "box.h"
+
+/**
+ * Maximal random deviation of the election timeout from the configured value.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
 
 const char *raft_state_strs[] = {
 	NULL,
@@ -48,19 +54,220 @@ const char *raft_state_strs[] = {
 struct raft raft = {
 	.leader = 0,
 	.state = RAFT_STATE_FOLLOWER,
+	.volatile_term = 1,
+	.volatile_vote = 0,
 	.is_enabled = false,
 	.is_candidate = false,
+	.is_cfg_candidate = false,
+	.is_write_in_progress = false,
 	.term = 1,
 	.vote = 0,
+	.vote_mask = 0,
+	.vote_count = 0,
+	.election_timeout = 5,
 };
 
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has unflushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new not flushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted for self, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+	return raft.volatile_term == raft.term &&
+	       raft.volatile_vote == raft.vote;
+}
+
+/**
+ * Raft protocol says that election timeout should be a bit randomized so that
+ * the nodes wouldn't start election at the same time and end up with not having
+ * a quorum for anybody. This implementation randomizes the election timeout by
+ * adding {election timeout * random factor} value, where max value of the
+ * factor is a constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+	double timeout = raft.election_timeout;
+	/* Translate to ms. Integer is needed to be able to use mod below. */
+	uint32_t rand_part =
+		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+	if (rand_part == 0)
+		rand_part = 1;
+	/*
+	 * XXX: this is not giving a good distribution, but it is not so trivial
+	 * to implement a correct random value generator. There is a task to
+	 * unify all such places. Not critical here.
+	 */
+	rand_part = rand() % (rand_part + 1);
+	return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during election a node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but longer log. In case of Tarantool it
+ * means the node2 vclock should be >= node1 vclock, in all components. It is
+ * not enough to compare only one component. At least because there may be not
+ * a previous leader when the election happens first time. Or a node could
+ * restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+	if (v == NULL)
+		return false;
+	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+	return cmp == 0 || cmp == 1;
+}
+
+/**
+ * Election quorum is not strictly equal to synchronous replication quorum.
+ * Sometimes it can be lowered. That is about bootstrap.
+ *
+ * The problem with bootstrap is that when the replicaset boots, all the
+ * instances can't write to WAL and can't recover from their initial snapshot.
+ * They need one node which will boot first, and then they will replicate from
+ * it.
+ *
+ * This one node should boot from its zero snapshot, create replicaset UUID,
+ * register self with ID 1 in _cluster space, and then register all the other
+ * instances here. To do that the node must be writable. It should have
+ * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
+ * is enabled.
+ *
+ * To be elected a Raft leader it needs to perform election. But it can't be
+ * done before at least synchronous quorum of the replicas is bootstrapped. And
+ * they can't be bootstrapped because wait for a leader to initialize _cluster.
+ * Cyclic dependency.
+ *
+ * This is resolved by truncation of the election quorum to the number of
+ * registered replicas, if their count is less than synchronous quorum. That
+ * helps to elect a first leader.
+ *
+ * It may seem that the first node could just declare itself a leader and then
+ * strictly follow the protocol from now on, but that won't work, because if the
+ * first node will restart after it is booted, but before quorum of replicas is
+ * booted, the cluster will get stuck again.
+ *
+ * The current solution is totally safe because
+ *
+ * - after all the cluster will have node count >= quorum, if user used a
+ *   correct config (God help him if he didn't);
+ *
+ * - synchronous replication quorum is untouched - it is not truncated. Only
+ *   leader election quorum is affected. So synchronous data won't be lost.
+ */
+static inline int
+raft_election_quorum(void)
+{
+	return MIN(replication_synchro_quorum, replicaset.size);
+}
+
+/** Broadcast to all relays that this node has changed its state. */
+static inline void
+raft_broadcast_new_state(void)
+{
+	struct raft_request req;
+	memset(&req, 0, sizeof(req));
+	req.term = raft.term;
+	req.state = raft.state;
+	raft_broadcast(&req);
+}
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine. Now until Raft is re-enabled,
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for death of the current leader to start new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * If election is started by this node, or it voted for some other node started
+ * the election, and it can be a leader itself, it will wait until the current
+ * election times out. When it happens, the node will start new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated. Unless during that nothing newer happens.
+ */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of Raft state machine - start new election when the current
+ * leader dies, or when there is no leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events);
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void);
+
+/**
+ * Flush Raft state changes to WAL. The callback resets itself, if during the
+ * write more changes appear.
+ */
+static void
+raft_sm_dump_step(struct ev_loop *loop, struct ev_check *watcher, int events);
+
 void
 raft_process_recovery(const struct raft_request *req)
 {
-	if (req->term != 0)
+	if (req->term != 0) {
 		raft.term = req->term;
-	if (req->vote != 0)
+		raft.volatile_term = req->term;
+	}
+	if (req->vote != 0) {
 		raft.vote = req->vote;
+		raft.volatile_vote = req->vote;
+	}
 	/*
 	 * Role is never persisted. If recovery is happening, the
 	 * node was restarted, and the former role can be false
@@ -80,34 +287,428 @@ raft_process_recovery(const struct raft_request *req)
 void
 raft_process_msg(const struct raft_request *req, uint32_t source)
 {
-	(void)source;
-	if (req->term > raft.term) {
-		// Update term.
-		// The logic will be similar, but the code
-		// below is for testing purposes.
-		raft.term = req->term;
+	assert(source > 0);
+	assert(source != instance_id);
+	/* Outdated request. */
+	if (req->term < raft.volatile_term)
+		return;
+
+	enum raft_state old_state = raft.state;
+
+	/* Term bump. */
+	if (req->term > raft.volatile_term)
+		raft_sm_schedule_new_term(req->term);
+
+	/* Vote request during the on-going election. */
+	if (req->vote != 0) {
+		switch (raft.state) {
+		case RAFT_STATE_FOLLOWER:
+		case RAFT_STATE_LEADER:
+			/*
+			 * Can't respond on vote requests when Raft is disabled.
+			 */
+			if (!raft.is_enabled)
+				break;
+			/* Check if already voted in this term. */
+			if (raft.volatile_vote != 0)
+				break;
+			/* Not a candidate. Can't accept votes. */
+			if (req->vote == instance_id)
+				break;
+			/* Can't vote for too old or incomparable nodes. */
+			if (!raft_can_vote_for(req->vclock))
+				break;
+			/*
+			 * Check if somebody is asking to vote for a third
+			 * node - nope. Make votes only when asked directly by
+			 * the new candidate. However that restriction may be
+			 * relaxed in future, if can be proven to be safe.
+			 */
+			if (req->vote != source)
+				break;
+			/*
+			 * Either the term is new, or didn't vote in the current
+			 * term yet. Anyway can vote now.
+			 */
+			raft.state = RAFT_STATE_FOLLOWER;
+			raft_sm_schedule_new_vote(req->vote);
+			break;
+		case RAFT_STATE_CANDIDATE:
+			/* Check if this is a vote for a competing candidate. */
+			if (req->vote != instance_id)
+				break;
+			/*
+			 * Vote for self was requested earlier in this round,
+			 * and now was answered by some other instance.
+			 */
+			assert(raft.volatile_vote == instance_id);
+			bool was_set = bit_set(&raft.vote_mask, source);
+			raft.vote_count += !was_set;
+			if (raft.vote_count < raft_election_quorum())
+				break;
+			raft.state = RAFT_STATE_LEADER;
+			raft.leader = instance_id;
+			break;
+		default:
+			unreachable();
+		}
+	}
+	/*
+	 * If the node does not claim to be a leader, nothing interesting. Terms
+	 * and votes are already handled.
+	 */
+	if (req->state != RAFT_STATE_LEADER)
+		goto end;
+	/* The node is a leader, but it is already known. */
+	if (source == raft.leader)
+		goto end;
+	/*
+	 * XXX: A message from a conflicting leader. Split brain, basically.
+	 * Need to decide what to do. Current solution is to do nothing. In
+	 * future either this node should try to become a leader, or should stop
+	 * all writes and require manual intervention.
+	 */
+	if (raft.leader != 0)
+		goto end;
+
+	/* New leader was elected. */
+	raft.state = RAFT_STATE_FOLLOWER;
+	raft.leader = source;
+end:
+	if (raft.state != old_state) {
+		/*
+		 * If the node stopped being a leader - should become read-only.
+		 * If became a leader - should become read-write (if other
+		 * subsystems also allow read-write).
+		 */
+		box_update_ro_summary();
+		/*
+		 * New term and vote are not broadcast yet. First, their WAL
+		 * write should be finished. But the state is volatile. It is ok
+		 * to broadcast it now.
+		 */
+		raft_broadcast_new_state();
+	}
+}
+
+void
+raft_process_heartbeat(uint32_t source)
+{
+	/*
+	 * When not a candidate - don't wait for anything. Therefore do not care
+	 * about the leader being dead.
+	 */
+	if (!raft.is_candidate)
+		return;
+	/* Don't care about heartbeats when this node is a leader itself. */
+	if (raft.state == RAFT_STATE_LEADER)
+		return;
+	/* Not interested in heartbeats from not a leader. */
+	if (raft.leader != source)
+		return;
+	/*
+	 * XXX: it may be expensive to reset the timer like that. It may be less
+	 * expensive to let the timer work, and remember last timestamp when
+	 * anything was heard from the leader. Then in the timer callback check
+	 * the timestamp, and restart the timer, if it is fine.
+	 */
+	assert(ev_is_active(&raft.timer));
+	ev_timer_stop(loop(), &raft.timer);
+	raft_sm_wait_leader_dead();
+}
+
+/** Wakeup Raft state writer fiber waiting for WAL write end. */
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+	fiber_wakeup(entry->complete_data);
+}
+
+/** Synchronously write a Raft request into WAL. */
+static void
+raft_write_request(const struct raft_request *req)
+{
+	assert(raft.is_write_in_progress);
+	/*
+	 * Vclock is never persisted by Raft. It is used only to
+	 * be sent to network when vote for self.
+	 */
+	assert(req->vclock == NULL);
+	/*
+	 * State is not persisted. That would be strictly against Raft protocol.
+	 * The reason is that it does not make much sense - even if the node is
+	 * a leader now, after the node is restarted, there will be another
+	 * leader elected by that time likely.
+	 */
+	assert(req->state == 0);
+	struct region *region = &fiber()->gc;
+	uint32_t svp = region_used(region);
+	struct xrow_header row;
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
+
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto fail;
+	journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+			     fiber());
+
+	if (journal_write(entry) != 0 || entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto fail;
 	}
-	if (req->vote > 0) {
-		// Check whether the vote's for us.
+
+	raft_broadcast(req);
+
+	region_truncate(region, svp);
+	return;
+fail:
+	/*
+	 * XXX: the stub is supposed to be removed once it is defined what to do
+	 * when a raft request WAL write fails.
+	 */
+	panic("Could not write a raft request to WAL\n");
+}
+
+static void
+raft_sm_dump_step(struct ev_loop *loop, struct ev_check *watcher, int events)
+{
+	assert(watcher == &raft.io);
+	(void) events;
+	assert(raft.is_write_in_progress);
+	/* During write Raft can't be anything but a follower. */
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	struct raft_request req;
+	uint64_t old_term = raft.term;
+	uint32_t old_vote = raft.vote;
+	enum raft_state old_state = raft.state;
+
+	if (raft_is_fully_on_disk()) {
+end_dump:
+		raft.is_write_in_progress = false;
+		ev_check_stop(loop, watcher);
+		/*
+		 * The state machine is stable. Can see now, to what state to
+		 * go.
+		 */
+		if (!raft.is_candidate) {
+			/*
+			 * If not a candidate, can't do anything except vote for
+			 * somebody (if Raft is enabled). Nothing to do except
+			 * staying a follower without timeouts.
+			 */
+		} else if (raft.leader != 0) {
+			/* There is a known leader. Wait until it is dead. */
+			raft_sm_wait_leader_dead();
+		} else if (raft.vote == instance_id) {
+			/* Just wrote own vote. */
+			if (raft_election_quorum() == 1) {
+				raft.state = RAFT_STATE_LEADER;
+				raft.leader = instance_id;
+				/*
+				 * Make read-write (if other subsystems allow
+				 * that).
+				 */
+				box_update_ro_summary();
+			} else {
+				raft.state = RAFT_STATE_CANDIDATE;
+				/* First vote for self. */
+				raft.vote_count = 1;
+				raft.vote_mask = 0;
+				bit_set(&raft.vote_mask, instance_id);
+				raft_sm_wait_election_end();
+			}
+		} else if (raft.vote != 0) {
+			/*
+			 * Voted for some other node. Wait if it manages to
+			 * become a leader.
+			 */
+			raft_sm_wait_election_end();
+		} else {
+			/* No leaders, no votes. */
+			raft_sm_schedule_new_election();
+		}
+	} else {
+		memset(&req, 0, sizeof(req));
+		assert(raft.volatile_term >= raft.term);
+		/* Term is written always. */
+		req.term = raft.volatile_term;
+		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
+			req.vote = raft.volatile_vote;
+
+		raft_write_request(&req);
+
+		assert(req.term >= raft.term);
+		if (req.term > raft.term) {
+			raft.term = req.term;
+			raft.vote = 0;
+		}
+		if (req.vote != 0) {
+			assert(raft.vote == 0);
+			raft.vote = req.vote;
+		}
+		if (raft_is_fully_on_disk())
+			goto end_dump;
 	}
-	switch (req->state) {
-	case RAFT_STATE_FOLLOWER:
-	    break;
-	case RAFT_STATE_CANDIDATE:
-	    // Perform voting logic.
-	    break;
-	case RAFT_STATE_LEADER:
-	    // Switch to a new leader.
-	    break;
-	default:
-	    break;
+
+	memset(&req, 0, sizeof(req));
+	/* Term is encoded always. */
+	req.term = raft.term;
+	bool has_changes = old_term != raft.term;
+	if (raft.vote != 0 && old_vote != raft.vote) {
+		req.vote = raft.vote;
+		/*
+		 * When vote for self, need to send current vclock too. Two
+		 * reasons for that:
+		 *
+		 * - nodes need to vote for the instance containing the newest
+		 *   data. So as not to lose it, because some of it may be
+		 *   confirmed by the synchronous replication;
+		 *
+		 * - replication is basically stopped during election. Other
+		 *   nodes can't learn vclock of this instance through regular
+		 *   replication.
+		 */
+		if (raft.vote == instance_id)
+			req.vclock = &replicaset.vclock;
+		has_changes = true;
+	}
+	if (raft.state != old_state) {
+		req.state = raft.state;
+		has_changes = true;
 	}
+	if (has_changes)
+		raft_broadcast(&req);
+}
+
+static void
+raft_sm_pause_and_dump(void)
+{
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	if (raft.is_write_in_progress)
+		return;
+	ev_timer_stop(loop(), &raft.timer);
+	ev_check_start(loop(), &raft.io);
+	raft.is_write_in_progress = true;
+}
+
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+	assert(new_term > raft.volatile_term);
+	assert(raft.volatile_term >= raft.term);
+	raft.volatile_term = new_term;
+	/* New terms means completely new Raft state. */
+	raft.volatile_vote = 0;
+	raft.leader = 0;
+	raft.state = RAFT_STATE_FOLLOWER;
+	raft_sm_pause_and_dump();
+}
+
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+	assert(raft.volatile_vote == 0);
+	raft.volatile_vote = new_vote;
+	raft_sm_pause_and_dump();
+}
+
+static void
+raft_sm_schedule_new_election(void)
+{
+	assert(raft_is_fully_on_disk());
+	assert(raft.is_candidate);
+	assert(raft.leader == 0);
+	/* Everyone is a follower until its vote for self is persisted. */
+	raft_sm_schedule_new_term(raft.term + 1);
+	raft_sm_schedule_new_vote(instance_id);
+	box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+				 int events)
+{
+	assert(timer == &raft.timer);
+	(void)events;
+	ev_timer_stop(loop, timer);
+	raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!ev_is_active(&raft.io));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	assert(raft.leader != 0);
+	double death_timeout = replication_disconnect_timeout();
+	ev_timer_set(&raft.timer, death_timeout, death_timeout);
+}
+
+static void
+raft_sm_wait_election_end(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!ev_is_active(&raft.io));
+	assert(!raft.is_write_in_progress);
+	assert(raft.is_candidate);
+	assert(raft.state == RAFT_STATE_FOLLOWER ||
+	       (raft.state == RAFT_STATE_CANDIDATE &&
+		raft.volatile_vote == instance_id));
+	assert(raft.leader == 0);
+	double election_timeout = raft.election_timeout +
+				  raft_new_random_election_shift();
+	ev_timer_set(&raft.timer, election_timeout, election_timeout);
+}
+
+static void
+raft_sm_start(void)
+{
+	assert(!ev_is_active(&raft.timer));
+	assert(!ev_is_active(&raft.io));
+	assert(!raft.is_write_in_progress);
+	assert(!raft.is_enabled);
+	assert(raft.state == RAFT_STATE_FOLLOWER);
+	raft.is_enabled = true;
+	raft.is_candidate = raft.is_cfg_candidate;
+	if (!raft.is_candidate)
+		/* Nop. */;
+	else if (raft.leader != 0)
+		raft_sm_wait_leader_dead();
+	else
+		raft_sm_schedule_new_election();
+	box_update_ro_summary();
+	/*
+	 * When Raft is enabled, send the complete state. Because
+	 * it wasn't sent in disabled state.
+	 */
+	struct raft_request req;
+	raft_serialize_for_network(&req, NULL);
+	raft_broadcast(&req);
+}
+
+static void
+raft_sm_stop(void)
+{
+	assert(raft.is_enabled);
+	raft.is_enabled = false;
+	raft.is_candidate = false;
+	box_update_ro_summary();
 }
 
 void
 raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
 {
 	memset(req, 0, sizeof(*req));
+	/*
+	 * Volatile state is never used for any communications.
+	 * Use only persisted state.
+	 */
 	req->term = raft.term;
 	req->vote = raft.vote;
 	req->state = raft.state;
@@ -128,29 +729,86 @@ raft_serialize_for_disk(struct raft_request *req)
 void
 raft_cfg_is_enabled(bool is_enabled)
 {
-	raft.is_enabled = is_enabled;
+	if (is_enabled == raft.is_enabled)
+		return;
+
+	if (!is_enabled)
+		raft_sm_stop();
+	else
+		raft_sm_start();
 }
 
 void
 raft_cfg_is_candidate(bool is_candidate)
 {
-	raft.is_candidate = is_candidate;
+	bool old_is_candidate = raft.is_candidate;
+	raft.is_cfg_candidate = is_candidate;
+	raft.is_candidate = is_candidate && raft.is_enabled;
+	if (raft.is_candidate == old_is_candidate)
+		return;
+
+	if (raft.is_candidate) {
+		assert(raft.state == RAFT_STATE_FOLLOWER);
+		/*
+		 * If there is an on-going WAL write, it means there was some
+		 * node who sent newer data to this node.
+		 */
+		if (raft.leader == 0 && raft_is_fully_on_disk())
+			raft_sm_schedule_new_election();
+	} else if (raft.state != RAFT_STATE_FOLLOWER) {
+		raft.state = RAFT_STATE_FOLLOWER;
+		raft_broadcast_new_state();
+	}
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_election_timeout(double timeout)
 {
+	if (timeout == raft.election_timeout)
+		return;
+
 	raft.election_timeout = timeout;
+	if (raft.vote != 0 && raft.leader == 0) {
+		assert(ev_is_active(&raft.timer));
+		double timeout = ev_timer_remaining(loop(), &raft.timer) -
+				 raft.timer.at + raft.election_timeout;
+		ev_timer_stop(loop(), &raft.timer);
+		ev_timer_set(&raft.timer, timeout, timeout);
+	}
 }
 
 void
 raft_cfg_election_quorum(void)
 {
+	if (raft.state != RAFT_STATE_CANDIDATE ||
+	    raft.state == RAFT_STATE_LEADER)
+		return;
+	if (raft.vote_count < raft_election_quorum())
+		return;
+	/*
+	 * The node is a candidate. It means its state is fully synced with
+	 * disk. Otherwise it would be a follower.
+	 */
+	assert(!raft.is_write_in_progress);
+	raft.state = RAFT_STATE_LEADER;
+	raft.leader = instance_id;
+	raft_broadcast_new_state();
+	box_update_ro_summary();
 }
 
 void
 raft_cfg_death_timeout(void)
 {
+	if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
+	    raft.leader != 0) {
+		assert(ev_is_active(&raft.timer));
+		double death_timeout = replication_disconnect_timeout();
+		double timeout = ev_timer_remaining(loop(), &raft.timer) -
+				 raft.timer.at + death_timeout;
+		ev_timer_stop(loop(), &raft.timer);
+		ev_timer_set(&raft.timer, timeout, timeout);
+	}
 }
 
 void
@@ -163,3 +821,10 @@ raft_broadcast(const struct raft_request *req)
 		}
 	}
 }
+
+void
+raft_init(void)
+{
+	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
+	ev_check_init(&raft.io, raft_sm_dump_step);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index db64cf933..111a9c16e 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,34 +31,141 @@
  */
 #include <stdint.h>
 #include <stdbool.h>
+#include "tarantool_ev.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
+/**
+ * This is an implementation of Raft leader election protocol, separated from
+ * synchronous replication part.
+ *
+ * The protocol describes an algorithm which helps to elect a single leader in
+ * the cluster, which is supposed to handle write requests. And re-elect a new
+ * leader, when the current leader dies.
+ *
+ * The implementation follows the protocol to the letter except for a few
+ * important details.
+ *
+ * Firstly, the original Raft assumes that all nodes share the same log record
+ * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
+ * node has its own LSN in its own component of vclock. That makes the election
+ * messages a bit heavier, because the nodes need to send and compare complete
+ * vclocks of each other instead of a single number like in the original Raft.
+ * But logic becomes simpler, because in the original Raft there is a problem of
+ * uncertainty about what to do with records of an old leader right after a new
+ * leader is elected. They could be rolled back or confirmed depending on
+ * circumstances. The issue disappears when vclock is used.
+ *
+ * Secondly, leader election works differently during cluster bootstrap, until
+ * number of bootstrapped replicas becomes >= election quorum. That arises from
+ * specifics of replicas bootstrap and order of systems initialization. In
+ * short: during bootstrap a leader election may use a smaller election quorum
+ * than the configured one. See more details in the code.
+ */
+
 struct raft_request;
 struct vclock;
 
 enum raft_state {
+	/**
+	 * Can't write. Can only accept data from a leader. Node in this state
+	 * either monitors an existing leader, or there is an on-going election
+	 * and the node voted for another node, or it can't be a candidate and
+	 * does not do anything.
+	 */
 	RAFT_STATE_FOLLOWER = 1,
+	/**
+	 * The node can't write. There is an active election, in which the node
+	 * voted for self. Now it waits for election outcome.
+	 */
 	RAFT_STATE_CANDIDATE = 2,
+	/** Election was successful. The node accepts write requests. */
 	RAFT_STATE_LEADER = 3,
 };
 
 extern const char *raft_state_strs[];
 
 struct raft {
+	/** Instance ID of leader of the current term. */
 	uint32_t leader;
+	/** State of the instance. */
 	enum raft_state state;
+	/**
+	 * Volatile part of the Raft state, whose WAL write may be still
+	 * in-progress, and yet the state may be already used. Volatile state is
+	 * never sent anywhere, but the state machine makes decisions based
+	 * on it. That is vital.
+	 * As an example, volatile vote needs to be used to reject votes inside
+	 * a term, where the instance already voted (even if the vote WAL write
+	 * is not finished yet). Otherwise the instance would try to write
+	 * several votes inside one term.
+	 */
+	uint64_t volatile_term;
+	uint32_t volatile_vote;
+	/**
+	 * Flag whether Raft is enabled. When disabled, it still persists terms
+	 * so as to quickly enroll into the cluster when (if) it is enabled. In
+	 * everything else disabled Raft does not affect instance work.
+	 */
 	bool is_enabled;
+	/**
+	 * Flag whether the node can become a leader. It is an accumulated value
+	 * of configuration options Raft enabled and Raft candidate. If at least
+	 * one is false - the instance is not a candidate.
+	 */
 	bool is_candidate;
+	/** Flag whether the instance is allowed to be a leader. */
+	bool is_cfg_candidate;
+	/**
+	 * Flag whether Raft currently tries to write something into WAL. It
+	 * happens asynchronously, not right after Raft state is updated.
+	 */
+	bool is_write_in_progress;
+	/**
+	 * Persisted Raft state. These values are used when the current Raft
+	 * state needs to be sent to other nodes.
+	 */
 	uint64_t term;
 	uint32_t vote;
+	/**
+	 * Bit 1 on position N means that a vote from instance with ID = N was
+	 * obtained.
+	 */
+	uint32_t vote_mask;
+	/** Number of votes for this instance. Valid only in candidate state. */
+	int vote_count;
+	/** State machine timed event trigger. */
+	struct ev_timer timer;
+	/**
+	 * Dump of Raft state at the end of event loop, when it is changed.
+	 */
+	struct ev_check io;
+	/** Configured election timeout in seconds. */
 	double election_timeout;
 };
 
 extern struct raft raft;
 
+/**
+ * A flag whether the instance is read-only according to Raft. Even if Raft
+ * allows writes though, it does not mean the instance is writable. It can be
+ * affected by box.cfg.read_only, connection quorum.
+ */
+static inline bool
+raft_is_ro(void)
+{
+	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+/** See if the instance can accept rows from an instance with the given ID. */
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+	return !raft.is_enabled || raft.leader == source_id;
+}
+
 /** Check if Raft is enabled. */
 static inline bool
 raft_is_enabled(void)
@@ -78,6 +185,13 @@ raft_process_recovery(const struct raft_request *req);
 void
 raft_process_msg(const struct raft_request *req, uint32_t source);
 
+/**
+ * Process a heartbeat message from an instance with the given ID. It is used to
+ * watch leader's health and start election when necessary.
+ */
+void
+raft_process_heartbeat(uint32_t source);
+
 /** Configure whether Raft is enabled. */
 void
 raft_cfg_is_enabled(bool is_enabled);
@@ -130,6 +244,10 @@ raft_serialize_for_disk(struct raft_request *req);
 void
 raft_broadcast(const struct raft_request *req);
 
+/** Initialize Raft global data structures. */
+void
+raft_init(void);
+
 #if defined(__cplusplus)
 }
 #endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74581db9c..4f9bbc0de 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
 		relay_send(relay, row);
 }
 
+/**
+ * Recreate recovery cursor from the last confirmed point. That is
+ * used by Raft, when the node becomes a leader. It may happen,
+ * that it already sent some data to other nodes as a follower,
+ * and they ignored the data. Now when the node is a leader, it
+ * should send the unconfirmed data again. Otherwise the cluster
+ * will get stuck, or worse - the newer data would be sent without
+ * the older sent but ignored data.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+	recovery_delete(relay->r);
+	relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+	recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
 struct relay_raft_msg {
 	struct cmsg base;
 	struct cmsg_hop route;
@@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
 	struct xrow_header row;
 	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
 	try {
+		if (msg->req.state == RAFT_STATE_LEADER)
+			relay_restart_recovery(msg->relay);
 		relay_send(msg->relay, &row);
 	} catch (Exception *e) {
 		relay_set_error(msg->relay, e);
-- 
2.21.1 (Apple Git-122.3)
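
The vote accounting documented in struct raft above (bit N of vote_mask is set when instance N voted, vote_count is compared against the election quorum) can be sketched roughly as follows. This is a standalone illustration, not code from the patch: `vote_tracker`, `vote_tracker_add()` and `vote_tracker_has_quorum()` are hypothetical names, and the quorum is passed in explicitly instead of being computed by raft_election_quorum().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the vote-related fields of struct raft. */
struct vote_tracker {
	/* Bit N is set when a vote from instance ID N was obtained. */
	uint32_t vote_mask;
	/* Number of distinct votes collected so far. */
	int vote_count;
};

/*
 * Register a vote from the given instance. Returns true if the vote
 * is new, false if it is a duplicate or the ID does not fit the mask.
 */
static bool
vote_tracker_add(struct vote_tracker *t, uint32_t source_id)
{
	if (source_id >= 32)
		return false;
	uint32_t bit = (uint32_t)1 << source_id;
	if ((t->vote_mask & bit) != 0)
		return false;
	t->vote_mask |= bit;
	++t->vote_count;
	return true;
}

/* Check whether enough distinct votes were collected. */
static bool
vote_tracker_has_quorum(const struct vote_tracker *t, int quorum)
{
	return t->vote_count >= quorum;
}
```

The mask makes duplicate votes from the same instance harmless, so the counter can be compared against the quorum directly.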

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier Vladislav Shpilevoy
@ 2020-09-04  8:13   ` Serge Petrenko
  2020-09-07 22:54     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  8:13 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> Applier is going to need its numeric ID in order to tell the
> future Raft module who is a sender of a Raft message. An
> alternative would be to add sender ID to each Raft message, but
> this looks like a crutch. Moreover, applier still needs to know
> its numeric ID in order to notify Raft about heartbeats from the
> peer node.
>
> Needed for #1146

Hi! Thanks for the patch!

> ---
>   src/box/applier.cc | 19 +++++++++++++++++++
>   src/box/applier.h  |  2 ++
>   2 files changed, 21 insertions(+)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index c1d07ca54..699b5a683 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -67,6 +67,23 @@ applier_set_state(struct applier *applier, enum applier_state state)
>   	trigger_run_xc(&applier->on_state, applier);
>   }
>   
> +static inline void
> +applier_assign_instance_id(struct applier *applier)
Maybe call it `applier_set_id`? This way it's shorter and resembles 
`replica_set_id`.
> +{
> +	/*
> +	 * After final join, the applier already received latest
> +	 * records from _cluster, including the record about
> +	 * source instance. It can be absent in case the source is
> +	 * an anonymous replica.
> +	 */
> +	assert(applier->state == APPLIER_JOINED);
> +	struct replica *replica = replica_by_uuid(&applier->uuid);
> +	if (replica != NULL)
> +		applier->instance_id = replica->id;
> +	else
> +		assert(applier->instance_id == 0);
> +}
> +
>   /**
>    * Write a nice error message to log file on SocketError or ClientError
>    * in applier_f().
> @@ -603,6 +620,7 @@ applier_join(struct applier *applier)
>   	say_info("final data received");
>   
>   	applier_set_state(applier, APPLIER_JOINED);
> +	applier_assign_instance_id(applier);
>   	applier_set_state(applier, APPLIER_READY);
>   }
>   
> @@ -1207,6 +1225,7 @@ applier_subscribe(struct applier *applier)
>   		    instance_id != REPLICA_ID_NIL) {
>   			say_info("final data received");
>   			applier_set_state(applier, APPLIER_JOINED);
> +			applier_assign_instance_id(applier);
>   			applier_set_state(applier, APPLIER_READY);
>   			applier_set_state(applier, APPLIER_FOLLOW);
>   		}
> diff --git a/src/box/applier.h b/src/box/applier.h
> index 6e979a806..15ca1fcfd 100644
> --- a/src/box/applier.h
> +++ b/src/box/applier.h
> @@ -95,6 +95,8 @@ struct applier {
>   	ev_tstamp lag;
>   	/** The last box_error_code() logged to avoid log flooding */
>   	uint32_t last_logged_errcode;
> +	/** Remote instance ID. */
> +	uint32_t instance_id;
>   	/** Remote instance UUID */
>   	struct tt_uuid uuid;
>   	/** Remote URI (string) */

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag Vladislav Shpilevoy
@ 2020-09-04  8:17   ` Serge Petrenko
  0 siblings, 0 replies; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  8:17 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> An instance is writable if box.cfg.read_only is false, and it is
> not orphan. Update of the final read-only state of the instance
> needs to fire read-only update triggers, and notify the engines.
> These 2 flags were easy and cheap to check on each operation, and
> the triggers were easy to use since both flags are stored and
> updated inside box.cc.
>
> That is going to change when Raft is introduced. Raft will add 2
> more checks:
>
>    - A flag if Raft is enabled on the node. If it is not, then Raft
>      state won't affect whether the instance is writable;
>
>    - When Raft is enabled, it will allow writes on a leader only.
>
> It means a check for being read-only would look like this:
>
>      is_ro || is_orphan || (raft_is_enabled() && !raft_is_leader())
>
> This is significantly slower. Besides, Raft somehow needs to
> access the read-only triggers and engine API - this looks wrong.
>
> The patch introduces a new flag is_ro_summary. The flag
> incorporates all the read-only conditions into one flag. When some
> subsystem may change read-only state of the instance, it needs to
> call box_update_ro_summary(), and the function takes care of
> updating the summary flag, running the triggers, and notifying the
> engines.
>
> Raft will use this function when its state or config changes.
>
> Needed for #1146

Thanks for the patch! LGTM.
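
A minimal sketch of the summary-flag idea from the commit message above, assuming the Raft conditions are already folded in (the patch itself only combines is_ro and is_orphan at this point). The names `raft_enabled`, `raft_leader`, `update_ro_summary()`, `check_writable()` and `ro_change_count` are hypothetical; the real function also switches the engines to read-only and broadcasts ro_cond.

```c
#include <assert.h>
#include <stdbool.h>

/* Individual read-only conditions, each owned by its own subsystem. */
static bool is_ro = false;
static bool is_orphan = false;
static bool raft_enabled = false;
static bool raft_leader = false;

/* Cached result, the only thing checked on the hot write path. */
static bool is_ro_summary = true;
/* Counts real changes of the summary, for illustration only. */
static int ro_change_count = 0;

/* Must be called by any subsystem after it changes one of the flags. */
static void
update_ro_summary(void)
{
	bool old = is_ro_summary;
	is_ro_summary = is_ro || is_orphan ||
			(raft_enabled && !raft_leader);
	/* Most calls change nothing. Filter them out first. */
	if (is_ro_summary == old)
		return;
	/* The real code would notify engines and wake up waiters here. */
	++ro_change_count;
}

/* Hot path: a single flag read instead of the whole expression. */
static bool
check_writable(void)
{
	return !is_ro_summary;
}
```

The side effects run only when the combined value actually flips, which is why callers can invoke the update function unconditionally.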


> ---
>   src/box/box.cc | 44 +++++++++++++++++++++++++++-----------------
>   src/box/box.h  |  6 ++++++
>   2 files changed, 33 insertions(+), 17 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index faffd5769..0813603c0 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -129,6 +129,14 @@ static bool is_local_recovery = false;
>    */
>   static bool is_orphan;
>   
> +/**
> + * Summary flag incorporating all the instance attributes,
> + * affecting ability to write. Currently these are:
> + * - is_ro;
> + * - is_orphan;
> + */
> +static bool is_ro_summary = true;
> +
>   /**
>    * The pool of fibers in the transaction processor thread
>    * working on incoming messages from net, wal and other
> @@ -144,11 +152,24 @@ static struct fiber_pool tx_fiber_pool;
>    */
>   static struct cbus_endpoint tx_prio_endpoint;
>   
> +void
> +box_update_ro_summary(void)
> +{
> +	bool old_is_ro_summary = is_ro_summary;
> +	is_ro_summary = is_ro || is_orphan;
> +	/* In 99% nothing changes. Filter this out first. */
> +	if (is_ro_summary == old_is_ro_summary)
> +		return;
> +
> +	if (is_ro_summary)
> +		engine_switch_to_ro();
> +	fiber_cond_broadcast(&ro_cond);
> +}
> +
>   static int
>   box_check_writable(void)
>   {
> -	/* box is only writable if box.cfg.read_only == false and */
> -	if (is_ro || is_orphan) {
> +	if (is_ro_summary) {
>   		diag_set(ClientError, ER_READONLY);
>   		diag_log();
>   		return -1;
> @@ -253,20 +274,14 @@ box_check_ro(void);
>   void
>   box_set_ro(void)
>   {
> -	bool ro = box_check_ro();
> -	if (ro == is_ro)
> -		return; /* nothing to do */
> -	if (ro)
> -		engine_switch_to_ro();
> -
> -	is_ro = ro;
> -	fiber_cond_broadcast(&ro_cond);
> +	is_ro = box_check_ro();
> +	box_update_ro_summary();
>   }
>   
>   bool
>   box_is_ro(void)
>   {
> -	return is_ro || is_orphan;
> +	return is_ro_summary;
>   }
>   
>   bool
> @@ -293,13 +308,8 @@ box_wait_ro(bool ro, double timeout)
>   void
>   box_do_set_orphan(bool orphan)
>   {
> -	if (is_orphan == orphan)
> -		return; /* nothing to do */
> -	if (orphan)
> -		engine_switch_to_ro();
> -
>   	is_orphan = orphan;
> -	fiber_cond_broadcast(&ro_cond);
> +	box_update_ro_summary();
>   }
>   
>   void
> diff --git a/src/box/box.h b/src/box/box.h
> index f9bd8b98d..5988264a5 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -137,6 +137,12 @@ box_set_orphan(bool orphan);
>   void
>   box_do_set_orphan(bool orphan);
>   
> +/**
> + * Update the final RO flag based on the instance flags and state.
> + */
> +void
> +box_update_ro_summary(void);
> +
>   /**
>    * Iterate over all spaces and save them to the
>    * snapshot file.

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy
@ 2020-09-04  8:20   ` Serge Petrenko
  0 siblings, 0 replies; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  8:20 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> Relay.cc and box.cc obtained box.cfg.wal_dir value using
> cfg_gets() call, to initialize WAL and create struct recovery
> objects.
>
> That is not only a bit dangerous (cfg_gets() uses Lua API and can
> throw a Lua error) and slow, but also not necessary - wal_dir
> parameter is constant, it can't be changed after instance start.
>
> It means, the value can be stored somewhere one time and then used
> without Lua.
>
> Main motivation is that the WAL directory path will be needed
> inside relay threads to restart their recovery iterators in the
> Raft patch. They can't use cfg_gets(), because Lua lives in TX
> thread. But can access a constant global variable, introduced in
> this patch (it existed before, but now has a method to get it).
>
> Needed for #1146


LGTM
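
The "store once, read without Lua" approach from the commit message can be illustrated with a small standalone sketch. It assumes the path is copied during single-threaded initialization and never modified afterwards, which is what would make later reads from other threads safe. `dir_cache`, `dir_cache_init()` and `dir_cache_get()` are hypothetical names, not the WAL writer's API.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical holder of a private copy of the configured path. */
static struct {
	char dirname[256];
	int is_set;
} dir_cache;

/*
 * Store the path once, before any other thread is started. Later
 * calls are ignored. Returns 0 on success, -1 if the path is too long.
 */
static int
dir_cache_init(const char *path)
{
	if (dir_cache.is_set)
		return 0;
	if (strlen(path) >= sizeof(dir_cache.dirname))
		return -1;
	snprintf(dir_cache.dirname, sizeof(dir_cache.dirname), "%s", path);
	dir_cache.is_set = 1;
	return 0;
}

/* Plain read of immutable data, no configuration lookup involved. */
static const char *
dir_cache_get(void)
{
	return dir_cache.dirname;
}
```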

> ---
>   src/box/box.cc   | 9 ++++-----
>   src/box/relay.cc | 7 ++-----
>   src/box/wal.c    | 6 ++++++
>   src/box/wal.h    | 7 +++++++
>   4 files changed, 19 insertions(+), 10 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 0813603c0..eeb00d5e2 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -2395,8 +2395,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   	wal_stream_create(&wal_stream);
>   
>   	struct recovery *recovery;
> -	recovery = recovery_new(cfg_gets("wal_dir"),
> -				cfg_geti("force_recovery"),
> +	recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
>   				checkpoint_vclock);
>   
>   	/*
> @@ -2469,7 +2468,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
>   		recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
>   				      cfg_getd("wal_dir_rescan_delay"));
>   		while (true) {
> -			if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock))
> +			if (path_lock(wal_dir(), &wal_dir_lock))
>   				diag_raise();
>   			if (wal_dir_lock >= 0)
>   				break;
> @@ -2616,7 +2615,7 @@ box_cfg_xc(void)
>   	 * Lock the write ahead log directory to avoid multiple
>   	 * instances running in the same dir.
>   	 */
> -	if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock) < 0)
> +	if (path_lock(wal_dir(), &wal_dir_lock) < 0)
>   		diag_raise();
>   	if (wal_dir_lock < 0) {
>   		/**
> @@ -2625,7 +2624,7 @@ box_cfg_xc(void)
>   		 * WAL dir must contain at least one xlog.
>   		 */
>   		if (!cfg_geti("hot_standby") || checkpoint == NULL)
> -			tnt_raise(ClientError, ER_ALREADY_RUNNING, cfg_gets("wal_dir"));
> +			tnt_raise(ClientError, ER_ALREADY_RUNNING, wal_dir());
>   	}
>   
>   	struct journal bootstrap_journal;
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a7843a8c2..124b0f52f 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -34,7 +34,6 @@
>   #include "tt_static.h"
>   #include "scoped_guard.h"
>   #include "cbus.h"
> -#include "cfg.h"
>   #include "errinj.h"
>   #include "fiber.h"
>   #include "say.h"
> @@ -369,8 +368,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
>   		relay_delete(relay);
>   	});
>   
> -	relay->r = recovery_new(cfg_gets("wal_dir"), false,
> -			       start_vclock);
> +	relay->r = recovery_new(wal_dir(), false, start_vclock);
>   	vclock_copy(&relay->stop_vclock, stop_vclock);
>   
>   	int rc = cord_costart(&relay->cord, "final_join",
> @@ -731,8 +729,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
>   	});
>   
>   	vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
> -	relay->r = recovery_new(cfg_gets("wal_dir"), false,
> -			        replica_clock);
> +	relay->r = recovery_new(wal_dir(), false, replica_clock);
>   	vclock_copy(&relay->tx.vclock, replica_clock);
>   	relay->version_id = replica_version_id;
>   
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 045006b60..e181e58d9 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -201,6 +201,12 @@ wal_mode(void)
>   	return wal_writer_singleton.wal_mode;
>   }
>   
> +const char *
> +wal_dir(void)
> +{
> +	return wal_writer_singleton.wal_dir.dirname;
> +}
> +
>   static void
>   wal_write_to_disk(struct cmsg *msg);
>   
> diff --git a/src/box/wal.h b/src/box/wal.h
> index 9d0cada46..581306fe9 100644
> --- a/src/box/wal.h
> +++ b/src/box/wal.h
> @@ -98,6 +98,13 @@ wal_enable(void);
>   void
>   wal_free(void);
>   
> +/**
> + * Get WAL directory path. The value never changes after box is
> + * configured first time. Safe to use from multiple threads.
> + */
> +const char *
> +wal_dir(void);
> +
>   struct wal_watcher_msg {
>   	struct cmsg cmsg;
>   	struct wal_watcher *watcher;

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count Vladislav Shpilevoy
@ 2020-09-04  8:24   ` Serge Petrenko
  2020-09-07 22:54     ` Vladislav Shpilevoy
  2020-09-07 15:45   ` Sergey Ostanevich
  1 sibling, 1 reply; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  8:24 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> Struct replicaset didn't store the number of registered replicas,
> only an array, which had to be fully scanned each time the count
> was needed.
>
> The count is going to be needed in Raft to calculate election quorum.
> The patch tracks the count so that it can be found in constant
> time by simply reading an integer.
>
> Needed for #1146
Thanks for the patch!

LGTM with one comment.
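
To illustrate the trade-off described in the commit message, here is a small standalone sketch of a counter maintained next to an ID-indexed array. It is not the replicaset code: `registry`, `member`, `ID_MAX` and the function names are hypothetical, and the counter is called `registered_count` only for the sake of the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { ID_MAX = 32 };

struct member {
	uint32_t id;
};

struct registry {
	/* Members indexed by their ID, NULL when the ID is free. */
	struct member *by_id[ID_MAX];
	/* Number of non-NULL slots, kept in sync on every change. */
	int registered_count;
};

static void
registry_set_id(struct registry *r, struct member *m, uint32_t id)
{
	assert(id < ID_MAX && r->by_id[id] == NULL);
	m->id = id;
	r->by_id[id] = m;
	++r->registered_count;
}

static void
registry_clear_id(struct registry *r, struct member *m)
{
	assert(r->by_id[m->id] == m);
	r->by_id[m->id] = NULL;
	assert(r->registered_count > 0);
	--r->registered_count;
}

/* The O(N) scan that the counter makes unnecessary. */
static int
registry_count_by_scan(const struct registry *r)
{
	int count = 0;
	for (int i = 0; i < ID_MAX; i++)
		count += r->by_id[i] != NULL;
	return count;
}
```

As long as the array is modified only through the two functions, the counter and the scan always agree.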

> ---
>   src/box/replication.cc | 3 +++
>   src/box/replication.h  | 7 +++++++
>   2 files changed, 10 insertions(+)
>
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index ef0e2411d..20f16206a 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
>   						   tt_uuid_str(&replica->uuid));
>   	}
>   	replicaset.replica_by_id[replica_id] = replica;
> +	++replicaset.size;
>   
>   	say_info("assigned id %d to replica %s",
>   		 replica->id, tt_uuid_str(&replica->uuid));
> @@ -267,6 +268,8 @@ replica_clear_id(struct replica *replica)
>   	 * replication.
>   	 */
>   	replicaset.replica_by_id[replica->id] = NULL;
> +	assert(replicaset.size > 0);
> +	--replicaset.size;
>   	if (replica->id == instance_id) {
>   		/* See replica_check_id(). */
>   		assert(replicaset.is_joining);
> diff --git a/src/box/replication.h b/src/box/replication.h
> index ddc2bddf4..69cc820c9 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -217,6 +217,13 @@ struct replicaset {
>   	bool is_joining;
>   	/* A number of anonymous replicas following this instance. */
>   	int anon_count;
> +	/**
> +	 * Number of registered replicas. That includes all of them - connected,
> +	 * disconnected, connected not directly, just present in _cluster. If an
> +	 * instance has an ID, has the same replicaset UUID, then it is
> +	 * accounted here.
> +	 */
> +	int size;

We already have `anon_count`. Maybe call this field `registered_count` 
for consistency?
Besides `size` is too general of a term.

>   	/** Applier state. */
>   	struct {
>   		/**

-- 
Serge Petrenko

^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state Vladislav Shpilevoy
@ 2020-09-04  8:59   ` Serge Petrenko
  0 siblings, 0 replies; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  8:59 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> box.internal.raft_*() helper functions were introduced to test
> the persistency. Any state change is saved into WAL and into
> snapshot.
>
> Part of #1146
> ---


LGTM.
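
For readers skimming the diff below, the recovery rule in raft_process_recovery() can be reduced to the following standalone sketch: a zero field in a persisted row is treated as "not set" and leaves the in-memory value intact. `state`, `state_request` and `state_apply()` are hypothetical names used only for this illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical mirror of a persisted row with term and vote. */
struct state_request {
	uint64_t term;
	uint32_t vote;
};

/* Hypothetical mirror of the in-memory persistent state. */
struct state {
	uint64_t term;
	uint32_t vote;
};

/* Apply one row read from WAL or snapshot. Zero means "not set". */
static void
state_apply(struct state *s, const struct state_request *req)
{
	if (req->term != 0)
		s->term = req->term;
	if (req->vote != 0)
		s->vote = req->vote;
}
```

Replaying the rows in log order therefore leaves the state at the last non-zero value written for each field.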


>   src/box/CMakeLists.txt     |  1 +
>   src/box/box.cc             |  8 +++++
>   src/box/iproto_constants.h | 13 ++++++++
>   src/box/lua/misc.cc        |  1 +
>   src/box/memtx_engine.c     | 35 ++++++++++++++++++++
>   src/box/raft.c             | 65 ++++++++++++++++++++++++++++++++++++
>   src/box/raft.h             | 67 ++++++++++++++++++++++++++++++++++++++
>   src/box/xrow.c             | 56 +++++++++++++++++++++++++++++++
>   src/box/xrow.h             | 12 +++++++
>   9 files changed, 258 insertions(+)
>   create mode 100644 src/box/raft.c
>   create mode 100644 src/box/raft.h
>
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index b8b2689d2..29c3bfe79 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -170,6 +170,7 @@ add_library(box STATIC
>       port.c
>       txn.c
>       txn_limbo.c
> +    raft.c
>       box.cc
>       gc.c
>       checkpoint_schedule.c
> diff --git a/src/box/box.cc b/src/box/box.cc
> index eeb00d5e2..281917af2 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -78,6 +78,7 @@
>   #include "sequence.h"
>   #include "sql_stmt_cache.h"
>   #include "msgpack.h"
> +#include "raft.h"
>   #include "trivia/util.h"
>   
>   static char status[64] = "unknown";
> @@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>   			diag_raise();
>   		return;
>   	}
> +	if (iproto_type_is_raft_request(row->type)) {
> +		struct raft_request raft_req;
> +		if (xrow_decode_raft(row, &raft_req) != 0)
> +			diag_raise();
> +		raft_process_recovery(&raft_req);
> +		return;
> +	}
>   	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>   	if (request.type != IPROTO_NOP) {
>   		struct space *space = space_cache_find_xc(request.space_id);
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 4f5a2b195..8a11626b3 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -219,6 +219,8 @@ enum iproto_type {
>   	/** The maximum typecode used for box.stat() */
>   	IPROTO_TYPE_STAT_MAX,
>   
> +	IPROTO_RAFT = 30,
> +
>   	/** A confirmation message for synchronous transactions. */
>   	IPROTO_CONFIRM = 40,
>   	/** A rollback message for synchronous transactions. */
> @@ -258,6 +260,11 @@ enum iproto_type {
>   /** IPROTO type name by code */
>   extern const char *iproto_type_strs[];
>   
> +enum iproto_raft_keys {
> +	IPROTO_RAFT_TERM = 0,
> +	IPROTO_RAFT_VOTE = 1,
> +};
> +
>   /**
>    * Returns IPROTO type name by @a type code.
>    * @param type IPROTO type.
> @@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
>   	return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
>   }
>   
> +static inline bool
> +iproto_type_is_raft_request(uint32_t type)
> +{
> +	return type == IPROTO_RAFT;
> +}
> +
>   /** This is an error. */
>   static inline bool
>   iproto_type_is_error(uint32_t type)
> diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
> index 5da84b35a..e356f2d4b 100644
> --- a/src/box/lua/misc.cc
> +++ b/src/box/lua/misc.cc
> @@ -40,6 +40,7 @@
>   #include "box/tuple.h"
>   #include "box/tuple_format.h"
>   #include "box/lua/tuple.h"
> +#include "box/xrow.h"
>   #include "mpstream/mpstream.h"
>   
>   static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index dfd6fce6e..7ebed7aa8 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -48,6 +48,7 @@
>   #include "replication.h"
>   #include "schema.h"
>   #include "gc.h"
> +#include "raft.h"
>   
>   /* sync snapshot every 16MB */
>   #define SNAP_SYNC_INTERVAL	(1 << 24)
> @@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
>   	return 0;
>   }
>   
> +static int
> +memtx_engine_recover_raft(const struct xrow_header *row)
> +{
> +	assert(row->type == IPROTO_RAFT);
> +	struct raft_request req;
> +	if (xrow_decode_raft(row, &req) != 0)
> +		return -1;
> +	raft_process_recovery(&req);
> +	return 0;
> +}
> +
>   static int
>   memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
>   				  struct xrow_header *row)
>   {
>   	assert(row->bodycnt == 1); /* always 1 for read */
>   	if (row->type != IPROTO_INSERT) {
> +		if (row->type == IPROTO_RAFT)
> +			return memtx_engine_recover_raft(row);
>   		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
>   			 (uint32_t) row->type);
>   		return -1;
> @@ -477,6 +491,7 @@ struct checkpoint {
>   	/** The vclock of the snapshot file. */
>   	struct vclock vclock;
>   	struct xdir dir;
> +	struct raft_request raft;
>   	/**
>   	 * Do nothing, just touch the snapshot file - the
>   	 * checkpoint already exists.
> @@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
>   	opts.free_cache = true;
>   	xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
>   	vclock_create(&ckpt->vclock);
> +	raft_serialize_for_disk(&ckpt->raft);
>   	ckpt->touch = false;
>   	return ckpt;
>   }
> @@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
>   	return 0;
>   };
>   
> +static int
> +checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
> +{
> +	struct xrow_header row;
> +	struct region *region = &fiber()->gc;
> +	uint32_t svp = region_used(region);
> +	int rc = -1;
> +	if (xrow_encode_raft(&row, region, req) != 0)
> +		goto finish;
> +	if (checkpoint_write_row(l, &row) != 0)
> +		goto finish;
> +	rc = 0;
> +finish:
> +	region_truncate(region, svp);
> +	return rc;
> +}
> +
>   static int
>   checkpoint_f(va_list ap)
>   {
> @@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
>   		if (rc != 0)
>   			goto fail;
>   	}
> +	if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
> +		goto fail;
>   	if (xlog_flush(&snap) < 0)
>   		goto fail;
>   
> diff --git a/src/box/raft.c b/src/box/raft.c
> new file mode 100644
> index 000000000..511fe42f5
> --- /dev/null
> +++ b/src/box/raft.c
> @@ -0,0 +1,65 @@
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include "raft.h"
> +
> +#include "error.h"
> +#include "journal.h"
> +#include "xrow.h"
> +#include "small/region.h"
> +
> +/** Raft state of this instance. */
> +struct raft raft = {
> +	.term = 1,
> +	.vote = 0,
> +};
> +
> +void
> +raft_process_recovery(const struct raft_request *req)
> +{
> +	if (req->term != 0)
> +		raft.term = req->term;
> +	if (req->vote != 0)
> +		raft.vote = req->vote;
> +}
> +
> +void
> +raft_serialize_for_network(struct raft_request *req)
> +{
> +	req->term = raft.term;
> +	req->vote = raft.vote;
> +}
> +
> +void
> +raft_serialize_for_disk(struct raft_request *req)
> +{
> +	req->term = raft.term;
> +	req->vote = raft.vote;
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> new file mode 100644
> index 000000000..31f7becdb
> --- /dev/null
> +++ b/src/box/raft.h
> @@ -0,0 +1,67 @@
> +#pragma once
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#if defined(__cplusplus)
> +extern "C" {
> +#endif
> +
> +struct raft_request;
> +
> +struct raft {
> +	uint64_t term;
> +	uint32_t vote;
> +};
> +
> +extern struct raft raft;
> +
> +/** Process a raft entry stored in WAL/snapshot. */
> +void
> +raft_process_recovery(const struct raft_request *req);
> +
> +/**
> + * Save complete Raft state into a request to be sent to other instances of the
> + * cluster. It is allowed to save anything here, not only persistent state.
> + */
> +void
> +raft_serialize_for_network(struct raft_request *req);
> +
> +/**
> + * Save complete Raft state into a request to be persisted on disk. Only term
> + * and vote are being persisted.
> + */
> +void
> +raft_serialize_for_disk(struct raft_request *req);
> +
> +#if defined(__cplusplus)
> +}
> +#endif
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 95ddb1fe7..1923bacfc 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
>   	return 0;
>   }
>   
> +int
> +xrow_encode_raft(struct xrow_header *row, struct region *region,
> +		 const struct raft_request *r)
> +{
> +	size_t size = mp_sizeof_map(2) +
> +		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
> +		      mp_sizeof_uint(r->term) +
> +		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
> +		      mp_sizeof_uint(r->vote);
> +	char *buf = region_alloc(region, size);
> +	if (buf == NULL) {
> +		diag_set(OutOfMemory, size, "region_alloc", "buf");
> +		return -1;
> +	}
> +	memset(row, 0, sizeof(*row));
> +	row->type = IPROTO_RAFT;
> +	row->body[0].iov_base = buf;
> +	row->body[0].iov_len = size;
> +	row->group_id = GROUP_LOCAL;
> +	row->bodycnt = 1;
> +	buf = mp_encode_map(buf, 2);
> +	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
> +	buf = mp_encode_uint(buf, r->term);
> +	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
> +	buf = mp_encode_uint(buf, r->vote);
> +	return 0;
> +}
> +
> +int
> +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
> +{
> +	/* TODO: handle bad format. */
> +	assert(row->type == IPROTO_RAFT);
> +	assert(row->bodycnt == 1);
> +	assert(row->group_id == GROUP_LOCAL);
> +	memset(r, 0, sizeof(*r));
> +	const char *pos = row->body[0].iov_base;
> +	uint32_t map_size = mp_decode_map(&pos);
> +	for (uint32_t i = 0; i < map_size; ++i)
> +	{
> +		uint64_t key = mp_decode_uint(&pos);
> +		switch (key) {
> +		case IPROTO_RAFT_TERM:
> +			r->term = mp_decode_uint(&pos);
> +			break;
> +		case IPROTO_RAFT_VOTE:
> +			r->vote = mp_decode_uint(&pos);
> +			break;
> +		default:
> +			mp_next(&pos);
> +			break;
> +		}
> +	}
> +	return 0;
> +}
> +
>   int
>   xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
>   {
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 58d47b12d..c234f6f88 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
>   int
>   xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
>   
> +struct raft_request {
> +	uint64_t term;
> +	uint32_t vote;
> +};
> +
> +int
> +xrow_encode_raft(struct xrow_header *row, struct region *region,
> +		 const struct raft_request *r);
> +
> +int
> +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
> +
>   /**
>    * CALL/EVAL request.
>    */

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy
@ 2020-09-04  9:07   ` Serge Petrenko
  2020-09-07 22:55     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  9:07 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> The new options are:
>
> - raft_is_enabled - enable/disable Raft. When disabled, the node
>    is supposed to work as if Raft does not exist, like before;
>
> - raft_is_candidate - a flag whether the instance can try to
>    become a leader. Note, it can vote for other nodes regardless of
>    the value of this option;
>
> - raft_election_timeout - how long to wait until the election
>    ends, in seconds.
>
> The options don't do anything now. They are added separately in
> order to keep such mundane changes out of the main Raft commit, to
> simplify its review.
>
> Part of #1146


Thanks for the patch!

Please see my comment below.

> ---
>   src/box/box.cc                  | 91 +++++++++++++++++++++++++++++++++
>   src/box/box.h                   |  3 ++
>   src/box/lua/cfg.cc              | 27 ++++++++++
>   src/box/lua/load_cfg.lua        | 15 ++++++
>   src/box/raft.c                  | 30 +++++++++++
>   src/box/raft.h                  | 35 +++++++++++++
>   test/app-tap/init_script.result |  3 ++
>   test/box/admin.result           |  6 +++
>   test/box/cfg.result             | 12 +++++
>   9 files changed, 222 insertions(+)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 281917af2..5f04a1a78 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -472,6 +472,40 @@ box_check_uri(const char *source, const char *option_name)
>   	}
>   }
>   
> +static int
> +box_check_raft_is_enabled(void)
> +{
> +	int b = cfg_getb("raft_is_enabled");
> +	if (b < 0) {
> +		diag_set(ClientError, ER_CFG, "raft_is_enabled",
> +			 "the value must be a boolean");
> +	}
> +	return b;
> +}
> +
> +static int
> +box_check_raft_is_candidate(void)
> +{
> +	int b = cfg_getb("raft_is_candidate");
> +	if (b < 0) {
> +		diag_set(ClientError, ER_CFG, "raft_is_candidate",
> +			 "the value must be a boolean");
> +	}
> +	return b;
> +}
> +
> +static double
> +box_check_raft_election_timeout(void)
> +{
> +	double d = cfg_getd("raft_election_timeout");
> +	if (d == 0) {

Should be "d <= 0" here?

Otherwise you end up with a diag_raise without appropriate diag_set
when raft_election_timeout is negative.
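
A minimal sketch of what I mean, for illustration only. It takes the
already-read value as an argument (the real function calls cfg_getd()),
and the hypothetical is_error flag stands in for diag_set(), so the
snippet compiles on its own:

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Hypothetical stand-in for box_check_raft_election_timeout().
 * 'd' replaces the cfg_getd() result, '*is_error' replaces diag_set().
 */
static double
sketch_check_election_timeout(double d, bool *is_error)
{
	/* Reject zero and negative values alike. */
	if (d <= 0) {
		*is_error = true;	/* diag_set() would go here. */
		return -1;
	}
	*is_error = false;
	return d;
}
```

With "d == 0" a negative timeout is returned as is, the caller sees a
value < 0 and raises, but nothing was put into the diag.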


> +		diag_set(ClientError, ER_CFG, "raft_election_timeout",
> +			 "the value must be a positive number");
> +		return -1;
> +	}
> +	return d;
> +}
> +
>   static void
>   box_check_replication(void)
>   {
> @@ -729,6 +763,12 @@ box_check_config(void)
>   	box_check_uri(cfg_gets("listen"), "listen");
>   	box_check_instance_uuid(&uuid);
>   	box_check_replicaset_uuid(&uuid);
> +	if (box_check_raft_is_enabled() < 0)
> +		diag_raise();
> +	if (box_check_raft_is_candidate() < 0)
> +		diag_raise();
> +	if (box_check_raft_election_timeout() < 0)
> +		diag_raise();
>   	box_check_replication();
>   	box_check_replication_timeout();
>   	box_check_replication_connect_timeout();
> @@ -751,6 +791,36 @@ box_check_config(void)
>   		diag_raise();
>   }
>   
> +int
> +box_set_raft_is_enabled(void)
> +{
> +	int b = box_check_raft_is_enabled();
> +	if (b < 0)
> +		return -1;
> +	raft_cfg_is_enabled(b);
> +	return 0;
> +}
> +
> +int
> +box_set_raft_is_candidate(void)
> +{
> +	int b = box_check_raft_is_candidate();
> +	if (b < 0)
> +		return -1;
> +	raft_cfg_is_candidate(b);
> +	return 0;
> +}
> +
> +int
> +box_set_raft_election_timeout(void)
> +{
> +	double d = box_check_raft_election_timeout();
> +	if (d < 0)
> +		return -1;
> +	raft_cfg_election_timeout(d);
> +	return 0;
> +}
> +
>   /*
>    * Parse box.cfg.replication and create appliers.
>    */
> @@ -835,6 +905,7 @@ void
>   box_set_replication_timeout(void)
>   {
>   	replication_timeout = box_check_replication_timeout();
> +	raft_cfg_death_timeout();
>   }
>   
>   void
> @@ -865,6 +936,7 @@ box_set_replication_synchro_quorum(void)
>   		return -1;
>   	replication_synchro_quorum = value;
>   	txn_limbo_on_parameters_change(&txn_limbo);
> +	raft_cfg_election_quorum();
>   	return 0;
>   }
>   
> @@ -2680,6 +2752,25 @@ box_cfg_xc(void)
>   
>   	fiber_gc();
>   	is_box_configured = true;
> +	/*
> +	 * Fill in Raft parameters after bootstrap. It is not possible earlier:
> +	 * there may be Raft data to recover from WAL and snapshot. Also, until
> +	 * recovery is done, it is not possible to write new records into WAL.
> +	 * It is also totally safe, because relaying is not started until the
> +	 * box is configured. So it can't happen that this Raft node tries
> +	 * to relay to another Raft node without Raft enabled, leading to a
> +	 * disconnect.
> +	 */
> +	if (box_set_raft_is_candidate() != 0)
> +		diag_raise();
> +	if (box_set_raft_election_timeout() != 0)
> +		diag_raise();
> +	/*
> +	 * Raft is enabled last, so that all the parameters are installed by
> +	 * that time.
> +	 */
> +	if (box_set_raft_is_enabled() != 0)
> +		diag_raise();
>   
>   	title("running");
>   	say_info("ready to accept requests");
> diff --git a/src/box/box.h b/src/box/box.h
> index 5988264a5..637d10dd3 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -245,6 +245,9 @@ void box_set_vinyl_memory(void);
>   void box_set_vinyl_max_tuple_size(void);
>   void box_set_vinyl_cache(void);
>   void box_set_vinyl_timeout(void);
> +int box_set_raft_is_enabled(void);
> +int box_set_raft_is_candidate(void);
> +int box_set_raft_election_timeout(void);
>   void box_set_replication_timeout(void);
>   void box_set_replication_connect_timeout(void);
>   void box_set_replication_connect_quorum(void);
> diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
> index d481155cd..339b85f9d 100644
> --- a/src/box/lua/cfg.cc
> +++ b/src/box/lua/cfg.cc
> @@ -269,6 +269,30 @@ lbox_cfg_set_worker_pool_threads(struct lua_State *L)
>   	return 0;
>   }
>   
> +static int
> +lbox_cfg_set_raft_is_enabled(struct lua_State *L)
> +{
> +	if (box_set_raft_is_enabled() != 0)
> +		luaT_error(L);
> +	return 0;
> +}
> +
> +static int
> +lbox_cfg_set_raft_is_candidate(struct lua_State *L)
> +{
> +	if (box_set_raft_is_candidate() != 0)
> +		luaT_error(L);
> +	return 0;
> +}
> +
> +static int
> +lbox_cfg_set_raft_election_timeout(struct lua_State *L)
> +{
> +	if (box_set_raft_election_timeout() != 0)
> +		luaT_error(L);
> +	return 0;
> +}
> +
>   static int
>   lbox_cfg_set_replication_timeout(struct lua_State *L)
>   {
> @@ -382,6 +406,9 @@ box_lua_cfg_init(struct lua_State *L)
>   		{"cfg_set_vinyl_max_tuple_size", lbox_cfg_set_vinyl_max_tuple_size},
>   		{"cfg_set_vinyl_cache", lbox_cfg_set_vinyl_cache},
>   		{"cfg_set_vinyl_timeout", lbox_cfg_set_vinyl_timeout},
> +		{"cfg_set_raft_is_enabled", lbox_cfg_set_raft_is_enabled},
> +		{"cfg_set_raft_is_candidate", lbox_cfg_set_raft_is_candidate},
> +		{"cfg_set_raft_election_timeout", lbox_cfg_set_raft_election_timeout},
>   		{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
>   		{"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum},
>   		{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
> diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
> index 53f572895..2c98fd837 100644
> --- a/src/box/lua/load_cfg.lua
> +++ b/src/box/lua/load_cfg.lua
> @@ -86,6 +86,9 @@ local default_cfg = {
>       checkpoint_wal_threshold = 1e18,
>       checkpoint_count    = 2,
>       worker_pool_threads = 4,
> +    raft_is_enabled       = false,
> +    raft_is_candidate     = true,
> +    raft_election_timeout = 5,
>       replication_timeout = 1,
>       replication_sync_lag = 10,
>       replication_sync_timeout = 300,
> @@ -163,6 +166,9 @@ local template_cfg = {
>       read_only           = 'boolean',
>       hot_standby         = 'boolean',
>       worker_pool_threads = 'number',
> +    raft_is_enabled       = 'boolean',
> +    raft_is_candidate     = 'boolean',
> +    raft_election_timeout = 'number',
>       replication_timeout = 'number',
>       replication_sync_lag = 'number',
>       replication_sync_timeout = 'number',
> @@ -279,6 +285,9 @@ local dynamic_cfg = {
>           require('title').update(box.cfg.custom_proc_title)
>       end,
>       force_recovery          = function() end,
> +    raft_is_enabled         = private.cfg_set_raft_is_enabled,
> +    raft_is_candidate       = private.cfg_set_raft_is_candidate,
> +    raft_election_timeout   = private.cfg_set_raft_election_timeout,
>       replication_timeout     = private.cfg_set_replication_timeout,
>       replication_connect_timeout = private.cfg_set_replication_connect_timeout,
>       replication_connect_quorum = private.cfg_set_replication_connect_quorum,
> @@ -333,6 +342,9 @@ local dynamic_cfg_order = {
>       -- the new one. This should be fixed when box.cfg is able to
>       -- apply some parameters together and atomically.
>       replication_anon        = 250,
> +    raft_is_enabled         = 300,
> +    raft_is_candidate       = 310,
> +    raft_election_timeout   = 320,
>   }
>   
>   local function sort_cfg_cb(l, r)
> @@ -350,6 +362,9 @@ local dynamic_cfg_skip_at_load = {
>       vinyl_cache             = true,
>       vinyl_timeout           = true,
>       too_long_threshold      = true,
> +    raft_is_enabled         = true,
> +    raft_is_candidate       = true,
> +    raft_election_timeout   = true,
>       replication             = true,
>       replication_timeout     = true,
>       replication_connect_timeout = true,
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 511fe42f5..ee54d02b7 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -37,6 +37,8 @@
>   
>   /** Raft state of this instance. */
>   struct raft raft = {
> +	.is_enabled = false,
> +	.is_candidate = false,
>   	.term = 1,
>   	.vote = 0,
>   };
> @@ -63,3 +65,31 @@ raft_serialize_for_disk(struct raft_request *req)
>   	req->term = raft.term;
>   	req->vote = raft.vote;
>   }
> +
> +void
> +raft_cfg_is_enabled(bool is_enabled)
> +{
> +	raft.is_enabled = is_enabled;
> +}
> +
> +void
> +raft_cfg_is_candidate(bool is_candidate)
> +{
> +	raft.is_candidate = is_candidate;
> +}
> +
> +void
> +raft_cfg_election_timeout(double timeout)
> +{
> +	raft.election_timeout = timeout;
> +}
> +
> +void
> +raft_cfg_election_quorum(void)
> +{
> +}
> +
> +void
> +raft_cfg_death_timeout(void)
> +{
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> index 31f7becdb..f27222752 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -30,6 +30,7 @@
>    * SUCH DAMAGE.
>    */
>   #include <stdint.h>
> +#include <stdbool.h>
>   
>   #if defined(__cplusplus)
>   extern "C" {
> @@ -38,8 +39,11 @@ extern "C" {
>   struct raft_request;
>   
>   struct raft {
> +	bool is_enabled;
> +	bool is_candidate;
>   	uint64_t term;
>   	uint32_t vote;
> +	double election_timeout;
>   };
>   
>   extern struct raft raft;
> @@ -48,6 +52,37 @@ extern struct raft raft;
>   void
>   raft_process_recovery(const struct raft_request *req);
>   
> +/** Configure whether Raft is enabled. */
> +void
> +raft_cfg_is_enabled(bool is_enabled);
> +
> +/**
> + * Configure whether the instance can be elected as Raft leader. Even if false,
> + * the node still can vote, when Raft is enabled.
> + */
> +void
> +raft_cfg_is_candidate(bool is_candidate);
> +
> +/** Configure Raft leader election timeout. */
> +void
> +raft_cfg_election_timeout(double timeout);
> +
> +/**
> + * Configure Raft leader election quorum. There is no separate option.
> + * Instead, synchronous replication quorum is used, since Raft is tightly bound
> + * to synchronous replication.
> + */
> +void
> +raft_cfg_election_quorum(void);
> +
> +/**
> + * Configure Raft leader death timeout, i.e. the number of seconds without
> + * heartbeats from the leader to consider it dead. There is no separate
> + * option. Raft uses replication timeout for that.
> + */
> +void
> +raft_cfg_death_timeout(void);
> +
>   /**
>    * Save complete Raft state into a request to be sent to other instances of the
>    * cluster. It is allowed to save anything here, not only persistent state.
> diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
> index 857f0c95f..1d191987a 100644
> --- a/test/app-tap/init_script.result
> +++ b/test/app-tap/init_script.result
> @@ -23,6 +23,9 @@ memtx_memory:107374182
>   memtx_min_tuple_size:16
>   net_msg_max:768
>   pid_file:box.pid
> +raft_election_timeout:5
> +raft_is_candidate:true
> +raft_is_enabled:false
>   read_only:false
>   readahead:16320
>   replication_anon:false
> diff --git a/test/box/admin.result b/test/box/admin.result
> index ab3e80a97..13536a318 100644
> --- a/test/box/admin.result
> +++ b/test/box/admin.result
> @@ -67,6 +67,12 @@ cfg_filter(box.cfg)
>       - 768
>     - - pid_file
>       - <hidden>
> +  - - raft_election_timeout
> +    - 5
> +  - - raft_is_candidate
> +    - true
> +  - - raft_is_enabled
> +    - false
>     - - read_only
>       - false
>     - - readahead
> diff --git a/test/box/cfg.result b/test/box/cfg.result
> index bdd210b09..11358b2cd 100644
> --- a/test/box/cfg.result
> +++ b/test/box/cfg.result
> @@ -55,6 +55,12 @@ cfg_filter(box.cfg)
>    |     - 768
>    |   - - pid_file
>    |     - <hidden>
> + |   - - raft_election_timeout
> + |     - 5
> + |   - - raft_is_candidate
> + |     - true
> + |   - - raft_is_enabled
> + |     - false
>    |   - - read_only
>    |     - false
>    |   - - readahead
> @@ -162,6 +168,12 @@ cfg_filter(box.cfg)
>    |     - 768
>    |   - - pid_file
>    |     - <hidden>
> + |   - - raft_election_timeout
> + |     - 5
> + |   - - raft_is_candidate
> + |     - true
> + |   - - raft_is_enabled
> + |     - false
>    |   - - read_only
>    |     - false
>    |   - - readahead

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg()
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy
@ 2020-09-04  9:22   ` Serge Petrenko
  0 siblings, 0 replies; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04  9:22 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> Instance ID of the sender is needed in order to
>
> - be able to vote for it;
> - be able to remember its ID as leader ID, when it is a leader.
> ---
>   src/box/applier.cc | 9 ++++-----
>   src/box/raft.c     | 3 ++-
>   src/box/raft.h     | 8 ++++++--
>   3 files changed, 12 insertions(+), 8 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 53db97d6d..5459a1dc1 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -898,7 +898,7 @@ err:


LGTM.

>   }
>   
>   static int
> -apply_raft_row(struct xrow_header *row)
> +applier_handle_raft(struct applier *applier, struct xrow_header *row)
>   {
>   	assert(iproto_type_is_raft_request(row->type));
>   
> @@ -906,9 +906,7 @@ apply_raft_row(struct xrow_header *row)
>   	struct vclock candidate_clock;
>   	if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
>   		return -1;
> -
> -	raft_process_msg(&req);
> -
> +	raft_process_msg(&req, applier->instance_id);
>   	return 0;
>   }
>   
> @@ -1262,7 +1260,8 @@ applier_subscribe(struct applier *applier)
>   		if (first_row->lsn == 0) {
>   			if (unlikely(iproto_type_is_raft_request(
>   							first_row->type))) {
> -				if (apply_raft_row(first_row) != 0)
> +				if (applier_handle_raft(applier,
> +							first_row) != 0)
>   					diag_raise();
>   			}
>   			applier_signal_ack(applier);
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 7697809ee..4d3d07c48 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -78,8 +78,9 @@ raft_process_recovery(const struct raft_request *req)
>   }
>   
>   void
> -raft_process_msg(const struct raft_request *req)
> +raft_process_msg(const struct raft_request *req, uint32_t source)
>   {
> +	(void)source;
>   	if (req->term > raft.term) {
>   		// Update term.
>   		// The logic will be similar, but the code
> diff --git a/src/box/raft.h b/src/box/raft.h
> index e3261454b..db64cf933 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -70,9 +70,13 @@ raft_is_enabled(void)
>   void
>   raft_process_recovery(const struct raft_request *req);
>   
> -/** Process a raft status message coming from the network. */
> +/**
> + * Process a raft status message coming from the network.
> + * @param req Raft request.
> + * @param source Instance ID of the message sender.
> + */
>   void
> -raft_process_msg(const struct raft_request *req);
> +raft_process_msg(const struct raft_request *req, uint32_t source);
>   
>   /** Configure whether Raft is enabled. */
>   void

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine Vladislav Shpilevoy
@ 2020-09-04 11:36   ` Serge Petrenko
  2020-09-07 22:57     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04 11:36 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> The commit is the core part of the Raft implementation. It introduces
> the Raft state machine and its integration into the instance's life
> cycle.
>
> The implementation follows the protocol to the letter except for a
> few important details.
>
> Firstly, the original Raft assumes that all nodes share the same
> log record numbers. In Tarantool they are called LSNs. But in
> Tarantool each node has its own LSN in its own component of
> vclock. That makes the election messages a bit heavier, because
> the nodes need to send and compare complete vclocks of each other
> instead of a single number like in the original Raft. But the logic
> becomes simpler, because in the original Raft there is a problem
> of uncertainty about what to do with records of an old leader
> right after a new leader is elected. They could be rolled back or
> confirmed depending on circumstances. The issue disappears when
> vclock is used.
>
> Secondly, leader election works differently during cluster
> bootstrap, until the number of bootstrapped replicas becomes >=
> the election quorum. That arises from the specifics of replica
> bootstrap and the order of systems initialization. In short: during
> bootstrap a leader election may use a smaller election quorum than
> the configured one. See more details in the code.
>
> Part of #1146
> ---

Hi! Thanks for the patch!

I haven't looked through it thoroughly yet, but some pieces have caught
my eye, so please find my comments below. I'll take some time to look
through the patch once again.


>   src/box/applier.cc |  18 +-
>   src/box/box.cc     |   7 +-
>   src/box/raft.c     | 711 +++++++++++++++++++++++++++++++++++++++++++--
>   src/box/raft.h     | 118 ++++++++
>   src/box/relay.cc   |  19 ++
>   5 files changed, 847 insertions(+), 26 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 5459a1dc1..c7c486ee4 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>    * Return 0 for success or -1 in case of an error.
>    */
>   static int
> -applier_apply_tx(struct stailq *rows)
> +applier_apply_tx(struct applier *applier, struct stailq *rows)
>   {
> +	/*
> +	 * Rows received not directly from a leader are ignored. That is a
> +	 * protection against the case when an old leader keeps sending data
> +	 * around not knowing yet that it is not a leader anymore.
> +	 *
> +	 * XXX: it may be fine to apply leader transactions by checking
> +	 * whether their replica_id field is equal to the leader id. That
> +	 * can be investigated as an 'optimization', even though it may not
> +	 * give anything, because it won't change the total number of rows
> +	 * sent over the network anyway.
> +	 */
> +	if (!raft_is_source_allowed(applier->instance_id))
> +		return 0;
Have we abandoned Rule 66?
>   	struct xrow_header *first_row = &stailq_first_entry(rows,
>   					struct applier_tx_row, next)->row;
>   	struct xrow_header *last_row;
> @@ -1257,6 +1270,7 @@ applier_subscribe(struct applier *applier)
>   		struct xrow_header *first_row =
>   			&stailq_first_entry(&rows, struct applier_tx_row,
>   					    next)->row;
> +		raft_process_heartbeat(applier->instance_id);
>   		if (first_row->lsn == 0) {
>   			if (unlikely(iproto_type_is_raft_request(
>   							first_row->type))) {
> @@ -1265,7 +1279,7 @@ applier_subscribe(struct applier *applier)
>   					diag_raise();
>   			}
>   			applier_signal_ack(applier);
> -		} else if (applier_apply_tx(&rows) != 0) {
> +		} else if (applier_apply_tx(applier, &rows) != 0) {
>   			diag_raise();
>   		}
>   
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 6f85e734a..9d0782fff 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -157,7 +157,7 @@ void
>   box_update_ro_summary(void)
>   {
>   	bool old_is_ro_summary = is_ro_summary;
> -	is_ro_summary = is_ro || is_orphan;
> +	is_ro_summary = is_ro || is_orphan || raft_is_ro();
>   	/* In 99% nothing changes. Filter this out first. */
>   	if (is_ro_summary == old_is_ro_summary)
>   		return;
> @@ -171,6 +171,10 @@ static int
>   box_check_writable(void)
>   {
>   	if (is_ro_summary) {
> +		/*
> +		 * XXX: return a special error when the node is not a leader to
> +		 * reroute to the leader node.
> +		 */
>   		diag_set(ClientError, ER_READONLY);
>   		diag_log();
>   		return -1;
> @@ -2646,6 +2650,7 @@ box_init(void)
>   
>   	txn_limbo_init();
>   	sequence_init();
> +	raft_init();
>   }
>   
>   bool
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 4d3d07c48..1c4275cd5 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -36,6 +36,12 @@
>   #include "small/region.h"
>   #include "replication.h"
>   #include "relay.h"
> +#include "box.h"
> +
> +/**
> + * Maximal random deviation of the election timeout. From the configured value.
> + */
> +#define RAFT_RANDOM_ELECTION_FACTOR 0.1
>   
>   const char *raft_state_strs[] = {
>   	NULL,
> @@ -48,19 +54,220 @@ const char *raft_state_strs[] = {
>   struct raft raft = {
>   	.leader = 0,
>   	.state = RAFT_STATE_FOLLOWER,
> +	.volatile_term = 1,
> +	.volatile_vote = 0,
>   	.is_enabled = false,
>   	.is_candidate = false,
> +	.is_cfg_candidate = false,
> +	.is_write_in_progress = false,
>   	.term = 1,
>   	.vote = 0,
> +	.vote_mask = 0,
> +	.vote_count = 0,
> +	.election_timeout = 5,
>   };
>   
> +/**
> + * Check if Raft is completely synced with disk. Meaning all its critical values
> + * are in WAL. Only in that state the node can become a leader or a candidate.
> + * If the node has unflushed data, it means either the term was bumped, or
> + * a new vote was made.
> + *
> + * In case of term bump it means either there is another node with a newer term,
> + * and this one should be a follower; or this node bumped the term itself along
> + * with making a vote to start a new election - then it is also a follower which
> + * will turn into a candidate when the flush is done.
> + *
> + * In case of a new unflushed vote it means either this node voted for some
> + * other node, and must be a follower; or it voted for self, and also must be a
> + * follower, but will become a candidate when the flush is done.
> + *
> + * In total - when something is not synced with disk, the instance is a follower
> + * in any case.
> + */
> +static bool
> +raft_is_fully_on_disk(void)
> +{
> +	return raft.volatile_term == raft.term &&
> +	       raft.volatile_vote == raft.vote;
> +}
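
To make the volatile/persistent split concrete, here is a small
standalone sketch. It is not the patch's code: struct sketch_raft and
the sketch_* helpers are hypothetical, and the "flush" is a plain copy
instead of a WAL write.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical reduced copy of the term/vote part of struct raft. */
struct sketch_raft {
	/* Values already written to WAL. */
	uint64_t term;
	uint32_t vote;
	/* Values changed in memory, possibly not written yet. */
	uint64_t volatile_term;
	uint32_t volatile_vote;
};

/* Same condition as raft_is_fully_on_disk(), on the sketch struct. */
static bool
sketch_is_fully_on_disk(const struct sketch_raft *r)
{
	return r->volatile_term == r->term && r->volatile_vote == r->vote;
}

/* Stand-in for a finished WAL write: persistent values catch up. */
static void
sketch_flush(struct sketch_raft *r)
{
	r->term = r->volatile_term;
	r->vote = r->volatile_vote;
}
```

Bumping volatile_term or volatile_vote makes the check return false
until the flush completes, which is the window where the node has to
stay a follower.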
> +
> +/**
> + * Raft protocol says that election timeout should be a bit randomized so that
> + * the nodes don't start elections at the same time and end up without
> + * a quorum for anybody. This implementation randomizes the election timeout by
> + * adding {election timeout * random factor} value, where max value of the
> + * factor is a constant floating point value > 0.
> + */
> +static inline double
> +raft_new_random_election_shift(void)
> +{
> +	double timeout = raft.election_timeout;
> +	/* Translate to ms. Integer is needed to be able to use mod below. */
> +	uint32_t rand_part =
> +		(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
> +	if (rand_part == 0)
> +		rand_part = 1;
> +	/*
> +	 * XXX: this is not giving a good distribution, but it is not so trivial
> +	 * to implement a correct random value generator. There is a task to
> +	 * unify all such places. Not critical here.
> +	 */
> +	rand_part = rand() % (rand_part + 1);
> +	return rand_part / 1000.0;
> +}
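
To make the range of the shift concrete, here is a standalone sketch of
the same arithmetic. The random number is passed in as a parameter (the
patch calls rand()), and the sketch_* names are hypothetical:

```c
#include <assert.h>
#include <stdint.h>

/* Same value as RAFT_RANDOM_ELECTION_FACTOR in the patch. */
#define SKETCH_ELECTION_FACTOR 0.1

/*
 * Hypothetical pure version of raft_new_random_election_shift(): the
 * timeout and the random number are arguments instead of
 * raft.election_timeout and rand().
 */
static double
sketch_election_shift(double timeout, uint32_t rnd)
{
	/* Maximal shift in milliseconds, at least 1 ms. */
	uint32_t max_ms = (uint32_t)(timeout * SKETCH_ELECTION_FACTOR * 1000);
	if (max_ms == 0)
		max_ms = 1;
	/* The result is in [0, max_ms] milliseconds, returned in seconds. */
	return (rnd % (max_ms + 1)) / 1000.0;
}
```

With the default raft_election_timeout = 5 the shift is in the range
[0, 0.5] seconds. If it is added to the timeout as the comment
describes, the effective timeout lies between 5 and 5.5 seconds.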
> +
> +/**
> + * Raft says that during election a node1 can vote for node2, if node2 has a
> + * bigger term, or has the same term but longer log. In case of Tarantool it
> + * means the node2 vclock should be >= node1 vclock, in all components. It is
> + * not enough to compare only one component, if only because there may be no
> + * previous leader when the election happens for the first time. Or a node
> + * could restart and forget who the previous leader was.
> + */
> +static inline bool
> +raft_can_vote_for(const struct vclock *v)
> +{
> +	if (v == NULL)
> +		return false;
> +	int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
> +	return cmp == 0 || cmp == 1;
> +}
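
The rule can be illustrated with a simplified standalone sketch. It
does not use the real struct vclock or vclock_compare_ignore0():
sketch_vclock is a hypothetical fixed-size array of LSNs, and component
0 is skipped, which is my reading of the _ignore0 suffix.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum { SKETCH_VCLOCK_MAX = 4 };

/* Hypothetical simplified vclock: one LSN per instance ID. */
struct sketch_vclock {
	int64_t lsn[SKETCH_VCLOCK_MAX];
};

/*
 * A vote is allowed only if the candidate is not behind the local node
 * in any component except 0. Incomparable vclocks give 'false'.
 */
static bool
sketch_can_vote_for(const struct sketch_vclock *candidate,
		    const struct sketch_vclock *local)
{
	if (candidate == NULL)
		return false;
	for (int i = 1; i < SKETCH_VCLOCK_MAX; ++i) {
		if (candidate->lsn[i] < local->lsn[i])
			return false;
	}
	return true;
}
```

For a local vclock {1: 10, 2: 5} a candidate with {1: 10, 2: 6} gets
the vote, while {1: 9, 2: 6} does not, even though its second
component is newer.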
> +
> +/**
> + * Election quorum is not strictly equal to synchronous replication quorum.
> + * Sometimes it can be lowered. That is about bootstrap.
> + *
> + * The problem with bootstrap is that when the replicaset boots, all the
> + * instances can't write to WAL and can't recover from their initial snapshot.
> + * They need one node which will boot first, and then they will replicate from
> + * it.
> + *
> + * This one node should boot from its zero snapshot, create replicaset UUID,
> + * register self with ID 1 in _cluster space, and then register all the other
> + * instances here. To do that the node must be writable. It should have
> + * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
> + * is enabled.
> + *
> + * To be elected a Raft leader it needs to perform election. But it can't be
> + * done before at least synchronous quorum of the replicas is bootstrapped. And
> + * they can't be bootstrapped because they wait for a leader to initialize
> + * _cluster. Cyclic dependency.
> + *
> + * This is resolved by truncation of the election quorum to the number of
> + * registered replicas, if their count is less than synchronous quorum. That
> + * helps to elect a first leader.
> + *
> + * It may seem that the first node could just declare itself a leader and then
> + * strictly follow the protocol from now on, but that won't work, because if the
> + * first node restarts after it is booted, but before a quorum of replicas is
> + * booted, the cluster will get stuck again.
> + *
> + * The current solution is totally safe because
> + *
> + * - eventually the cluster will have node count >= quorum, if the user used
> + *   a correct config (God help them if they didn't);
> + *
> + * - synchronous replication quorum is untouched - it is not truncated. Only
> + *   leader election quorum is affected. So synchronous data won't be lost.
> + */
> +static inline int
> +raft_election_quorum(void)
> +{
> +	return MIN(replication_synchro_quorum, replicaset.size);
> +}
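
A worked example of the truncation, as a standalone sketch. The
function takes both numbers as arguments instead of reading
replication_synchro_quorum and replicaset.size, and its name is
hypothetical:

```c
#include <assert.h>

/*
 * Hypothetical pure version of raft_election_quorum(): the election
 * quorum never exceeds the number of registered replicas.
 */
static int
sketch_election_quorum(int synchro_quorum, int registered_count)
{
	return synchro_quorum < registered_count ?
	       synchro_quorum : registered_count;
}
```

With a synchronous quorum of 2 and only the first node registered the
election quorum is 1, so that node can elect itself and start
registering the others. Once 2 or more nodes are registered, the
election quorum is back to 2.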
> +
> +/** Broadcast an event about this node's state change to all relays. */
> +static inline void
> +raft_broadcast_new_state(void)
> +{
> +	struct raft_request req;
> +	memset(&req, 0, sizeof(req));
> +	req.term = raft.term;
> +	req.state = raft.state;
> +	raft_broadcast(&req);
> +}
> +
> +/** Raft state machine methods. 'sm' stands for State Machine. */
> +
> +/**
> + * Start the state machine. When it is stopped, Raft state is updated and
> + * goes to WAL when necessary, but it does not affect the instance operation.
> + * For example, when Raft is stopped, the instance role does not affect whether
> + * it is writable.
> + */
> +static void
> +raft_sm_start(void);
> +
> +/**
> + * Stop the state machine. Now until Raft is re-enabled,
> + * - Raft stops affecting the instance operation;
> + * - this node can't become a leader;
> + * - this node can't vote.
> + */
> +static void
> +raft_sm_stop(void);
> +
> +/**
> + * When the instance is a follower but is allowed to be a leader, it will wait
> + * for death of the current leader to start new election.
> + */
> +static void
> +raft_sm_wait_leader_dead(void);
> +
> +/**
> + * If election is started by this node, or it voted for some other node which
> + * started the election, and it can be a leader itself, it will wait until the
> + * current election times out. Then the node will start a new election.
> + */
> +static void
> +raft_sm_wait_election_end(void);
> +
> +/** Bump volatile term and schedule its flush to disk. */
> +static void
> +raft_sm_schedule_new_term(uint64_t new_term);
> +
> +/** Bump volatile vote and schedule its flush to disk. */
> +static void
> +raft_sm_schedule_new_vote(uint32_t new_vote);
> +
> +/**
> + * Bump term and vote for self immediately. After that is persisted, the
> + * election timeout will be activated, unless something newer happens first.
> + */
> +static void
> +raft_sm_schedule_new_election(void);
> +
> +/**
> + * The main trigger of Raft state machine - start new election when the current
> + * leader dies, or when there is no leader and the previous election failed.
> + */
> +static void
> +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
> +				 int events);
> +
> +/** Start Raft state flush to disk. */
> +static void
> +raft_sm_pause_and_dump(void);
> +
> +/**
> + * Flush Raft state changes to WAL. The callback resets itself, if during the
> + * write more changes appear.
> + */
> +static void
> +raft_sm_dump_step(struct ev_loop *loop, struct ev_check *watcher, int events);
> +
>   void
>   raft_process_recovery(const struct raft_request *req)
>   {
> -	if (req->term != 0)
> +	if (req->term != 0) {
>   		raft.term = req->term;
> -	if (req->vote != 0)
> +		raft.volatile_term = req->term;
> +	}
> +	if (req->vote != 0) {
>   		raft.vote = req->vote;
> +		raft.volatile_vote = req->vote;
> +	}
>   	/*
>   	 * Role is never persisted. If recovery is happening, the
>   	 * node was restarted, and the former role can be false
> @@ -80,34 +287,428 @@ raft_process_recovery(const struct raft_request *req)
>   void
>   raft_process_msg(const struct raft_request *req, uint32_t source)
>   {
> -	(void)source;
> -	if (req->term > raft.term) {
> -		// Update term.
> -		// The logic will be similar, but the code
> -		// below is for testing purposes.
> -		raft.term = req->term;
> +	assert(source > 0);
> +	assert(source != instance_id);
> +	/* Outdated request. */
> +	if (req->term < raft.volatile_term)
> +		return;
> +
> +	enum raft_state old_state = raft.state;
> +
> +	/* Term bump. */
> +	if (req->term > raft.volatile_term)
> +		raft_sm_schedule_new_term(req->term);
> +
> +	/* Vote request during the on-going election. */
> +	if (req->vote != 0) {
> +		switch (raft.state) {
> +		case RAFT_STATE_FOLLOWER:
> +		case RAFT_STATE_LEADER:
> +			/*
> +			 * Can't respond on vote requests when Raft is disabled.
> +			 */
> +			if (!raft.is_enabled)
> +				break;
> +			/* Check if already voted in this term. */
> +			if (raft.volatile_vote != 0)
> +				break;
> +			/* Not a candidate. Can't accept votes. */
> +			if (req->vote == instance_id)
> +				break;
> +			/* Can't vote for too old or incomparable nodes. */
> +			if (!raft_can_vote_for(req->vclock))
> +				break;
> +			/*
> +			 * Check if somebody is asking to vote for a third
> +			 * node - nope. Make votes only when asked directly by
> +			 * the new candidate. However that restriction may be
> +			 * relaxed in future, if can be proven to be safe.
> +			 */
> +			if (req->vote != source)
> +				break;
> +			/*
> +			 * Either the term is new, or didn't vote in the current
> +			 * term yet. Anyway can vote now.
> +			 */
> +			raft.state = RAFT_STATE_FOLLOWER;
> +			raft_sm_schedule_new_vote(req->vote);
> +			break;
> +		case RAFT_STATE_CANDIDATE:
> +			/* Check if this is a vote for a competing candidate. */
> +			if (req->vote != instance_id)
> +				break;
> +			/*
> +			 * Vote for self was requested earlier in this round,
> +			 * and now was answered by some other instance.
> +			 */
> +			assert(raft.volatile_vote == instance_id);
> +			bool was_set = bit_set(&raft.vote_mask, source);
> +			raft.vote_count += !was_set;
> +			if (raft.vote_count < raft_election_quorum())
> +				break;
> +			raft.state = RAFT_STATE_LEADER;
> +			raft.leader = instance_id;
> +			break;
> +		default:
> +			unreachable();
> +		}
> +	}
> +	/*
> +	 * If the node does not claim to be a leader, nothing interesting. Terms
> +	 * and votes are already handled.
> +	 */
> +	if (req->state != RAFT_STATE_LEADER)
> +		goto end;
> +	/* The node is a leader, but it is already known. */
> +	if (source == raft.leader)
> +		goto end;
> +	/*
> +	 * XXX: A message from a conflicting leader. Split brain, basically.
> +	 * Need to decide what to do. Current solution is to do nothing. In
> +	 * future either this node should try to become a leader, or should stop
> +	 * all writes and require manual intervention.
> +	 */
> +	if (raft.leader != 0)
> +		goto end;
> +
> +	/* New leader was elected. */
> +	raft.state = RAFT_STATE_FOLLOWER;
> +	raft.leader = source;
> +end:
> +	if (raft.state != old_state) {
> +		/*
> +		 * If the node stopped being a leader - should become read-only.
> +		 * If became a leader - should become read-write (if other
> +		 * subsystems also allow read-write).
> +		 */
> +		box_update_ro_summary();
> +		/*
> +		 * New term and vote are not broadcasted yet. Firstly their WAL
> +		 * write should be finished. But the state is volatile. It is ok
> +		 * to broadcast it now.
> +		 */
> +		raft_broadcast_new_state();
> +	}
> +}
> +
> +void
> +raft_process_heartbeat(uint32_t source)
> +{
> +	/*
> +	 * When not a candidate - don't wait for anything. Therefore do not care
> +	 * about the leader being dead.
> +	 */
> +	if (!raft.is_candidate)
> +		return;
> +	/* Don't care about heartbeats when this node is a leader itself. */
> +	if (raft.state == RAFT_STATE_LEADER)
> +		return;
> +	/* Not interested in heartbeats from not a leader. */
> +	if (raft.leader != source)
> +		return;
> +	/*
> +	 * XXX: it may be expensive to reset the timer like that. It may be less
> +	 * expensive to let the timer work, and remember last timestamp when
> +	 * anything was heard from the leader. Then in the timer callback check
> +	 * the timestamp, and restart the timer, if it is fine.
> +	 */
> +	assert(ev_is_active(&raft.timer));
> +	ev_timer_stop(loop(), &raft.timer);
> +	raft_sm_wait_leader_dead();
> +}
> +
> +/** Wakeup Raft state writer fiber waiting for WAL write end. */
> +static void
> +raft_write_cb(struct journal_entry *entry)
> +{
> +	fiber_wakeup(entry->complete_data);
> +}
> +
> +/** Synchronously write a Raft request into WAL. */
> +static void
> +raft_write_request(const struct raft_request *req)
> +{
> +	assert(raft.is_write_in_progress);
> +	/*
> +	 * Vclock is never persisted by Raft. It is used only to
> +	 * be sent to network when vote for self.
> +	 */
> +	assert(req->vclock == NULL);
> +	/*
> +	 * State is not persisted. That would be strictly against Raft protocol.
> +	 * The reason is that it does not make much sense - even if the node is
> +	 * a leader now, after the node is restarted, there will be another
> +	 * leader elected by that time likely.
> +	 */
> +	assert(req->state == 0);
> +	struct region *region = &fiber()->gc;
> +	uint32_t svp = region_used(region);
> +	struct xrow_header row;
> +	char buf[sizeof(struct journal_entry) +
> +		 sizeof(struct xrow_header *)];
> +	struct journal_entry *entry = (struct journal_entry *)buf;
> +	entry->rows[0] = &row;
> +
> +	if (xrow_encode_raft(&row, region, req) != 0)
> +		goto fail;
> +	journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
> +			     fiber());
> +
> +	if (journal_write(entry) != 0 || entry->res < 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		goto fail;
>   	}
> -	if (req->vote > 0) {
> -		// Check whether the vote's for us.
> +
> +	raft_broadcast(req);
> +
> +	region_truncate(region, svp);
> +	return;
> +fail:
> +	/*
> +	 * XXX: the stub is supposed to be removed once it is defined what to do
> +	 * when a raft request WAL write fails.
> +	 */
> +	panic("Could not write a raft request to WAL\n");
> +}
> +
> +static void
> +raft_sm_dump_step(struct ev_loop *loop, struct ev_check *watcher, int events)
> +{
> +	assert(watcher == &raft.io);
> +	(void) events;
> +	assert(raft.is_write_in_progress);
> +	/* During write Raft can't be anything but a follower. */
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	struct raft_request req;
> +	uint64_t old_term = raft.term;
> +	uint32_t old_vote = raft.vote;
> +	enum raft_state old_state = raft.state;
> +
> +	if (raft_is_fully_on_disk()) {
> +end_dump:
> +		raft.is_write_in_progress = false;
> +		ev_check_stop(loop, watcher);
> +		/*
> +		 * The state machine is stable. Can see now, to what state to
> +		 * go.
> +		 */
> +		if (!raft.is_candidate) {
> +			/*
> +			 * If not a candidate, can't do anything except vote for
> +			 * somebody (if Raft is enabled). Nothing to do except
> +			 * staying a follower without timeouts.
> +			 */
> +		} else if (raft.leader != 0) {
> +			/* There is a known leader. Wait until it is dead. */
> +			raft_sm_wait_leader_dead();
> +		} else if (raft.vote == instance_id) {
> +			/* Just wrote own vote. */
> +			if (raft_election_quorum() == 1) {
> +				raft.state = RAFT_STATE_LEADER;
> +				raft.leader = instance_id;
> +				/*
> +				 * Make read-write (if other subsystems allow
> +				 * that).
> +				 */
> +				box_update_ro_summary();
> +			} else {
> +				raft.state = RAFT_STATE_CANDIDATE;
> +				/* First vote for self. */
> +				raft.vote_count = 1;
> +				raft.vote_mask = 0;
> +				bit_set(&raft.vote_mask, instance_id);
> +				raft_sm_wait_election_end();
> +			}
> +		} else if (raft.vote != 0) {
> +			/*
> +			 * Voted for some other node. Wait if it manages to
> +			 * become a leader.
> +			 */
> +			raft_sm_wait_election_end();
> +		} else {
> +			/* No leaders, no votes. */
> +			raft_sm_schedule_new_election();
> +		}
> +	} else {
> +		memset(&req, 0, sizeof(req));
> +		assert(raft.volatile_term >= raft.term);
> +		/* Term is written always. */
> +		req.term = raft.volatile_term;
> +		if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
> +			req.vote = raft.volatile_vote;
> +
> +		raft_write_request(&req);
> +
> +		assert(req.term >= raft.term);
> +		if (req.term > raft.term) {
> +			raft.term = req.term;
> +			raft.vote = 0;
> +		}
> +		if (req.vote != 0) {
> +			assert(raft.vote == 0);
> +			raft.vote = req.vote;
> +		}
> +		if (raft_is_fully_on_disk())
> +			goto end_dump;
>   	}
> -	switch (req->state) {
> -	case RAFT_STATE_FOLLOWER:
> -	    break;
> -	case RAFT_STATE_CANDIDATE:
> -	    // Perform voting logic.
> -	    break;
> -	case RAFT_STATE_LEADER:
> -	    // Switch to a new leader.
> -	    break;
> -	default:
> -	    break;
> +
> +	memset(&req, 0, sizeof(req));
> +	/* Term is encoded always. */
> +	req.term = raft.term;
> +	bool has_changes = old_term != raft.term;
> +	if (raft.vote != 0 && old_vote != raft.vote) {
> +		req.vote = raft.vote;
> +		/*
> +		 * When vote for self, need to send current vclock too. Two
> +		 * reasons for that:
> +		 *
> +		 * - nodes need to vote for the instance containing the newest
> +		 *   data. So as not to lose it, because some of it may be
> +		 *   confirmed by the synchronous replication;
> +		 *
> +		 * - replication is basically stopped during election. Other
> +		 *   nodes can't learn vclock of this instance through regular
> +		 *   replication.
> +		 */
> +		if (raft.vote == instance_id)
> +			req.vclock = &replicaset.vclock;
> +		has_changes = true;
> +	}
> +	if (raft.state != old_state) {
> +		req.state = raft.state;
> +		has_changes = true;
>   	}
> +	if (has_changes)
> +		raft_broadcast(&req);
> +}
> +
> +static void
> +raft_sm_pause_and_dump(void)
> +{
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	if (raft.is_write_in_progress)
> +		return;
> +	ev_timer_stop(loop(), &raft.timer);
> +	ev_check_start(loop(), &raft.io);
> +	raft.is_write_in_progress = true;
> +}
> +
> +static void
> +raft_sm_schedule_new_term(uint64_t new_term)
> +{
> +	assert(new_term > raft.volatile_term);
> +	assert(raft.volatile_term >= raft.term);
> +	raft.volatile_term = new_term;
> +	/* New terms means completely new Raft state. */
> +	raft.volatile_vote = 0;
> +	raft.leader = 0;
> +	raft.state = RAFT_STATE_FOLLOWER;
> +	raft_sm_pause_and_dump();
> +}
> +
> +static void
> +raft_sm_schedule_new_vote(uint32_t new_vote)
> +{
> +	assert(raft.volatile_vote == 0);
> +	raft.volatile_vote = new_vote;
> +	raft_sm_pause_and_dump();
> +}
> +
> +static void
> +raft_sm_schedule_new_election(void)
> +{
> +	assert(raft_is_fully_on_disk());
> +	assert(raft.is_candidate);
> +	assert(raft.leader == 0);
> +	/* Everyone is a follower until its vote for self is persisted. */
> +	raft_sm_schedule_new_term(raft.term + 1);
> +	raft_sm_schedule_new_vote(instance_id);
> +	box_update_ro_summary();
> +}
> +
> +static void
> +raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
> +				 int events)
> +{
> +	assert(timer == &raft.timer);
> +	(void)events;
> +	ev_timer_stop(loop, timer);
> +	raft_sm_schedule_new_election();
> +}
> +
> +static void
> +raft_sm_wait_leader_dead(void)
> +{
> +	assert(!ev_is_active(&raft.timer));
> +	assert(!ev_is_active(&raft.io));
> +	assert(!raft.is_write_in_progress);
> +	assert(raft.is_candidate);
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	assert(raft.leader != 0);
> +	double death_timeout = replication_disconnect_timeout();
> +	ev_timer_set(&raft.timer, death_timeout, death_timeout);
> +}
> +
> +static void
> +raft_sm_wait_election_end(void)
> +{
> +	assert(!ev_is_active(&raft.timer));
> +	assert(!ev_is_active(&raft.io));
> +	assert(!raft.is_write_in_progress);
> +	assert(raft.is_candidate);
> +	assert(raft.state == RAFT_STATE_FOLLOWER ||
> +	       (raft.state == RAFT_STATE_CANDIDATE &&
> +		raft.volatile_vote == instance_id));
> +	assert(raft.leader == 0);
> +	double election_timeout = raft.election_timeout +
> +				  raft_new_random_election_shift();
> +	ev_timer_set(&raft.timer, election_timeout, election_timeout);
> +}
> +
> +static void
> +raft_sm_start(void)
> +{
> +	assert(!ev_is_active(&raft.timer));
> +	assert(!ev_is_active(&raft.io));
> +	assert(!raft.is_write_in_progress);
> +	assert(!raft.is_enabled);
> +	assert(raft.state == RAFT_STATE_FOLLOWER);
> +	raft.is_enabled = true;
> +	raft.is_candidate = raft.is_cfg_candidate;
> +	if (!raft.is_candidate)
> +		/* Nop. */;
> +	else if (raft.leader != 0)
> +		raft_sm_wait_leader_dead();
> +	else
> +		raft_sm_schedule_new_election();
> +	box_update_ro_summary();
> +	/*
> +	 * When Raft is enabled, send the complete state. Because
> +	 * it wasn't sent in disabled state.
> +	 */
> +	struct raft_request req;
> +	raft_serialize_for_network(&req, NULL);
> +	raft_broadcast(&req);
> +}
> +
> +static void
> +raft_sm_stop(void)
> +{
> +	assert(raft.is_enabled);
> +	raft.is_enabled = false;
> +	raft.is_candidate = false;
> +	box_update_ro_summary();
>   }
>   
>   void
>   raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
>   {
>   	memset(req, 0, sizeof(*req));
> +	/*
> +	 * Volatile state is never used for any communications.
> +	 * Use only persisted state.
> +	 */
>   	req->term = raft.term;
>   	req->vote = raft.vote;
>   	req->state = raft.state;
> @@ -128,29 +729,86 @@ raft_serialize_for_disk(struct raft_request *req)
>   void
>   raft_cfg_is_enabled(bool is_enabled)
>   {
> -	raft.is_enabled = is_enabled;
> +	if (is_enabled == raft.is_enabled)
> +		return;
> +
> +	if (!is_enabled)
> +		raft_sm_stop();
> +	else
> +		raft_sm_start();
>   }
>   
>   void
>   raft_cfg_is_candidate(bool is_candidate)
>   {
> -	raft.is_candidate = is_candidate;
> +	bool old_is_candidate = raft.is_candidate;
> +	raft.is_cfg_candidate = is_candidate;
> +	raft.is_candidate = is_candidate && raft.is_enabled;
> +	if (raft.is_candidate == old_is_candidate)
> +		return;
> +
> +	if (raft.is_candidate) {
> +		assert(raft.state == RAFT_STATE_FOLLOWER);
> +		/*
> +		 * If there is an on-going WAL write, it means there was some
> +		 * node who sent newer data to this node.
> +		 */
> +		if (raft.leader == 0 && raft_is_fully_on_disk())
> +			raft_sm_schedule_new_election();
> +	} else if (raft.state != RAFT_STATE_FOLLOWER) {
> +		raft.state = RAFT_STATE_FOLLOWER;
> +		raft_broadcast_new_state();
> +	}
> +	box_update_ro_summary();
>   }
>   
>   void
>   raft_cfg_election_timeout(double timeout)
>   {
> +	if (timeout == raft.election_timeout)
> +		return;
> +
>   	raft.election_timeout = timeout;
> +	if (raft.vote != 0 && raft.leader == 0) {
> +		assert(ev_is_active(&raft.timer));
> +		double timeout = ev_timer_remaining(loop(), &raft.timer) -
> +				 raft.timer.at + raft.election_timeout;
> +		ev_timer_stop(loop(), &raft.timer);
> +		ev_timer_set(&raft.timer, timeout, timeout);
> +	}
>   }
>   
>   void
>   raft_cfg_election_quorum(void)
>   {
> +	if (raft.state != RAFT_STATE_CANDIDATE ||
> +	    raft.state == RAFT_STATE_LEADER)
> +		return;
> +	if (raft.vote_count < raft_election_quorum())
> +		return;
> +	/*
> +	 * The node is a candidate. It means its state is fully synced with
> +	 * disk. Otherwise it would be a follower.
> +	 */
> +	assert(!raft.is_write_in_progress);
> +	raft.state = RAFT_STATE_LEADER;
> +	raft.leader = instance_id;
> +	raft_broadcast_new_state();
> +	box_update_ro_summary();
>   }
>   
>   void
>   raft_cfg_death_timeout(void)
>   {
> +	if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
> +	    raft.leader != 0) {
> +		assert(ev_is_active(&raft.timer));
> +		double death_timeout = replication_disconnect_timeout();
> +		double timeout = ev_timer_remaining(loop(), &raft.timer) -
> +				 raft.timer.at + death_timeout;
> +		ev_timer_stop(loop(), &raft.timer);
> +		ev_timer_set(&raft.timer, timeout, timeout);
> +	}
>   }
>   
>   void
> @@ -163,3 +821,10 @@ raft_broadcast(const struct raft_request *req)
>   		}
>   	}
>   }
> +
> +void
> +raft_init(void)
> +{
> +	ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
> +	ev_check_init(&raft.io, raft_sm_dump_step);
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> index db64cf933..111a9c16e 100644
> --- a/src/box/raft.h
> +++ b/src/box/raft.h
> @@ -31,34 +31,141 @@
>    */
>   #include <stdint.h>
>   #include <stdbool.h>
> +#include "tarantool_ev.h"
>   
>   #if defined(__cplusplus)
>   extern "C" {
>   #endif
>   
> +/**
> + * This is an implementation of Raft leader election protocol, separated from
> + * synchronous replication part.
> + *
> + * The protocol describes an algorithm which helps to elect a single leader in
> + * the cluster, which is supposed to handle write requests. And re-elect a new
> + * leader, when the current leader dies.
> + *
> + * The implementation follows the protocol to the letter except a few important
> + * details.
> + *
> + * Firstly, the original Raft assumes, that all nodes share the same log record
> + * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
> + * node has its own LSN in its own component of vclock. That makes the election
> + * messages a bit heavier, because the nodes need to send and compare complete
> + * vclocks of each other instead of a single number like in the original Raft.
> + * But logic becomes simpler. Because in the original Raft there is a problem of
> + * uncertainty about what to do with records of an old leader right after a new
> + * leader is elected. They could be rolled back or confirmed depending on
> + * circumstances. The issue disappears when vclock is used.
> + *
> + * Secondly, leader election works differently during cluster bootstrap, until
> + * number of bootstrapped replicas becomes >= election quorum. That arises from
> + * specifics of replicas bootstrap and order of systems initialization. In
> + * short: during bootstrap a leader election may use a smaller election quorum
> + * than the configured one. See more details in the code.
> + */
> +
>   struct raft_request;
>   struct vclock;
>   
>   enum raft_state {
> +	/**
> +	 * Can't write. Can only accept data from a leader. Node in this state
> +	 * either monitors an existing leader, or there is an on-going election
> +	 * and the node voted for another node, or it can't be a candidate and
> +	 * does not do anything.
> +	 */
>   	RAFT_STATE_FOLLOWER = 1,
> +	/**
> +	 * The node can't write. There is an active election, in which the node
> +	 * voted for self. Now it waits for election outcome.
> +	 */
>   	RAFT_STATE_CANDIDATE = 2,
> +	/** Election was successful. The node accepts write requests. */
>   	RAFT_STATE_LEADER = 3,
>   };
>   
>   extern const char *raft_state_strs[];
>   
>   struct raft {
> +	/** Instance ID of leader of the current term. */
>   	uint32_t leader;
> +	/** State of the instance. */
>   	enum raft_state state;
> +	/**
> +	 * Volatile part of the Raft state, whose WAL write may be still
> +	 * in-progress, and yet the state may be already used. Volatile state is
> +	 * never sent to anywhere, but the state machine makes decisions based
> +	 * on it. That is vital.
> +	 * As an example, volatile vote needs to be used to reject votes inside
> +	 * a term, where the instance already voted (even if the vote WAL write
> +	 * is not finished yet). Otherwise the instance would try to write
> +	 * several votes inside one term.
> +	 */
> +	uint64_t volatile_term;
> +	uint32_t volatile_vote;
> +	/**
> +	 * Flag whether Raft is enabled. When disabled, it still persists terms
> +	 * so as to quickly enroll into the cluster when (if) it is enabled. In
> +	 * everything else disabled Raft does not affect instance work.
> +	 */
>   	bool is_enabled;
> +	/**
> +	 * Flag whether the node can become a leader. It is an accumulated value
> +	 * of configuration options Raft enabled and Raft candidate. If at least
> +	 * one is false - the instance is not a candidate.
> +	 */
>   	bool is_candidate;
> +	/** Flag whether the instance is allowed to be a leader. */
> +	bool is_cfg_candidate;
> +	/**
> +	 * Flag whether Raft currently tries to write something into WAL. It
> +	 * happens asynchronously, not right after Raft state is updated.
> +	 */
> +	bool is_write_in_progress;
> +	/**
> +	 * Persisted Raft state. These values are used when need to tell current
> +	 * Raft state to other nodes.
> +	 */
>   	uint64_t term;
>   	uint32_t vote;
> +	/**
> +	 * Bit 1 on position N means that a vote from instance with ID = N was
> +	 * obtained.
> +	 */
> +	uint32_t vote_mask;
> +	/** Number of votes for this instance. Valid only in candidate state. */
> +	int vote_count;
> +	/** State machine timed event trigger. */
> +	struct ev_timer timer;
> +	/**
> +	 * Dump of Raft state in the end of event loop, when it is changed.
> +	 */
> +	struct ev_check io;

You probably need the ev_prepare watcher.

Here's what the libev doc says:

All ev_prepare watchers are invoked just before ev_run starts
to gather new events, and all ev_check watchers are queued (not
invoked) just after ev_run has gathered them, but before it
queues any callbacks for any received events. That means
ev_prepare watchers are the last watchers invoked before the
event loop sleeps or polls for new events, and ev_check watchers
will be invoked before any other watchers of the same or lower
priority within an event loop iteration.
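
To make the difference concrete, here is a toy model in plain C. It is not libev and not the patch code: every toy_* name is hypothetical, and the only thing taken from the documentation above is the order of phases inside one iteration (prepare, poll, check, ordinary callbacks). It counts how many times the loop polls, and so may block, between a callback changing Raft state and the dump of that state.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy model of one event loop, not libev. All toy_* names are hypothetical. */
enum toy_watcher_kind { TOY_PREPARE, TOY_CHECK };

struct toy_loop {
	/* Which kind of watcher performs the Raft state dump. */
	enum toy_watcher_kind kind;
	/* A callback changed the state, the dump is pending. */
	bool is_dump_scheduled;
	bool is_dumped;
	/* How many times the loop polled while the dump was pending. */
	int polls_before_dump;
};

static void
toy_try_dump(struct toy_loop *loop)
{
	if (!loop->is_dump_scheduled)
		return;
	loop->is_dump_scheduled = false;
	loop->is_dumped = true;
}

/* One iteration in the documented order: prepare, poll, check, callbacks. */
static void
toy_loop_iterate(struct toy_loop *loop, bool callback_changes_state)
{
	if (loop->kind == TOY_PREPARE)
		toy_try_dump(loop);
	/* Poll: a real loop may block here waiting for events. */
	if (loop->is_dump_scheduled)
		loop->polls_before_dump++;
	if (loop->kind == TOY_CHECK)
		toy_try_dump(loop);
	/* Ordinary callbacks run last and may change Raft state. */
	if (callback_changes_state)
		loop->is_dump_scheduled = true;
}

/* Change the state in the first iteration, count polls until the dump. */
static int
toy_polls_until_dump(enum toy_watcher_kind kind)
{
	struct toy_loop loop = { kind, false, false, 0 };
	toy_loop_iterate(&loop, true);
	for (int i = 0; i < 10 && !loop.is_dumped; i++)
		toy_loop_iterate(&loop, false);
	assert(loop.is_dumped);
	return loop.polls_before_dump;
}
```

In this model toy_polls_until_dump(TOY_PREPARE) gives 0 and toy_polls_until_dump(TOY_CHECK) gives 1: the prepare-style dump happens before the loop goes back to polling, while the check-style dump has to wait for the next poll to return first.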


> +	/** Configured election timeout in seconds. */
>   	double election_timeout;
>   };
>   
>   extern struct raft raft;
>   
> +/**
> + * A flag whether the instance is read-only according to Raft. Even if Raft
> + * allows writes though, it does not mean the instance is writable. It can be
> + * affected by box.cfg.read_only, connection quorum.
> + */
> +static inline bool
> +raft_is_ro(void)
> +{
> +	return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
> +}
> +
> +/** See if the instance can accept rows from an instance with the given ID. */
> +static inline bool
> +raft_is_source_allowed(uint32_t source_id)
> +{
> +	return !raft.is_enabled || raft.leader == source_id;
> +}
> +
>   /** Check if Raft is enabled. */
>   static inline bool
>   raft_is_enabled(void)
> @@ -78,6 +185,13 @@ raft_process_recovery(const struct raft_request *req);
>   void
>   raft_process_msg(const struct raft_request *req, uint32_t source);
>   
> +/**
> + * Process a heartbeat message from an instance with the given ID. It is used to
> + * watch leader's health and start election when necessary.
> + */
> +void
> +raft_process_heartbeat(uint32_t source);
> +
>   /** Configure whether Raft is enabled. */
>   void
>   raft_cfg_is_enabled(bool is_enabled);
> @@ -130,6 +244,10 @@ raft_serialize_for_disk(struct raft_request *req);
>   void
>   raft_broadcast(const struct raft_request *req);
>   
> +/** Initialize Raft global data structures. */
> +void
> +raft_init(void);
> +
>   #if defined(__cplusplus)
>   }
>   #endif
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 74581db9c..4f9bbc0de 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
>   		relay_send(relay, row);
>   }
>   
> +/**
> + * Recreate recovery cursor from the last confirmed point. That is
> + * used by Raft, when the node becomes a leader. It may happen,
> + * that it already sent some data to other nodes as a follower,
> + * and they ignored the data. Now when the node is a leader, it
> + * should send the not confirmed data again. Otherwise the cluster
> + * will get stuck, or worse - the newer data would be sent without the
> + * older sent but ignored data.
> + */
> +static void
> +relay_restart_recovery(struct relay *relay)
> +{
> +	recovery_delete(relay->r);
> +	relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
> +	recover_remaining_wals(relay->r, &relay->stream, NULL, true);
> +}
> +
>   struct relay_raft_msg {
>   	struct cmsg base;
>   	struct cmsg_hop route;
> @@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
>   	struct xrow_header row;
>   	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
>   	try {
> +		if (msg->req.state == RAFT_STATE_LEADER)
> +			relay_restart_recovery(msg->relay);
>   		relay_send(msg->relay, &row);

Looks like you should first send the message and then restart the recovery.
Otherwise the resent data will be ignored once again.
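
A toy model of the ordering problem, in plain C. It is only a sketch under assumptions: the toy_* names are hypothetical, and the follower's filter imitates the raft_is_source_allowed() check from this patch (rows are accepted only from the known leader) in the Raft-enabled case.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy follower, not Tarantool code. All toy_* names are hypothetical. */
struct toy_follower {
	/* Known leader ID, 0 when unknown. */
	uint32_t leader;
	int applied_rows;
};

/* Follower learns about the new leader from its Raft state message. */
static void
toy_follower_on_leader_msg(struct toy_follower *f, uint32_t source)
{
	f->leader = source;
}

/* Rows are applied only when they come from the known leader. */
static bool
toy_follower_on_row(struct toy_follower *f, uint32_t source)
{
	if (f->leader != source)
		return false;
	f->applied_rows++;
	return true;
}

enum toy_order { TOY_RESEND_THEN_ANNOUNCE, TOY_ANNOUNCE_THEN_RESEND };

/* Returns how many of the re-sent rows the follower applied. */
static int
toy_relay_become_leader(uint32_t self_id, int row_count, enum toy_order order)
{
	assert(self_id != 0);
	struct toy_follower f = { 0, 0 };
	if (order == TOY_ANNOUNCE_THEN_RESEND)
		toy_follower_on_leader_msg(&f, self_id);
	for (int i = 0; i < row_count; i++)
		(void)toy_follower_on_row(&f, self_id);
	if (order == TOY_RESEND_THEN_ANNOUNCE)
		toy_follower_on_leader_msg(&f, self_id);
	return f.applied_rows;
}
```

With three pending rows, TOY_RESEND_THEN_ANNOUNCE applies none of them and TOY_ANNOUNCE_THEN_RESEND applies all three, which is why the Raft message should go out before the recovery is restarted.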


>   	} catch (Exception *e) {
>   		relay_set_error(msg->relay, e);

-- 
Serge Petrenko



^ permalink raw reply	[flat|nested] 27+ messages in thread

* Re: [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft Vladislav Shpilevoy
@ 2020-09-04 11:38   ` Serge Petrenko
  0 siblings, 0 replies; 27+ messages in thread
From: Serge Petrenko @ 2020-09-04 11:38 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


04.09.2020 01:51, Vladislav Shpilevoy wrote:
> Box.info.raft returns a table of form:
>
>      {
>          state: <string>,
>          term: <number>,
>          vote: <instance ID>,
>          leader: <instance ID>
>      }
>
> The fields correspond to the same named Raft concepts one to one.
> This info dump is supposed to help with the tests, first of all.
> And with investigation of problems in a real cluster.
>
> Part of #1146

Thanks for the patch! Looks good to me.

> ---
>   src/box/lua/info.c   | 17 +++++++++++++++++
>   test/box/info.result |  1 +
>   2 files changed, 18 insertions(+)
>
> diff --git a/src/box/lua/info.c b/src/box/lua/info.c
> index 1c131caec..8e1dbd497 100644
> --- a/src/box/lua/info.c
> +++ b/src/box/lua/info.c
> @@ -49,6 +49,7 @@
>   #include "main.h"
>   #include "version.h"
>   #include "box/box.h"
> +#include "box/raft.h"
>   #include "lua/utils.h"
>   #include "fiber.h"
>   #include "tt_static.h"
> @@ -577,6 +578,21 @@ lbox_info_listen(struct lua_State *L)
>   	return 1;
>   }
>   
> +static int
> +lbox_info_raft(struct lua_State *L)
> +{
> +	lua_createtable(L, 0, 4);
> +	lua_pushstring(L, raft_state_strs[raft.state]);
> +	lua_setfield(L, -2, "state");
> +	luaL_pushuint64(L, raft.volatile_term);
> +	lua_setfield(L, -2, "term");
> +	lua_pushinteger(L, raft.volatile_vote);
> +	lua_setfield(L, -2, "vote");
> +	lua_pushinteger(L, raft.leader);
> +	lua_setfield(L, -2, "leader");
> +	return 1;
> +}
> +
>   static const struct luaL_Reg lbox_info_dynamic_meta[] = {
>   	{"id", lbox_info_id},
>   	{"uuid", lbox_info_uuid},
> @@ -595,6 +611,7 @@ static const struct luaL_Reg lbox_info_dynamic_meta[] = {
>   	{"vinyl", lbox_info_vinyl},
>   	{"sql", lbox_info_sql},
>   	{"listen", lbox_info_listen},
> +	{"raft", lbox_info_raft},
>   	{NULL, NULL}
>   };
>   
> diff --git a/test/box/info.result b/test/box/info.result
> index 40eeae069..d0abb634a 100644
> --- a/test/box/info.result
> +++ b/test/box/info.result
> @@ -82,6 +82,7 @@ t
>     - memory
>     - package
>     - pid
> +  - raft
>     - replication
>     - replication_anon
>     - ro

-- 
Serge Petrenko


* Re: [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count
  2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count Vladislav Shpilevoy
  2020-09-04  8:24   ` Serge Petrenko
@ 2020-09-07 15:45   ` Sergey Ostanevich
  2020-09-07 22:54     ` Vladislav Shpilevoy
  1 sibling, 1 reply; 27+ messages in thread
From: Sergey Ostanevich @ 2020-09-07 15:45 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

Hi!

Thanks for the patch, just one comment below.

Regards,
Sergos


On 04 Sep 00:51, Vladislav Shpilevoy wrote:
> Struct replicaset didn't store a number of registered replicas.
> Only an array, which had to be scanned in full each time the
> count was needed.
> 
> That is going to be needed in Raft to calculate election quorum.
> The patch makes the count tracked so as it could be found for
> constant time by simply reading an integer.
> 
> Needed for #1146
> ---
>  src/box/replication.cc | 3 +++
>  src/box/replication.h  | 7 +++++++
>  2 files changed, 10 insertions(+)
> 
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index ef0e2411d..20f16206a 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
>  						   tt_uuid_str(&replica->uuid));
>  	}
>  	replicaset.replica_by_id[replica_id] = replica;
> +	++replicaset.size;

There's another use of replica_set_id() inside register_replica() in
alter.cc. Apparently, it only re-assigns the ID without adding a new
node. I would propose moving this counter into replicaset_add().

>  
>  	say_info("assigned id %d to replica %s",
>  		 replica->id, tt_uuid_str(&replica->uuid));
> @@ -267,6 +268,8 @@ replica_clear_id(struct replica *replica)
>  	 * replication.
>  	 */
>  	replicaset.replica_by_id[replica->id] = NULL;
> +	assert(replicaset.size > 0);
> +	--replicaset.size;
>  	if (replica->id == instance_id) {
>  		/* See replica_check_id(). */
>  		assert(replicaset.is_joining);
> diff --git a/src/box/replication.h b/src/box/replication.h
> index ddc2bddf4..69cc820c9 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -217,6 +217,13 @@ struct replicaset {
>  	bool is_joining;
>  	/* A number of anonymous replicas following this instance. */
>  	int anon_count;
> +	/**
> +	 * Number of registered replicas. That includes all of them - connected,
> +	 * disconnected, connected not directly, just present in _cluster. If an
> +	 * instance has an ID, has the same replicaset UUID, then it is
> +	 * accounted here.
> +	 */
> +	int size;
>  	/** Applier state. */
>  	struct {
>  		/**
> -- 
> 2.21.1 (Apple Git-122.3)
> 

* Re: [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier
  2020-09-04  8:13   ` Serge Petrenko
@ 2020-09-07 22:54     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-07 22:54 UTC (permalink / raw)
  To: Serge Petrenko, tarantool-patches, gorcunov

Hi! Thanks for the review!

>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index c1d07ca54..699b5a683 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -67,6 +67,23 @@ applier_set_state(struct applier *applier, enum applier_state state)
>>       trigger_run_xc(&applier->on_state, applier);
>>   }
>>   +static inline void
>> +applier_assign_instance_id(struct applier *applier)
> Maybe call it `applier_set_id`? This way it's shorter and resembles `replica_set_id`.

Indeed, that looks better.

====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 699b5a683..db17c7338 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -68,7 +68,7 @@ applier_set_state(struct applier *applier, enum applier_state state)
 }
 
 static inline void
-applier_assign_instance_id(struct applier *applier)
+applier_set_id(struct applier *applier)
 {
 	/*
 	 * After final join, the applier already received latest
@@ -620,7 +620,7 @@ applier_join(struct applier *applier)
 	say_info("final data received");
 
 	applier_set_state(applier, APPLIER_JOINED);
-	applier_assign_instance_id(applier);
+	applier_set_id(applier);
 	applier_set_state(applier, APPLIER_READY);
 }
 
@@ -1225,7 +1225,7 @@ applier_subscribe(struct applier *applier)
 		    instance_id != REPLICA_ID_NIL) {
 			say_info("final data received");
 			applier_set_state(applier, APPLIER_JOINED);
-			applier_assign_instance_id(applier);
+			applier_set_id(applier);
 			applier_set_state(applier, APPLIER_READY);
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}

* Re: [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count
  2020-09-07 15:45   ` Sergey Ostanevich
@ 2020-09-07 22:54     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-07 22:54 UTC (permalink / raw)
  To: Sergey Ostanevich; +Cc: tarantool-patches

Hi! Thanks for the review!

>> diff --git a/src/box/replication.cc b/src/box/replication.cc
>> index ef0e2411d..20f16206a 100644
>> --- a/src/box/replication.cc
>> +++ b/src/box/replication.cc
>> @@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
>>  						   tt_uuid_str(&replica->uuid));
>>  	}
>>  	replicaset.replica_by_id[replica_id] = replica;
>> +	++replicaset.size;
> 
> There's another use of replica_set_id() inside register_replica()
> in alter.cc. Apparently, it just re-assigns the ID without adding
> a new node. I would propose moving this counter into replicaset_add().

A replica can become anonymous or be deleted, so I can't manage the counter
only in replicaset_add(). It has to be decreased somewhere.

As for alter.cc, the instance ID is the primary key of _cluster, so it can't
be changed for an existing replica. The replica needs to be deleted and added
again with a new ID. That means the instance ID is essentially a read-only
field.

This is also validated in replica_set_id(), which asserts that the replica
wasn't registered before, and in replica_clear_id(), which checks that the
replica is registered. There are no cases where an ID is updated for an
existing replica.
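
To illustrate the invariant described above, here is a minimal sketch. It is
a toy model, not the code from replication.cc: every model_* name is made up
for this example, and the array size is an arbitrary assumption. It only shows
that incrementing in "set id" and decrementing in "clear id" keeps the counter
equal to what a full scan of the array would return, including when a replica
is deleted and re-registered under a new ID.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical limits and sentinel, chosen only for this sketch. */
enum { MODEL_ID_MAX = 32, MODEL_ID_NIL = 0 };

/* Hypothetical stand-ins for struct replica and struct replicaset. */
struct model_replica {
	uint32_t id;
};

struct model_replicaset {
	struct model_replica *replica_by_id[MODEL_ID_MAX];
	int registered_count;
};

/* Register a replica. Only legal if it has no ID yet. */
static void
model_set_id(struct model_replicaset *rs, struct model_replica *r, uint32_t id)
{
	assert(r->id == MODEL_ID_NIL);
	assert(id != MODEL_ID_NIL && id < MODEL_ID_MAX);
	assert(rs->replica_by_id[id] == NULL);
	r->id = id;
	rs->replica_by_id[id] = r;
	++rs->registered_count;
}

/* Unregister a replica. Only legal if it is currently registered. */
static void
model_clear_id(struct model_replicaset *rs, struct model_replica *r)
{
	assert(r->id != MODEL_ID_NIL);
	assert(rs->replica_by_id[r->id] == r);
	assert(rs->registered_count > 0);
	rs->replica_by_id[r->id] = NULL;
	--rs->registered_count;
	r->id = MODEL_ID_NIL;
}

/* The O(n) full scan that the counter replaces. Used as a cross-check. */
static int
model_count_by_scan(const struct model_replicaset *rs)
{
	int count = 0;
	for (int i = 0; i < MODEL_ID_MAX; ++i)
		count += rs->replica_by_id[i] != NULL;
	return count;
}
```

Changing an ID in this model is a model_clear_id() followed by a
model_set_id(), so the counter goes down and up again and never drifts away
from the scan result.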

* Re: [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count
  2020-09-04  8:24   ` Serge Petrenko
@ 2020-09-07 22:54     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-07 22:54 UTC (permalink / raw)
  To: Serge Petrenko, tarantool-patches, gorcunov

Thanks for the review!

>> diff --git a/src/box/replication.h b/src/box/replication.h
>> index ddc2bddf4..69cc820c9 100644
>> --- a/src/box/replication.h
>> +++ b/src/box/replication.h
>> @@ -217,6 +217,13 @@ struct replicaset {
>>       bool is_joining;
>>       /* A number of anonymous replicas following this instance. */
>>       int anon_count;
>> +    /**
>> +     * Number of registered replicas. That includes all of them - connected,
>> +     * disconnected, connected not directly, just present in _cluster. If an
>> +     * instance has an ID, has the same replicaset UUID, then it is
>> +     * accounted here.
>> +     */
>> +    int size;
> 
> We already have `anon_count`. Maybe call this field `registered_count` for consistency?
> Besides `size` is too general of a term.

Agree.

====================
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 20f16206a..6852026c8 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,7 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
 						   tt_uuid_str(&replica->uuid));
 	}
 	replicaset.replica_by_id[replica_id] = replica;
-	++replicaset.size;
+	++replicaset.registered_count;
 
 	say_info("assigned id %d to replica %s",
 		 replica->id, tt_uuid_str(&replica->uuid));
@@ -268,8 +268,8 @@ replica_clear_id(struct replica *replica)
 	 * replication.
 	 */
 	replicaset.replica_by_id[replica->id] = NULL;
-	assert(replicaset.size > 0);
-	--replicaset.size;
+	assert(replicaset.registered_count > 0);
+	--replicaset.registered_count;
 	if (replica->id == instance_id) {
 		/* See replica_check_id(). */
 		assert(replicaset.is_joining);
diff --git a/src/box/replication.h b/src/box/replication.h
index 69cc820c9..3e46c592a 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -223,7 +223,7 @@ struct replicaset {
 	 * instance has an ID, has the same replicaset UUID, then it is
 	 * accounted here.
 	 */
-	int size;
+	int registered_count;
 	/** Applier state. */
 	struct {
 		/**

* Re: [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options
  2020-09-04  9:07   ` Serge Petrenko
@ 2020-09-07 22:55     ` Vladislav Shpilevoy
  0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-07 22:55 UTC (permalink / raw)
  To: Serge Petrenko, tarantool-patches, gorcunov

Thanks for the review!

>> diff --git a/src/box/box.cc b/src/box/box.cc
>> index 281917af2..5f04a1a78 100644
>> --- a/src/box/box.cc
>> +++ b/src/box/box.cc
>> @@ -472,6 +472,40 @@ box_check_uri(const char *source, const char *option_name)
>>       }
>>   }
>>   +static int
>> +box_check_raft_is_enabled(void)
>> +{
>> +    int b = cfg_getb("raft_is_enabled");
>> +    if (b < 0) {
>> +        diag_set(ClientError, ER_CFG, "raft_is_enabled",
>> +             "the value must be a boolean");
>> +    }
>> +    return b;
>> +}
>> +
>> +static int
>> +box_check_raft_is_candidate(void)
>> +{
>> +    int b = cfg_getb("raft_is_candidate");
>> +    if (b < 0) {
>> +        diag_set(ClientError, ER_CFG, "raft_is_candidate",
>> +             "the value must be a boolean");
>> +    }
>> +    return b;
>> +}
>> +
>> +static double
>> +box_check_raft_election_timeout(void)
>> +{
>> +    double d = cfg_getd("raft_election_timeout");
>> +    if (d == 0) {
> 
> Should be "d <= 0" here?
> 
> Otherwise you end up with a diag_raise without appropriate diag_set
> when raft_election_timeout is negative.

Yes, this is a typo:

====================
diff --git a/src/box/box.cc b/src/box/box.cc
index 5f04a1a78..5c87d86d7 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -498,7 +498,7 @@ static double
 box_check_raft_election_timeout(void)
 {
 	double d = cfg_getd("raft_election_timeout");
-	if (d == 0) {
+	if (d <= 0) {
 		diag_set(ClientError, ER_CFG, "raft_election_timeout",
 			 "the value must be a positive number");
 		return -1;
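
The effect of the typo can be reproduced with a small stand-alone sketch. It
does not use the real cfg_getd() or diag API: model_last_error is a
hypothetical stand-in for the diag area, and the caller is assumed to treat any
negative return value as a failure that must come with an error already set.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the diag area: last error text or NULL. */
static const char *model_last_error = NULL;

/* Variant with the typo: a negative value slips through with no error. */
static double
model_check_timeout_eq(double d)
{
	if (d == 0) {
		model_last_error = "the value must be a positive number";
		return -1;
	}
	return d;
}

/* Fixed variant: zero and negative values both set the error. */
static double
model_check_timeout_le(double d)
{
	if (d <= 0) {
		model_last_error = "the value must be a positive number";
		return -1;
	}
	return d;
}

/*
 * Model of the caller: a negative result means failure, and a failure is
 * expected to have an error set. Returns false when the check reports a
 * failure without setting any error.
 */
static bool
model_is_consistent(double (*check)(double), double value)
{
	assert(check != NULL);
	model_last_error = NULL;
	double rc = check(value);
	if (rc >= 0)
		return true;
	return model_last_error != NULL;
}
```

With a value of -5 the first variant returns -5 and leaves the error empty, so
the caller would raise an error that was never set. The second variant reports
the problem properly.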

* Re: [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine
  2020-09-04 11:36   ` Serge Petrenko
@ 2020-09-07 22:57     ` Vladislav Shpilevoy
  2020-09-09  8:04       ` Serge Petrenko
  0 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-07 22:57 UTC (permalink / raw)
  To: Serge Petrenko, tarantool-patches, gorcunov

Thanks for the review!

>>  src/box/applier.cc |  18 +-
>>  src/box/box.cc     |   7 +-
>>  src/box/raft.c     | 711 +++++++++++++++++++++++++++++++++++++++++++--
>>  src/box/raft.h     | 118 ++++++++
>>  src/box/relay.cc   |  19 ++
>>  5 files changed, 847 insertions(+), 26 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 5459a1dc1..c7c486ee4 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>>   * Return 0 for success or -1 in case of an error.
>>   */
>>  static int
>> -applier_apply_tx(struct stailq *rows)
>> +applier_apply_tx(struct applier *applier, struct stailq *rows)
>>  {
>> +	/*
>> +	 * Rows received not directly from a leader are ignored. That is a
>> +	 * protection against the case when an old leader keeps sending data
>> +	 * around not knowing yet that it is not a leader anymore.
>> +	 *
>> +	 * XXX: it may be that this can be fine to apply leader transactions by
>> +	 * looking at their replica_id field if it is equal to leader id. That
>> +	 * can be investigated as an 'optimization'. Even though may not give
>> +	 * anything, because won't change total number of rows sent in the
>> +	 * network anyway.
>> +	 */
>> +	if (!raft_is_source_allowed(applier->instance_id))
>> +		return 0;
> Have we abandoned Rule 66?

Yes, it was officially deprecated.

>> diff --git a/src/box/raft.h b/src/box/raft.h
>> index db64cf933..111a9c16e 100644
>> --- a/src/box/raft.h
>> +++ b/src/box/raft.h
>> @@ -31,34 +31,141 @@
>>  struct raft {
>> +	/** Instance ID of leader of the current term. */
>>  	uint32_t leader;
>> +	/** State of the instance. */
>>  	enum raft_state state;
>> +	/**
>> +	 * Volatile part of the Raft state, whose WAL write may be still
>> +	 * in-progress, and yet the state may be already used. Volatile state is
>> +	 * never sent to anywhere, but the state machine makes decisions based
>> +	 * on it. That is vital.
>> +	 * As an example, volatile vote needs to be used to reject votes inside
>> +	 * a term, where the instance already voted (even if the vote WAL write
>> +	 * is not finished yet). Otherwise the instance would try to write
>> +	 * several votes inside one term.
>> +	 */
>> +	uint64_t volatile_term;
>> +	uint32_t volatile_vote;
>> +	/**
>> +	 * Flag whether Raft is enabled. When disabled, it still persists terms
>> +	 * so as to quickly enroll into the cluster when (if) it is enabled. In
>> +	 * everything else disabled Raft does not affect instance work.
>> +	 */
>>  	bool is_enabled;
>> +	/**
>> +	 * Flag whether the node can become a leader. It is an accumulated value
>> +	 * of configuration options Raft enabled and Raft candidate. If at least
>> +	 * one is false - the instance is not a candidate.
>> +	 */
>>  	bool is_candidate;
>> +	/** Flag whether the instance is allowed to be a leader. */
>> +	bool is_cfg_candidate;
>> +	/**
>> +	 * Flag whether Raft currently tries to write something into WAL. It
>> +	 * happens asynchronously, not right after Raft state is updated.
>> +	 */
>> +	bool is_write_in_progress;
>> +	/**
>> +	 * Persisted Raft state. These values are used when need to tell current
>> +	 * Raft state to other nodes.
>> +	 */
>>  	uint64_t term;
>>  	uint32_t vote;
>> +	/**
>> +	 * Bit 1 on position N means that a vote from instance with ID = N was
>> +	 * obtained.
>> +	 */
>> +	uint32_t vote_mask;
>> +	/** Number of votes for this instance. Valid only in candidate state. */
>> +	int vote_count;
>> +	/** State machine timed event trigger. */
>> +	struct ev_timer timer;
>> +	/**
>> +	 * Dump of Raft state in the end of event loop, when it is changed.
>> +	 */
>> +	struct ev_check io;
> 
> You probably need the ev_prepare watcher.
> 
> Here's what the libev doc  says:
> 
> All |ev_prepare| watchers are invoked just /before/ |ev_run| starts
> to gather new events, and all |ev_check| watchers are queued (not
> invoked) just after |ev_run| has gathered them, but before it
> queues any callbacks for any received events. That means
> |ev_prepare| watchers are the last watchers invoked before the
> event loop sleeps or polls for new events, and |ev_check| watchers
> will be invoked before any other watchers of the same or lower
> priority within an event loop iteration.

I am not so sure. The documentation is hard to read. From what I understand,
this is how it works: ev_prepare is invoked, then events are collected, then
ev_check is invoked, then the callbacks are called. In our case it does not
seem to matter whether we use prepare or check. On the other hand, that is
probably not how it works, because in fiber.top check is called at the
beginning of an iteration and prepare is called afterwards, and fiber.top
returns sane results.

Here I see that check is called after prepare. So check is at least 'closer' to
the end of an event loop iteration, isn't it?

	http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#code_ev_prepare_code_and_code_ev_che
	Prepare and check watchers are often (but not always) used in pairs: prepare watchers get invoked before the process blocks and check watchers afterwards.

I also couldn't get any info from the libev source code, because it is mostly
unreadable.

Is there a case where ev_check won't work and ev_prepare will? Is ev_prepare
actually closer to the end of an event loop iteration, and I missed something?

>> +	/** Configured election timeout in seconds. */
>>  	double election_timeout;
>>  };
>> @@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
>>  	struct xrow_header row;
>>  	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
>>  	try {
>> +		if (msg->req.state == RAFT_STATE_LEADER)
>> +			relay_restart_recovery(msg->relay);
>>  		relay_send(msg->relay, &row);
> 
> Looks like you should first send the message and then restart the recovery.
> Otherwise the resent data will be ignored once again.

Yes, true.

====================
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4f9bbc0de..d63711600 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -803,9 +803,14 @@ relay_raft_msg_push(struct cmsg *base)
 	struct xrow_header row;
 	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
 	try {
+		/*
+		 * Send the message before restarting the recovery. Otherwise
+		 * all the rows would be sent from under a non-leader role and
+		 * would be ignored again.
+		 */
+		relay_send(msg->relay, &row);
 		if (msg->req.state == RAFT_STATE_LEADER)
 			relay_restart_recovery(msg->relay);
-		relay_send(msg->relay, &row);
 	} catch (Exception *e) {
 		relay_set_error(msg->relay, e);
 		fiber_cancel(fiber());
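
Why the order matters can be shown with a toy model of the receiving side. It
is only a sketch under simplifying assumptions: the follower is reduced to a
leader ID and a counter, delivery is synchronous and in order, and all model_*
names are invented for this example. The follower drops rows whose source is
not the leader it currently knows, which mirrors the raft_is_source_allowed()
check in the applier.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical follower state: known leader (0 = none) and applied rows. */
struct model_follower {
	uint32_t leader_id;
	int applied_rows;
};

/* The follower learns about a new leader from a Raft status message. */
static void
model_recv_status(struct model_follower *f, uint32_t source_id)
{
	f->leader_id = source_id;
}

/* Rows are applied only if they come from the currently known leader. */
static void
model_recv_row(struct model_follower *f, uint32_t source_id)
{
	if (source_id == f->leader_id)
		++f->applied_rows;
}

/*
 * Model of a relay that became a leader and resends row_count rows. The
 * status message goes either before or after the rows. Returns how many
 * rows the follower applied.
 */
static int
model_relay(uint32_t self_id, int row_count, bool status_first)
{
	struct model_follower f = {0, 0};
	assert(row_count >= 0);
	if (status_first)
		model_recv_status(&f, self_id);
	for (int i = 0; i < row_count; ++i)
		model_recv_row(&f, self_id);
	if (!status_first)
		model_recv_status(&f, self_id);
	return f.applied_rows;
}
```

In this model, resending first and announcing leadership afterwards loses
every row, while announcing first lets all of them through.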

* Re: [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine
  2020-09-07 22:57     ` Vladislav Shpilevoy
@ 2020-09-09  8:04       ` Serge Petrenko
  0 siblings, 0 replies; 27+ messages in thread
From: Serge Petrenko @ 2020-09-09  8:04 UTC (permalink / raw)
  To: Vladislav Shpilevoy, tarantool-patches, gorcunov


08.09.2020 01:57, Vladislav Shpilevoy wrote:
> Thanks for the review!


Hi! Thanks for your reply!

>
>>>   src/box/applier.cc |  18 +-
>>>   src/box/box.cc     |   7 +-
>>>   src/box/raft.c     | 711 +++++++++++++++++++++++++++++++++++++++++++--
>>>   src/box/raft.h     | 118 ++++++++
>>>   src/box/relay.cc   |  19 ++
>>>   5 files changed, 847 insertions(+), 26 deletions(-)
>>>
>>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>>> index 5459a1dc1..c7c486ee4 100644
>>> --- a/src/box/applier.cc
>>> +++ b/src/box/applier.cc
>>> @@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
>>>    * Return 0 for success or -1 in case of an error.
>>>    */
>>>   static int
>>> -applier_apply_tx(struct stailq *rows)
>>> +applier_apply_tx(struct applier *applier, struct stailq *rows)
>>>   {
>>> +	/*
>>> +	 * Rows received not directly from a leader are ignored. That is a
>>> +	 * protection against the case when an old leader keeps sending data
>>> +	 * around not knowing yet that it is not a leader anymore.
>>> +	 *
>>> +	 * XXX: it may be that this can be fine to apply leader transactions by
>>> +	 * looking at their replica_id field if it is equal to leader id. That
>>> +	 * can be investigated as an 'optimization'. Even though may not give
>>> +	 * anything, because won't change total number of rows sent in the
>>> +	 * network anyway.
>>> +	 */
>>> +	if (!raft_is_source_allowed(applier->instance_id))
>>> +		return 0;
>> Have we abandoned Rule 66?
> Yes, it was officially deprecated.
>
>>> diff --git a/src/box/raft.h b/src/box/raft.h
>>> index db64cf933..111a9c16e 100644
>>> --- a/src/box/raft.h
>>> +++ b/src/box/raft.h
>>> @@ -31,34 +31,141 @@
>>>   struct raft {
>>> +	/** Instance ID of leader of the current term. */
>>>   	uint32_t leader;
>>> +	/** State of the instance. */
>>>   	enum raft_state state;
>>> +	/**
>>> +	 * Volatile part of the Raft state, whose WAL write may be still
>>> +	 * in-progress, and yet the state may be already used. Volatile state is
>>> +	 * never sent to anywhere, but the state machine makes decisions based
>>> +	 * on it. That is vital.
>>> +	 * As an example, volatile vote needs to be used to reject votes inside
>>> +	 * a term, where the instance already voted (even if the vote WAL write
>>> +	 * is not finished yet). Otherwise the instance would try to write
>>> +	 * several votes inside one term.
>>> +	 */
>>> +	uint64_t volatile_term;
>>> +	uint32_t volatile_vote;
>>> +	/**
>>> +	 * Flag whether Raft is enabled. When disabled, it still persists terms
>>> +	 * so as to quickly enroll into the cluster when (if) it is enabled. In
>>> +	 * everything else disabled Raft does not affect instance work.
>>> +	 */
>>>   	bool is_enabled;
>>> +	/**
>>> +	 * Flag whether the node can become a leader. It is an accumulated value
>>> +	 * of configuration options Raft enabled and Raft candidate. If at least
>>> +	 * one is false - the instance is not a candidate.
>>> +	 */
>>>   	bool is_candidate;
>>> +	/** Flag whether the instance is allowed to be a leader. */
>>> +	bool is_cfg_candidate;
>>> +	/**
>>> +	 * Flag whether Raft currently tries to write something into WAL. It
>>> +	 * happens asynchronously, not right after Raft state is updated.
>>> +	 */
>>> +	bool is_write_in_progress;
>>> +	/**
>>> +	 * Persisted Raft state. These values are used when need to tell current
>>> +	 * Raft state to other nodes.
>>> +	 */
>>>   	uint64_t term;
>>>   	uint32_t vote;
>>> +	/**
>>> +	 * Bit 1 on position N means that a vote from instance with ID = N was
>>> +	 * obtained.
>>> +	 */
>>> +	uint32_t vote_mask;
>>> +	/** Number of votes for this instance. Valid only in candidate state. */
>>> +	int vote_count;
>>> +	/** State machine timed event trigger. */
>>> +	struct ev_timer timer;
>>> +	/**
>>> +	 * Dump of Raft state in the end of event loop, when it is changed.
>>> +	 */
>>> +	struct ev_check io;
>> You probably need the ev_prepare watcher.
>>
>> Here's what the libev doc  says:
>>
>> All |ev_prepare| watchers are invoked just /before/ |ev_run| starts
>> to gather new events, and all |ev_check| watchers are queued (not
>> invoked) just after |ev_run| has gathered them, but before it
>> queues any callbacks for any received events. That means
>> |ev_prepare| watchers are the last watchers invoked before the
>> event loop sleeps or polls for new events, and |ev_check| watchers
>> will be invoked before any other watchers of the same or lower
>> priority within an event loop iteration.
> I am not so sure. The documentation is hard to read. From what I understand,
> this is how it works: ev_prepare is invoked, then events are collected, then
> ev_check is invoked, then the callbacks are called.

Yes, it works this way. Aren't we speaking of the same thing?
By 'ev_loop iteration' I mean the time when our fibers perform some useful
work and various ev callbacks are called. This means EV_CHECK is called at
the beginning of an 'ev_loop iteration' and EV_PREPARE is called at the end.

> In our case it seems to
> be not important whether we use prepare or check. But on the other hand, it
> is probably not how it works, because in fiber.top check is called in the
> beginning of an iteration, and prepare is called afterwards, and it fiber.top
> returns sane results.
This is because all the fibers perform their work between ev_check and
ev_prepare.
>
> Here I see that check is called after prepare. So check is at least 'closer' to
> event loop iteration end, isn't it?:
>
> 	http://pod.tst.eu/http://cvs.schmorp.de/libev/ev.pod#code_ev_prepare_code_and_code_ev_che
> 	Prepare and check watchers are often (but not always) used in pairs: prepare watchers get invoked before the process blocks and check watchers afterwards.

As I understand it, EV_CHECK is called at the 'beginning' of an ev_loop
iteration. Anyway, it looks like we may use either of the watchers. I don't
think it makes any difference.
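
The ordering discussed here can be written down as a tiny trace generator. It
does not call libev at all. It is a model of the per-iteration order as
described in the libev documentation quoted above, ignoring watcher priorities,
and the function name and letter codes are invented for this sketch: P is the
prepare watchers, B is the blocking poll for events, C is the check watchers,
W is the remaining watcher callbacks (where the fibers do their work).

```c
#include <assert.h>
#include <stddef.h>

/*
 * Write the phase letters of the given number of loop iterations into
 * out as a NUL-terminated string, truncating if the buffer is too small.
 */
static void
model_loop_trace(int iterations, char *out, size_t size)
{
	/* Assumed per-iteration order: prepare, poll, check, callbacks. */
	static const char order[] = "PBCW";
	size_t pos = 0;
	assert(out != NULL && size > 0);
	for (int i = 0; i < iterations; ++i) {
		for (size_t k = 0; order[k] != '\0'; ++k) {
			if (pos + 1 < size)
				out[pos++] = order[k];
		}
	}
	out[pos] = '\0';
}
```

Two iterations give "PBCWPBCW". The work W sits between C and the next P, so
check runs right before the work and prepare right after it. Between the work
of one iteration and the work of the next, both watchers fire exactly once,
which is consistent with the conclusion that either one would do here.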

>
> I also couldn't get any info from libev source code, because it is unreadable
> mostly.
>
> Is there a case, when ev_check won't work, and ev_prepare will? Is
> ev_prepare actually closer to an event loop iteration end, and I missed something?
>
>>> +	/** Configured election timeout in seconds. */
>>>   	double election_timeout;
>>>   };
>>> @@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
>>>   	struct xrow_header row;
>>>   	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
>>>   	try {
>>> +		if (msg->req.state == RAFT_STATE_LEADER)
>>> +			relay_restart_recovery(msg->relay);
>>>   		relay_send(msg->relay, &row);
>> Looks like you should first send the message and then restart the recovery.
>> Otherwise the resent data will be ignored once again.
> Yes, true.
>
> ====================
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index 4f9bbc0de..d63711600 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -803,9 +803,14 @@ relay_raft_msg_push(struct cmsg *base)
>   	struct xrow_header row;
>   	xrow_encode_raft(&row, &fiber()->gc, &msg->req);
>   	try {
> +		/*
> +		 * Send the message before restarting the recovery. Otherwise
> +		 * all the rows would be sent from under a non-leader role and
> +		 * would be ignored again.
> +		 */
> +		relay_send(msg->relay, &row);
>   		if (msg->req.state == RAFT_STATE_LEADER)
>   			relay_restart_recovery(msg->relay);
> -		relay_send(msg->relay, &row);
>   	} catch (Exception *e) {
>   		relay_set_error(msg->relay, e);
>   		fiber_cancel(fiber());

-- 
Serge Petrenko

Thread overview: 27+ messages
2020-09-03 22:51 [Tarantool-patches] [PATCH v2 00/10] dRaft Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 01/10] applier: store instance_id in struct applier Vladislav Shpilevoy
2020-09-04  8:13   ` Serge Petrenko
2020-09-07 22:54     ` Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 10/10] raft: introduce box.info.raft Vladislav Shpilevoy
2020-09-04 11:38   ` Serge Petrenko
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 02/10] box: introduce summary RO flag Vladislav Shpilevoy
2020-09-04  8:17   ` Serge Petrenko
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 03/10] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy
2020-09-04  8:20   ` Serge Petrenko
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 04/10] replication: track registered replica count Vladislav Shpilevoy
2020-09-04  8:24   ` Serge Petrenko
2020-09-07 22:54     ` Vladislav Shpilevoy
2020-09-07 15:45   ` Sergey Ostanevich
2020-09-07 22:54     ` Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state Vladislav Shpilevoy
2020-09-04  8:59   ` Serge Petrenko
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 06/10] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy
2020-09-04  9:07   ` Serge Petrenko
2020-09-07 22:55     ` Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 07/10] raft: relay status updates to followers Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 08/10] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy
2020-09-04  9:22   ` Serge Petrenko
2020-09-03 22:51 ` [Tarantool-patches] [PATCH v2 09/10] raft: introduce state machine Vladislav Shpilevoy
2020-09-04 11:36   ` Serge Petrenko
2020-09-07 22:57     ` Vladislav Shpilevoy
2020-09-09  8:04       ` Serge Petrenko