* [Tarantool-patches] [PATCH v3 01/10] applier: store instance_id in struct applier
2020-09-29 22:11 [Tarantool-patches] [PATCH v3 00/10] Raft Vladislav Shpilevoy
@ 2020-09-29 22:11 ` Vladislav Shpilevoy
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 10/10] raft: add tests Vladislav Shpilevoy
` (10 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
The applier is going to need the numeric ID of its remote
instance in order to tell the future Raft module who the sender
of a Raft message is. An alternative would be to add a sender ID
to each Raft message, but this looks like a crutch. Moreover, the
applier would still need to know that numeric ID in order to
notify Raft about heartbeats from the peer node.
Needed for #1146
---
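A minimal sketch of the idea, not the actual Tarantool code: the
applier remembers the numeric ID of its remote peer once, at
subscription time, so that later Raft messages and heartbeats can
be attributed to a sender without carrying the ID in every
message. All `toy_*` names below are hypothetical and exist only
for illustration.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for struct applier. */
struct toy_applier {
	/* Numeric ID of the remote instance, 0 until known. */
	uint32_t instance_id;
};

/* Hypothetical Raft-side bookkeeping: who was heard from last. */
static uint32_t toy_last_msg_source;
static uint32_t toy_last_heartbeat_source;

static void
toy_raft_process_msg(uint64_t term, uint32_t source)
{
	(void)term;
	toy_last_msg_source = source;
}

static void
toy_raft_process_heartbeat(uint32_t source)
{
	toy_last_heartbeat_source = source;
}

/* Called once, when the subscribe response has been decoded. */
static void
toy_applier_on_subscribe(struct toy_applier *applier, uint32_t replica_id)
{
	applier->instance_id = replica_id;
}

/* The message itself carries no sender; the applier supplies it. */
static void
toy_applier_on_raft_msg(struct toy_applier *applier, uint64_t term)
{
	assert(applier->instance_id != 0);
	toy_raft_process_msg(term, applier->instance_id);
}

static void
toy_applier_on_heartbeat(struct toy_applier *applier)
{
	toy_raft_process_heartbeat(applier->instance_id);
}
```

The stored ID serves both cases named in the commit message:
Raft messages and plain heartbeats.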
src/box/applier.cc | 1 +
src/box/applier.h | 2 ++
2 files changed, 3 insertions(+)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index e272a7af6..1a0b55640 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1118,6 +1118,7 @@ applier_subscribe(struct applier *applier)
vclock_create(&applier->remote_vclock_at_subscribe);
xrow_decode_subscribe_response_xc(&row, &cluster_id,
&applier->remote_vclock_at_subscribe);
+ applier->instance_id = row.replica_id;
/*
* If master didn't send us its cluster id
* assume that it has done all the checks.
diff --git a/src/box/applier.h b/src/box/applier.h
index 6e979a806..15ca1fcfd 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -95,6 +95,8 @@ struct applier {
ev_tstamp lag;
/** The last box_error_code() logged to avoid log flooding */
uint32_t last_logged_errcode;
+ /** Remote instance ID. */
+ uint32_t instance_id;
/** Remote instance UUID */
struct tt_uuid uuid;
/** Remote URI (string) */
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 10/10] raft: add tests
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
Part of #1146
---
test/replication/election_basic.result | 278 +++++++++++++++++++++++
test/replication/election_basic.test.lua | 117 ++++++++++
test/replication/election_replica.lua | 30 +++
test/replication/election_replica1.lua | 1 +
test/replication/election_replica2.lua | 1 +
test/replication/election_replica3.lua | 1 +
6 files changed, 428 insertions(+)
create mode 100644 test/replication/election_basic.result
create mode 100644 test/replication/election_basic.test.lua
create mode 100644 test/replication/election_replica.lua
create mode 120000 test/replication/election_replica1.lua
create mode 120000 test/replication/election_replica2.lua
create mode 120000 test/replication/election_replica3.lua
diff --git a/test/replication/election_basic.result b/test/replication/election_basic.result
new file mode 100644
index 000000000..e59386f90
--- /dev/null
+++ b/test/replication/election_basic.result
@@ -0,0 +1,278 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+--
+-- gh-1146: Raft protocol for automated leader election.
+--
+
+old_election_timeout = box.cfg.election_timeout
+ | ---
+ | ...
+
+-- Election is turned off by default.
+assert(not box.cfg.election_is_enabled)
+ | ---
+ | - true
+ | ...
+-- Is candidate by default. Although it does not matter, until election is
+-- turned on.
+assert(box.cfg.election_is_candidate)
+ | ---
+ | - true
+ | ...
+-- Ensure election options are validated.
+box.cfg{election_is_enabled = 100}
+ | ---
+ | - error: 'Incorrect value for option ''election_is_enabled'': should be of type boolean'
+ | ...
+box.cfg{election_is_candidate = 100}
+ | ---
+ | - error: 'Incorrect value for option ''election_is_candidate'': should be of type
+ | boolean'
+ | ...
+box.cfg{election_timeout = -1}
+ | ---
+ | - error: 'Incorrect value for option ''election_timeout'': the value must be a positive
+ | number'
+ | ...
+box.cfg{election_timeout = 0}
+ | ---
+ | - error: 'Incorrect value for option ''election_timeout'': the value must be a positive
+ | number'
+ | ...
+
+-- When election is disabled, the instance is a follower. Does not try to become
+-- a leader, and does not block write operations.
+term = box.info.election.term
+ | ---
+ | ...
+vote = box.info.election.vote
+ | ---
+ | ...
+assert(box.info.election.state == 'follower')
+ | ---
+ | - true
+ | ...
+assert(box.info.election.leader == 0)
+ | ---
+ | - true
+ | ...
+assert(not box.info.ro)
+ | ---
+ | - true
+ | ...
+
+-- Turned on election blocks writes until the instance becomes a leader.
+box.cfg{election_is_candidate = false}
+ | ---
+ | ...
+box.cfg{election_is_enabled = true}
+ | ---
+ | ...
+assert(box.info.election.state == 'follower')
+ | ---
+ | - true
+ | ...
+assert(box.info.ro)
+ | ---
+ | - true
+ | ...
+-- Term is not changed, because the instance can't be a candidate,
+-- and therefore didn't try to vote nor to bump the term.
+assert(box.info.election.term == term)
+ | ---
+ | - true
+ | ...
+assert(box.info.election.vote == vote)
+ | ---
+ | - true
+ | ...
+assert(box.info.election.leader == 0)
+ | ---
+ | - true
+ | ...
+
+-- Candidate instance votes immediately, if sees no leader.
+box.cfg{election_timeout = 1000}
+ | ---
+ | ...
+box.cfg{election_is_candidate = true}
+ | ---
+ | ...
+test_run:wait_cond(function() return box.info.election.state == 'leader' end)
+ | ---
+ | - true
+ | ...
+assert(box.info.election.term > term)
+ | ---
+ | - true
+ | ...
+assert(box.info.election.vote == box.info.id)
+ | ---
+ | - true
+ | ...
+assert(box.info.election.leader == box.info.id)
+ | ---
+ | - true
+ | ...
+
+box.cfg{ \
+ election_is_enabled = false, \
+ election_is_candidate = true, \
+ election_timeout = old_election_timeout \
+}
+ | ---
+ | ...
+
+--
+-- See if bootstrap with election enabled works.
+--
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+ | ---
+ | ...
+test_run:create_cluster(SERVERS, "replication")
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+is_leader_cmd = 'return box.info.election.state == \'leader\''
+ | ---
+ | ...
+leader_id_cmd = 'return box.info.election.leader'
+ | ---
+ | ...
+is_r1_leader = test_run:eval('election_replica1', is_leader_cmd)[1]
+ | ---
+ | ...
+is_r2_leader = test_run:eval('election_replica2', is_leader_cmd)[1]
+ | ---
+ | ...
+is_r3_leader = test_run:eval('election_replica3', is_leader_cmd)[1]
+ | ---
+ | ...
+leader_count = is_r1_leader and 1 or 0
+ | ---
+ | ...
+leader_count = leader_count + (is_r2_leader and 1 or 0)
+ | ---
+ | ...
+leader_count = leader_count + (is_r3_leader and 1 or 0)
+ | ---
+ | ...
+assert(leader_count == 1)
+ | ---
+ | - true
+ | ...
+-- All nodes have the same leader.
+r1_leader = test_run:eval('election_replica1', leader_id_cmd)[1]
+ | ---
+ | ...
+r2_leader = test_run:eval('election_replica2', leader_id_cmd)[1]
+ | ---
+ | ...
+r3_leader = test_run:eval('election_replica3', leader_id_cmd)[1]
+ | ---
+ | ...
+assert(r1_leader ~= 0)
+ | ---
+ | - true
+ | ...
+assert(r1_leader == r2_leader)
+ | ---
+ | - true
+ | ...
+assert(r1_leader == r3_leader)
+ | ---
+ | - true
+ | ...
+
+--
+-- Leader death starts a new election.
+--
+leader_name = nil
+ | ---
+ | ...
+nonleader1_name = nil
+ | ---
+ | ...
+nonleader2_name = nil
+ | ---
+ | ...
+if is_r1_leader then \
+ leader_name = 'election_replica1' \
+ nonleader1_name = 'election_replica2' \
+ nonleader2_name = 'election_replica3' \
+elseif is_r2_leader then \
+ leader_name = 'election_replica2' \
+ nonleader1_name = 'election_replica1' \
+ nonleader2_name = 'election_replica3' \
+else \
+ leader_name = 'election_replica3' \
+ nonleader1_name = 'election_replica1' \
+ nonleader2_name = 'election_replica2' \
+end
+ | ---
+ | ...
+-- Lower the quorum so the 2 alive nodes could elect a new leader when the third
+-- node dies.
+test_run:switch(nonleader1_name)
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum = 2}
+ | ---
+ | ...
+-- Switch via default where the names are defined.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:switch(nonleader2_name)
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum = 2}
+ | ---
+ | ...
+
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+test_run:cmd(string.format('stop server %s', leader_name))
+ | ---
+ | - true
+ | ...
+test_run:wait_cond(function() \
+ is_r1_leader = test_run:eval(nonleader1_name, is_leader_cmd)[1] \
+ is_r2_leader = test_run:eval(nonleader2_name, is_leader_cmd)[1] \
+ return is_r1_leader or is_r2_leader \
+end)
+ | ---
+ | - true
+ | ...
+r1_leader = test_run:eval(nonleader1_name, leader_id_cmd)[1]
+ | ---
+ | ...
+r2_leader = test_run:eval(nonleader2_name, leader_id_cmd)[1]
+ | ---
+ | ...
+assert(r1_leader ~= 0)
+ | ---
+ | - true
+ | ...
+assert(r1_leader == r2_leader)
+ | ---
+ | - true
+ | ...
+
+test_run:cmd(string.format('start server %s', leader_name))
+ | ---
+ | - true
+ | ...
+
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
diff --git a/test/replication/election_basic.test.lua b/test/replication/election_basic.test.lua
new file mode 100644
index 000000000..506d5ec4e
--- /dev/null
+++ b/test/replication/election_basic.test.lua
@@ -0,0 +1,117 @@
+test_run = require('test_run').new()
+--
+-- gh-1146: Raft protocol for automated leader election.
+--
+
+old_election_timeout = box.cfg.election_timeout
+
+-- Election is turned off by default.
+assert(not box.cfg.election_is_enabled)
+-- Is candidate by default. Although it does not matter, until election is
+-- turned on.
+assert(box.cfg.election_is_candidate)
+-- Ensure election options are validated.
+box.cfg{election_is_enabled = 100}
+box.cfg{election_is_candidate = 100}
+box.cfg{election_timeout = -1}
+box.cfg{election_timeout = 0}
+
+-- When election is disabled, the instance is a follower. Does not try to become
+-- a leader, and does not block write operations.
+term = box.info.election.term
+vote = box.info.election.vote
+assert(box.info.election.state == 'follower')
+assert(box.info.election.leader == 0)
+assert(not box.info.ro)
+
+-- Turned on election blocks writes until the instance becomes a leader.
+box.cfg{election_is_candidate = false}
+box.cfg{election_is_enabled = true}
+assert(box.info.election.state == 'follower')
+assert(box.info.ro)
+-- Term is not changed, because the instance can't be a candidate,
+-- and therefore didn't try to vote nor to bump the term.
+assert(box.info.election.term == term)
+assert(box.info.election.vote == vote)
+assert(box.info.election.leader == 0)
+
+-- Candidate instance votes immediately, if sees no leader.
+box.cfg{election_timeout = 1000}
+box.cfg{election_is_candidate = true}
+test_run:wait_cond(function() return box.info.election.state == 'leader' end)
+assert(box.info.election.term > term)
+assert(box.info.election.vote == box.info.id)
+assert(box.info.election.leader == box.info.id)
+
+box.cfg{ \
+ election_is_enabled = false, \
+ election_is_candidate = true, \
+ election_timeout = old_election_timeout \
+}
+
+--
+-- See if bootstrap with election enabled works.
+--
+SERVERS = {'election_replica1', 'election_replica2', 'election_replica3'}
+test_run:create_cluster(SERVERS, "replication")
+test_run:wait_fullmesh(SERVERS)
+is_leader_cmd = 'return box.info.election.state == \'leader\''
+leader_id_cmd = 'return box.info.election.leader'
+is_r1_leader = test_run:eval('election_replica1', is_leader_cmd)[1]
+is_r2_leader = test_run:eval('election_replica2', is_leader_cmd)[1]
+is_r3_leader = test_run:eval('election_replica3', is_leader_cmd)[1]
+leader_count = is_r1_leader and 1 or 0
+leader_count = leader_count + (is_r2_leader and 1 or 0)
+leader_count = leader_count + (is_r3_leader and 1 or 0)
+assert(leader_count == 1)
+-- All nodes have the same leader.
+r1_leader = test_run:eval('election_replica1', leader_id_cmd)[1]
+r2_leader = test_run:eval('election_replica2', leader_id_cmd)[1]
+r3_leader = test_run:eval('election_replica3', leader_id_cmd)[1]
+assert(r1_leader ~= 0)
+assert(r1_leader == r2_leader)
+assert(r1_leader == r3_leader)
+
+--
+-- Leader death starts a new election.
+--
+leader_name = nil
+nonleader1_name = nil
+nonleader2_name = nil
+if is_r1_leader then \
+ leader_name = 'election_replica1' \
+ nonleader1_name = 'election_replica2' \
+ nonleader2_name = 'election_replica3' \
+elseif is_r2_leader then \
+ leader_name = 'election_replica2' \
+ nonleader1_name = 'election_replica1' \
+ nonleader2_name = 'election_replica3' \
+else \
+ leader_name = 'election_replica3' \
+ nonleader1_name = 'election_replica1' \
+ nonleader2_name = 'election_replica2' \
+end
+-- Lower the quorum so the 2 alive nodes could elect a new leader when the third
+-- node dies.
+test_run:switch(nonleader1_name)
+box.cfg{replication_synchro_quorum = 2}
+-- Switch via default where the names are defined.
+test_run:switch('default')
+test_run:switch(nonleader2_name)
+box.cfg{replication_synchro_quorum = 2}
+
+test_run:switch('default')
+test_run:cmd(string.format('stop server %s', leader_name))
+test_run:wait_cond(function() \
+ is_r1_leader = test_run:eval(nonleader1_name, is_leader_cmd)[1] \
+ is_r2_leader = test_run:eval(nonleader2_name, is_leader_cmd)[1] \
+ return is_r1_leader or is_r2_leader \
+end)
+r1_leader = test_run:eval(nonleader1_name, leader_id_cmd)[1]
+r2_leader = test_run:eval(nonleader2_name, leader_id_cmd)[1]
+assert(r1_leader ~= 0)
+assert(r1_leader == r2_leader)
+
+test_run:cmd(string.format('start server %s', leader_name))
+
+test_run:drop_cluster(SERVERS)
diff --git a/test/replication/election_replica.lua b/test/replication/election_replica.lua
new file mode 100644
index 000000000..36ea1f077
--- /dev/null
+++ b/test/replication/election_replica.lua
@@ -0,0 +1,30 @@
+#!/usr/bin/env tarantool
+
+local INSTANCE_ID = string.match(arg[0], "%d")
+local SOCKET_DIR = require('fio').cwd()
+
+local function instance_uri(instance_id)
+ return SOCKET_DIR..'/autobootstrap'..instance_id..'.sock';
+end
+
+require('console').listen(os.getenv('ADMIN'))
+
+box.cfg({
+ listen = instance_uri(INSTANCE_ID),
+ replication = {
+ instance_uri(1),
+ instance_uri(2),
+ instance_uri(3),
+ },
+ replication_timeout = 0.1,
+ election_is_enabled = true,
+ election_is_candidate = true,
+ election_timeout = 0.1,
+ replication_synchro_quorum = 3,
+ -- To reveal more election logs.
+ log_level = 6,
+})
+
+box.once("bootstrap", function()
+ box.schema.user.grant('guest', 'super')
+end)
diff --git a/test/replication/election_replica1.lua b/test/replication/election_replica1.lua
new file mode 120000
index 000000000..61ba93fc8
--- /dev/null
+++ b/test/replication/election_replica1.lua
@@ -0,0 +1 @@
+election_replica.lua
\ No newline at end of file
diff --git a/test/replication/election_replica2.lua b/test/replication/election_replica2.lua
new file mode 120000
index 000000000..61ba93fc8
--- /dev/null
+++ b/test/replication/election_replica2.lua
@@ -0,0 +1 @@
+election_replica.lua
\ No newline at end of file
diff --git a/test/replication/election_replica3.lua b/test/replication/election_replica3.lua
new file mode 120000
index 000000000..61ba93fc8
--- /dev/null
+++ b/test/replication/election_replica3.lua
@@ -0,0 +1 @@
+election_replica.lua
\ No newline at end of file
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 02/10] box: introduce summary RO flag
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
An instance is writable if box.cfg.read_only is false, and it is
not an orphan. Update of the final read-only state of the instance
needs to fire read-only update triggers, and notify the engines.
These 2 flags were easy and cheap to check on each operation, and
the triggers were easy to use since both flags are stored and
updated inside box.cc.
That is going to change when Raft is introduced. Raft will add 2
more checks:
- A flag if Raft is enabled on the node. If it is not, then Raft
state won't affect whether the instance is writable;
- When Raft is enabled, it will allow writes on a leader only.
It means a check for being read-only would look like this:
is_ro || is_orphan || (raft_is_enabled() && !raft_is_leader())
This is significantly slower. Besides, Raft somehow needs to
access the read-only triggers and engine API - this looks wrong.
The patch introduces a new flag is_ro_summary. The flag
incorporates all the read-only conditions into one flag. When some
subsystem may change read-only state of the instance, it needs to
call box_update_ro_summary(), and the function takes care of
updating the summary flag, running the triggers, and notifying the
engines.
Raft will use this function when its state or config changes.
Needed for #1146
---
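A minimal sketch of how the summary flag could look once the Raft
conditions from the commit message are folded in. It is not the
Tarantool implementation: every `toy_*` name is hypothetical, and
the trigger and engine notification is reduced to a counter.

```c
#include <assert.h>
#include <stdbool.h>

/* Individual conditions. The two Raft ones are hypothetical here. */
static bool toy_is_ro = false;
static bool toy_is_orphan = false;
static bool toy_raft_is_enabled = false;
static bool toy_raft_is_leader = false;

/* Cached result, the only thing read on each write request. */
static bool toy_is_ro_summary = true;
/* Counts how many times the state really changed; illustration only. */
static int toy_ro_change_count = 0;

/* Must be called by any subsystem after it changes one condition. */
static void
toy_update_ro_summary(void)
{
	bool old = toy_is_ro_summary;
	toy_is_ro_summary = toy_is_ro || toy_is_orphan ||
		(toy_raft_is_enabled && !toy_raft_is_leader);
	/* Most calls change nothing, so filter them out first. */
	if (toy_is_ro_summary == old)
		return;
	/* The real code would notify the engines and waiters here. */
	++toy_ro_change_count;
}

/* Hot path: a single flag read instead of the full expression. */
static bool
toy_is_writable(void)
{
	return !toy_is_ro_summary;
}
```

The point of the design is that the compound expression is
evaluated only when one of its inputs changes, while the per-request
check stays a single boolean read.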
src/box/box.cc | 44 +++++++++++++++++++++++++++-----------------
src/box/box.h | 6 ++++++
2 files changed, 33 insertions(+), 17 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 0b1f6c237..8de451db5 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -129,6 +129,14 @@ static bool is_local_recovery = false;
*/
static bool is_orphan;
+/**
+ * Summary flag incorporating all the instance attributes,
+ * affecting ability to write. Currently these are:
+ * - is_ro;
+ * - is_orphan;
+ */
+static bool is_ro_summary = true;
+
/**
* The pool of fibers in the transaction processor thread
* working on incoming messages from net, wal and other
@@ -144,11 +152,24 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+void
+box_update_ro_summary(void)
+{
+ bool old_is_ro_summary = is_ro_summary;
+ is_ro_summary = is_ro || is_orphan;
+ /* In 99% of cases nothing changes. Filter this out first. */
+ if (is_ro_summary == old_is_ro_summary)
+ return;
+
+ if (is_ro_summary)
+ engine_switch_to_ro();
+ fiber_cond_broadcast(&ro_cond);
+}
+
static int
box_check_writable(void)
{
- /* box is only writable if box.cfg.read_only == false and */
- if (is_ro || is_orphan) {
+ if (is_ro_summary) {
diag_set(ClientError, ER_READONLY);
diag_log();
return -1;
@@ -253,20 +274,14 @@ box_check_ro(void);
void
box_set_ro(void)
{
- bool ro = box_check_ro();
- if (ro == is_ro)
- return; /* nothing to do */
- if (ro)
- engine_switch_to_ro();
-
- is_ro = ro;
- fiber_cond_broadcast(&ro_cond);
+ is_ro = box_check_ro();
+ box_update_ro_summary();
}
bool
box_is_ro(void)
{
- return is_ro || is_orphan;
+ return is_ro_summary;
}
bool
@@ -293,13 +308,8 @@ box_wait_ro(bool ro, double timeout)
void
box_do_set_orphan(bool orphan)
{
- if (is_orphan == orphan)
- return; /* nothing to do */
- if (orphan)
- engine_switch_to_ro();
-
is_orphan = orphan;
- fiber_cond_broadcast(&ro_cond);
+ box_update_ro_summary();
}
void
diff --git a/src/box/box.h b/src/box/box.h
index f9bd8b98d..5988264a5 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -137,6 +137,12 @@ box_set_orphan(bool orphan);
void
box_do_set_orphan(bool orphan);
+/**
+ * Update the final RO flag based on the instance flags and state.
+ */
+void
+box_update_ro_summary(void);
+
/**
* Iterate over all spaces and save them to the
* snapshot file.
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 03/10] wal: don't touch box.cfg.wal_dir more than once
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
Relay.cc and box.cc obtained the box.cfg.wal_dir value via a
cfg_gets() call in order to initialize WAL and create struct
recovery objects.
That is not only a bit dangerous (cfg_gets() uses Lua API and can
throw a Lua error) and slow, but also not necessary - wal_dir
parameter is constant, it can't be changed after instance start.
It means, the value can be stored somewhere one time and then used
without Lua.
Main motivation is that the WAL directory path will be needed
inside relay threads to restart their recovery iterators in the
Raft patch. They can't use cfg_gets(), because Lua lives in the TX
thread. But they can read a constant global value through the
getter introduced in this patch (the value itself existed before,
but had no accessor).
Needed for #1146
---
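A minimal sketch of the pattern, under the assumption that the
directory is copied exactly once before any other thread starts
reading it. The `toy_*` names and the fixed buffer size are
hypothetical and are not taken from wal.c.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for the WAL writer singleton. */
static struct {
	char dirname[256];
} toy_wal_writer;

/* Called once at start, from the thread that owns the config. */
static void
toy_wal_init(const char *wal_dirname)
{
	snprintf(toy_wal_writer.dirname, sizeof(toy_wal_writer.dirname),
		 "%s", wal_dirname);
}

/*
 * Cheap getter with no Lua involved. Readable from any thread as
 * long as it is called only after toy_wal_init() has finished.
 */
static const char *
toy_wal_dir(void)
{
	return toy_wal_writer.dirname;
}
```

Since the value never changes after initialization, readers need
no locking, which is what makes the getter usable from relay
threads.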
src/box/box.cc | 9 ++++-----
src/box/relay.cc | 7 ++-----
src/box/wal.c | 6 ++++++
src/box/wal.h | 7 +++++++
4 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 8de451db5..fbf6a7434 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2401,8 +2401,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
wal_stream_create(&wal_stream);
struct recovery *recovery;
- recovery = recovery_new(cfg_gets("wal_dir"),
- cfg_geti("force_recovery"),
+ recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
checkpoint_vclock);
/*
@@ -2475,7 +2474,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
cfg_getd("wal_dir_rescan_delay"));
while (true) {
- if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock))
+ if (path_lock(wal_dir(), &wal_dir_lock))
diag_raise();
if (wal_dir_lock >= 0)
break;
@@ -2622,7 +2621,7 @@ box_cfg_xc(void)
* Lock the write ahead log directory to avoid multiple
* instances running in the same dir.
*/
- if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock) < 0)
+ if (path_lock(wal_dir(), &wal_dir_lock) < 0)
diag_raise();
if (wal_dir_lock < 0) {
/**
@@ -2631,7 +2630,7 @@ box_cfg_xc(void)
* WAL dir must contain at least one xlog.
*/
if (!cfg_geti("hot_standby") || checkpoint == NULL)
- tnt_raise(ClientError, ER_ALREADY_RUNNING, cfg_gets("wal_dir"));
+ tnt_raise(ClientError, ER_ALREADY_RUNNING, wal_dir());
}
struct journal bootstrap_journal;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a7843a8c2..124b0f52f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -34,7 +34,6 @@
#include "tt_static.h"
#include "scoped_guard.h"
#include "cbus.h"
-#include "cfg.h"
#include "errinj.h"
#include "fiber.h"
#include "say.h"
@@ -369,8 +368,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
relay_delete(relay);
});
- relay->r = recovery_new(cfg_gets("wal_dir"), false,
- start_vclock);
+ relay->r = recovery_new(wal_dir(), false, start_vclock);
vclock_copy(&relay->stop_vclock, stop_vclock);
int rc = cord_costart(&relay->cord, "final_join",
@@ -731,8 +729,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
- relay->r = recovery_new(cfg_gets("wal_dir"), false,
- replica_clock);
+ relay->r = recovery_new(wal_dir(), false, replica_clock);
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
diff --git a/src/box/wal.c b/src/box/wal.c
index ea707aa5e..84abaa7b2 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -201,6 +201,12 @@ wal_mode(void)
return wal_writer_singleton.wal_mode;
}
+const char *
+wal_dir(void)
+{
+ return wal_writer_singleton.wal_dir.dirname;
+}
+
static void
wal_write_to_disk(struct cmsg *msg);
diff --git a/src/box/wal.h b/src/box/wal.h
index 9d0cada46..581306fe9 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -98,6 +98,13 @@ wal_enable(void);
void
wal_free(void);
+/**
+ * Get WAL directory path. The value never changes after box is
+ * configured first time. Safe to use from multiple threads.
+ */
+const char *
+wal_dir(void);
+
struct wal_watcher_msg {
struct cmsg cmsg;
struct wal_watcher *watcher;
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 04/10] replication: track registered replica count
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
Struct replicaset didn't store the number of registered replicas,
only an array, which had to be scanned in full each time the
count was needed.
The count is going to be needed in Raft to calculate the election
quorum. The patch tracks the count so that it can be found in
constant time by simply reading an integer.
Needed for #1146
---
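A minimal sketch of keeping the counter in step with the ID
array. The `toy_*` names are hypothetical, and the majority
formula in `toy_election_quorum()` is only an assumption for
illustration: this patch does not define how the quorum is
derived from the count.

```c
#include <assert.h>
#include <stddef.h>

enum { TOY_ID_MAX = 32 };

struct toy_replica {
	int id; /* 0 means "no ID assigned". */
};

static struct {
	struct toy_replica *replica_by_id[TOY_ID_MAX];
	/* Number of non-NULL slots in replica_by_id. */
	int registered_count;
} toy_replicaset;

static void
toy_replica_set_id(struct toy_replica *replica, int id)
{
	assert(id > 0 && id < TOY_ID_MAX);
	assert(toy_replicaset.replica_by_id[id] == NULL);
	replica->id = id;
	toy_replicaset.replica_by_id[id] = replica;
	++toy_replicaset.registered_count;
}

static void
toy_replica_clear_id(struct toy_replica *replica)
{
	assert(toy_replicaset.registered_count > 0);
	toy_replicaset.replica_by_id[replica->id] = NULL;
	--toy_replicaset.registered_count;
	replica->id = 0;
}

/* Assumed majority rule; O(1) thanks to the tracked counter. */
static int
toy_election_quorum(void)
{
	return toy_replicaset.registered_count / 2 + 1;
}
```

The counter is touched only where a slot in the array changes, so
the two can't drift apart as long as all ID changes go through
these two functions.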
src/box/replication.cc | 4 +++-
src/box/replication.h | 7 +++++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1fcdb660..c19f8c693 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,7 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
tt_uuid_str(&replica->uuid));
}
replicaset.replica_by_id[replica_id] = replica;
-
+ ++replicaset.registered_count;
say_info("assigned id %d to replica %s",
replica->id, tt_uuid_str(&replica->uuid));
replica->anon = false;
@@ -267,6 +267,8 @@ replica_clear_id(struct replica *replica)
* replication.
*/
replicaset.replica_by_id[replica->id] = NULL;
+ assert(replicaset.registered_count > 0);
+ --replicaset.registered_count;
if (replica->id == instance_id) {
/* See replica_check_id(). */
assert(replicaset.is_joining);
diff --git a/src/box/replication.h b/src/box/replication.h
index ddc2bddf4..3e46c592a 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -217,6 +217,13 @@ struct replicaset {
bool is_joining;
/* A number of anonymous replicas following this instance. */
int anon_count;
+ /**
+ * Number of registered replicas. That includes all of them - connected,
+ * disconnected, connected not directly, just present in _cluster. If an
+ * instance has an ID, has the same replicaset UUID, then it is
+ * accounted here.
+ */
+ int registered_count;
/** Applier state. */
struct {
/**
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 05/10] raft: introduce persistent raft state
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
The patch introduces a skeleton of the Raft module and a method to
persist the Raft state in a snapshot, not bound to any space.
Part of #1146
---
src/box/CMakeLists.txt | 1 +
src/box/box.cc | 8 +++++
src/box/iproto_constants.h | 13 ++++++++
src/box/lua/misc.cc | 1 +
src/box/memtx_engine.c | 35 ++++++++++++++++++++
src/box/raft.c | 65 ++++++++++++++++++++++++++++++++++++
src/box/raft.h | 67 ++++++++++++++++++++++++++++++++++++++
src/box/xrow.c | 56 +++++++++++++++++++++++++++++++
src/box/xrow.h | 12 +++++++
9 files changed, 258 insertions(+)
create mode 100644 src/box/raft.c
create mode 100644 src/box/raft.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 2ed72703a..8b2e704cf 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -171,6 +171,7 @@ add_library(box STATIC
port.c
txn.c
txn_limbo.c
+ raft.c
box.cc
gc.c
checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index fbf6a7434..48fed9b2c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
#include "sequence.h"
#include "sql_stmt_cache.h"
#include "msgpack.h"
+#include "raft.h"
#include "trivia/util.h"
static char status[64] = "unknown";
@@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
diag_raise();
return;
}
+ if (iproto_type_is_raft_request(row->type)) {
+ struct raft_request raft_req;
+ if (xrow_decode_raft(row, &raft_req) != 0)
+ diag_raise();
+ raft_process_recovery(&raft_req);
+ return;
+ }
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 9c2fb6058..f9347f555 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -220,6 +220,8 @@ enum iproto_type {
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
+ IPROTO_RAFT = 30,
+
/** A confirmation message for synchronous transactions. */
IPROTO_CONFIRM = 40,
/** A rollback message for synchronous transactions. */
@@ -259,6 +261,11 @@ enum iproto_type {
/** IPROTO type name by code */
extern const char *iproto_type_strs[];
+enum iproto_raft_keys {
+ IPROTO_RAFT_TERM = 0,
+ IPROTO_RAFT_VOTE = 1,
+};
+
/**
* Returns IPROTO type name by @a type code.
* @param type IPROTO type.
@@ -333,6 +340,12 @@ iproto_type_is_synchro_request(uint32_t type)
return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
}
+static inline bool
+iproto_type_is_raft_request(uint32_t type)
+{
+ return type == IPROTO_RAFT;
+}
+
/** This is an error. */
static inline bool
iproto_type_is_error(uint32_t type)
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 5da84b35a..e356f2d4b 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,6 +40,7 @@
#include "box/tuple.h"
#include "box/tuple_format.h"
#include "box/lua/tuple.h"
+#include "box/xrow.h"
#include "mpstream/mpstream.h"
static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 302decc7f..2f38c2647 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -49,6 +49,7 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
+#include "raft.h"
/* sync snapshot every 16MB */
#define SNAP_SYNC_INTERVAL (1 << 24)
@@ -201,12 +202,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
return 0;
}
+static int
+memtx_engine_recover_raft(const struct xrow_header *row)
+{
+ assert(row->type == IPROTO_RAFT);
+ struct raft_request req;
+ if (xrow_decode_raft(row, &req) != 0)
+ return -1;
+ raft_process_recovery(&req);
+ return 0;
+}
+
static int
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct xrow_header *row)
{
assert(row->bodycnt == 1); /* always 1 for read */
if (row->type != IPROTO_INSERT) {
+ if (row->type == IPROTO_RAFT)
+ return memtx_engine_recover_raft(row);
diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row->type);
return -1;
@@ -514,6 +528,7 @@ struct checkpoint {
/** The vclock of the snapshot file. */
struct vclock vclock;
struct xdir dir;
+ struct raft_request raft;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
@@ -538,6 +553,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
+ raft_serialize_for_disk(&ckpt->raft);
ckpt->touch = false;
return ckpt;
}
@@ -609,6 +625,23 @@ checkpoint_add_space(struct space *sp, void *data)
return 0;
};
+static int
+checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
+{
+ struct xrow_header row;
+ struct region *region = &fiber()->gc;
+ uint32_t svp = region_used(region);
+ int rc = -1;
+ if (xrow_encode_raft(&row, region, req) != 0)
+ goto finish;
+ if (checkpoint_write_row(l, &row) != 0)
+ goto finish;
+ rc = 0;
+finish:
+ region_truncate(region, svp);
+ return rc;
+}
+
static int
checkpoint_f(va_list ap)
{
@@ -644,6 +677,8 @@ checkpoint_f(va_list ap)
if (rc != 0)
goto fail;
}
+ if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
+ goto fail;
if (xlog_flush(&snap) < 0)
goto fail;
diff --git a/src/box/raft.c b/src/box/raft.c
new file mode 100644
index 000000000..511fe42f5
--- /dev/null
+++ b/src/box/raft.c
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "raft.h"
+
+#include "error.h"
+#include "journal.h"
+#include "xrow.h"
+#include "small/region.h"
+
+/** Raft state of this instance. */
+struct raft raft = {
+ .term = 1,
+ .vote = 0,
+};
+
+void
+raft_process_recovery(const struct raft_request *req)
+{
+ if (req->term != 0)
+ raft.term = req->term;
+ if (req->vote != 0)
+ raft.vote = req->vote;
+}
+
+void
+raft_serialize_for_network(struct raft_request *req)
+{
+ req->term = raft.term;
+ req->vote = raft.vote;
+}
+
+void
+raft_serialize_for_disk(struct raft_request *req)
+{
+ req->term = raft.term;
+ req->vote = raft.vote;
+}
diff --git a/src/box/raft.h b/src/box/raft.h
new file mode 100644
index 000000000..31f7becdb
--- /dev/null
+++ b/src/box/raft.h
@@ -0,0 +1,67 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct raft_request;
+
+struct raft {
+ uint64_t term;
+ uint32_t vote;
+};
+
+extern struct raft raft;
+
+/** Process a raft entry stored in WAL/snapshot. */
+void
+raft_process_recovery(const struct raft_request *req);
+
+/**
+ * Save complete Raft state into a request to be sent to other instances of the
+ * cluster. It is allowed to save anything here, not only persistent state.
+ */
+void
+raft_serialize_for_network(struct raft_request *req);
+
+/**
+ * Save complete Raft state into a request to be persisted on disk. Only term
+ * and vote are being persisted.
+ */
+void
+raft_serialize_for_disk(struct raft_request *req);
+
+#if defined(__cplusplus)
+}
+#endif
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 2edcb2f8f..b9bbb19a0 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -958,6 +958,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
return 0;
}
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r)
+{
+ size_t size = mp_sizeof_map(2) +
+ mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term) +
+ mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ char *buf = region_alloc(region, size);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, size, "region_alloc", "buf");
+ return -1;
+ }
+ memset(row, 0, sizeof(*row));
+ row->type = IPROTO_RAFT;
+ row->body[0].iov_base = buf;
+ row->body[0].iov_len = size;
+ row->group_id = GROUP_LOCAL;
+ row->bodycnt = 1;
+ buf = mp_encode_map(buf, 2);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
+ buf = mp_encode_uint(buf, r->term);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ return 0;
+}
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+{
+ /* TODO: handle bad format. */
+ assert(row->type == IPROTO_RAFT);
+ assert(row->bodycnt == 1);
+ assert(row->group_id == GROUP_LOCAL);
+ memset(r, 0, sizeof(*r));
+ const char *pos = row->body[0].iov_base;
+ uint32_t map_size = mp_decode_map(&pos);
+ for (uint32_t i = 0; i < map_size; ++i)
+ {
+ uint64_t key = mp_decode_uint(&pos);
+ switch (key) {
+ case IPROTO_RAFT_TERM:
+ r->term = mp_decode_uint(&pos);
+ break;
+ case IPROTO_RAFT_VOTE:
+ r->vote = mp_decode_uint(&pos);
+ break;
+ default:
+ mp_next(&pos);
+ break;
+ }
+ }
+ return 0;
+}
+
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0dc9eb71a..1740df614 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
+struct raft_request {
+ uint64_t term;
+ uint32_t vote;
+};
+
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r);
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+
/**
* CALL/EVAL request.
*/
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 06/10] raft: introduce box.cfg.election_* options
2020-09-29 22:11 [Tarantool-patches] [PATCH v3 00/10] Raft Vladislav Shpilevoy
` (5 preceding siblings ...)
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 05/10] raft: introduce persistent raft state Vladislav Shpilevoy
@ 2020-09-29 22:11 ` Vladislav Shpilevoy
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 07/10] raft: relay status updates to followers Vladislav Shpilevoy
` (4 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
The new options are:
- election_is_enabled - enable/disable leader election (via
Raft). When disabled, the node is supposed to work as if Raft
did not exist, like before;
- election_is_candidate - a flag telling whether the instance can
try to become a leader. Note, it can vote for other nodes
regardless of the value of this option;
- election_timeout - how long to wait until an election ends, in
seconds.
The options don't do anything yet. They are added separately in
order to keep such mundane changes out of the main Raft commit
and to simplify its review.
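As a rough illustration of the three options described above, the following C sketch models their defaults (taken from default_cfg in load_cfg.lua in this patch) and the "negative value means invalid" convention used by box_check_election_timeout(). The type struct election_cfg and the election_cfg_*() helpers are hypothetical names made up for this sketch, not part of Tarantool. The rule that a node may try to lead only when election is both enabled and it is a candidate is an assumption drawn from the option descriptions, since the options have no effect in this commit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical container for the three options; not a Tarantool type. */
struct election_cfg {
	bool is_enabled;
	bool is_candidate;
	double timeout; /* seconds */
};

/* Defaults as listed in default_cfg of load_cfg.lua in this patch. */
static const struct election_cfg election_cfg_default = {
	.is_enabled = false,
	.is_candidate = true,
	.timeout = 5,
};

/*
 * Same convention as box_check_election_timeout(): return the value
 * when it is a positive number, and -1 otherwise.
 */
static double
election_cfg_check_timeout(double timeout)
{
	if (timeout <= 0)
		return -1;
	return timeout;
}

/* A node can vote whenever election is enabled, candidate or not. */
static bool
election_cfg_can_vote(const struct election_cfg *cfg)
{
	assert(cfg != NULL);
	return cfg->is_enabled;
}

/* Assumption of this sketch: leading needs both flags to be set. */
static bool
election_cfg_can_try_to_lead(const struct election_cfg *cfg)
{
	assert(cfg != NULL);
	return cfg->is_enabled && cfg->is_candidate;
}
```

With the defaults, a node neither votes nor tries to lead, which matches the "works as if Raft did not exist" behaviour described for election_is_enabled = false.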
Option names don't mention 'Raft' on purpose, because
- Not all users know what Raft is, so they may not even know it
is related to leader election;
- In the future the algorithm may change from Raft to something
else, so it is better not to depend on it too much in the public
API.
Part of #1146
---
src/box/box.cc | 92 +++++++++++++++++++++++++++++++++
src/box/box.h | 3 ++
src/box/lua/cfg.cc | 27 ++++++++++
src/box/lua/load_cfg.lua | 15 ++++++
src/box/raft.c | 30 +++++++++++
src/box/raft.h | 35 +++++++++++++
test/app-tap/init_script.result | 3 ++
test/box/admin.result | 6 +++
test/box/cfg.result | 12 +++++
9 files changed, 223 insertions(+)
diff --git a/src/box/box.cc b/src/box/box.cc
index 48fed9b2c..99a15bfd0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -472,6 +472,40 @@ box_check_uri(const char *source, const char *option_name)
}
}
+static int
+box_check_election_is_enabled(void)
+{
+ int b = cfg_getb("election_is_enabled");
+ if (b < 0) {
+ diag_set(ClientError, ER_CFG, "election_is_enabled",
+ "the value must be a boolean");
+ }
+ return b;
+}
+
+static int
+box_check_election_is_candidate(void)
+{
+ int b = cfg_getb("election_is_candidate");
+ if (b < 0) {
+ diag_set(ClientError, ER_CFG, "election_is_candidate",
+ "the value must be a boolean");
+ }
+ return b;
+}
+
+static double
+box_check_election_timeout(void)
+{
+ double d = cfg_getd("election_timeout");
+ if (d <= 0) {
+ diag_set(ClientError, ER_CFG, "election_timeout",
+ "the value must be a positive number");
+ return -1;
+ }
+ return d;
+}
+
static void
box_check_replication(void)
{
@@ -729,6 +763,12 @@ box_check_config(void)
box_check_uri(cfg_gets("listen"), "listen");
box_check_instance_uuid(&uuid);
box_check_replicaset_uuid(&uuid);
+ if (box_check_election_is_enabled() < 0)
+ diag_raise();
+ if (box_check_election_is_candidate() < 0)
+ diag_raise();
+ if (box_check_election_timeout() < 0)
+ diag_raise();
box_check_replication();
box_check_replication_timeout();
box_check_replication_connect_timeout();
@@ -751,6 +791,36 @@ box_check_config(void)
diag_raise();
}
+int
+box_set_election_is_enabled(void)
+{
+ int b = box_check_election_is_enabled();
+ if (b < 0)
+ return -1;
+ raft_cfg_is_enabled(b);
+ return 0;
+}
+
+int
+box_set_election_is_candidate(void)
+{
+ int b = box_check_election_is_candidate();
+ if (b < 0)
+ return -1;
+ raft_cfg_is_candidate(b);
+ return 0;
+}
+
+int
+box_set_election_timeout(void)
+{
+ double d = box_check_election_timeout();
+ if (d < 0)
+ return -1;
+ raft_cfg_election_timeout(d);
+ return 0;
+}
+
/*
* Parse box.cfg.replication and create appliers.
*/
@@ -835,6 +905,7 @@ void
box_set_replication_timeout(void)
{
replication_timeout = box_check_replication_timeout();
+ raft_cfg_death_timeout();
}
void
@@ -865,6 +936,7 @@ box_set_replication_synchro_quorum(void)
return -1;
replication_synchro_quorum = value;
txn_limbo_on_parameters_change(&txn_limbo);
+ raft_cfg_election_quorum();
return 0;
}
@@ -2686,6 +2758,26 @@ box_cfg_xc(void)
fiber_gc();
is_box_configured = true;
+ /*
+ * Fill in leader election parameters after bootstrap. It is not
+ * possible earlier: there may be relevant data to recover from WAL
+ * and snapshot, and new records can't be written to WAL until
+ * recovery is done. It is also safe, because relaying does not
+ * start until the box is configured. So it can't happen that this
+ * election-enabled node tries to relay to another election-enabled
+ * node while election is not actually enabled here, which would
+ * lead to a disconnect.
+ */
+ if (box_set_election_is_candidate() != 0)
+ diag_raise();
+ if (box_set_election_timeout() != 0)
+ diag_raise();
+ /*
+ * Election is enabled last, so that all the parameters are installed
+ * by that time.
+ */
+ if (box_set_election_is_enabled() != 0)
+ diag_raise();
title("running");
say_info("ready to accept requests");
diff --git a/src/box/box.h b/src/box/box.h
index 5988264a5..45ff8bbbf 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -245,6 +245,9 @@ void box_set_vinyl_memory(void);
void box_set_vinyl_max_tuple_size(void);
void box_set_vinyl_cache(void);
void box_set_vinyl_timeout(void);
+int box_set_election_is_enabled(void);
+int box_set_election_is_candidate(void);
+int box_set_election_timeout(void);
void box_set_replication_timeout(void);
void box_set_replication_connect_timeout(void);
void box_set_replication_connect_quorum(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index d481155cd..bbb92f038 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -269,6 +269,30 @@ lbox_cfg_set_worker_pool_threads(struct lua_State *L)
return 0;
}
+static int
+lbox_cfg_set_election_is_enabled(struct lua_State *L)
+{
+ if (box_set_election_is_enabled() != 0)
+ luaT_error(L);
+ return 0;
+}
+
+static int
+lbox_cfg_set_election_is_candidate(struct lua_State *L)
+{
+ if (box_set_election_is_candidate() != 0)
+ luaT_error(L);
+ return 0;
+}
+
+static int
+lbox_cfg_set_election_timeout(struct lua_State *L)
+{
+ if (box_set_election_timeout() != 0)
+ luaT_error(L);
+ return 0;
+}
+
static int
lbox_cfg_set_replication_timeout(struct lua_State *L)
{
@@ -382,6 +406,9 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_vinyl_max_tuple_size", lbox_cfg_set_vinyl_max_tuple_size},
{"cfg_set_vinyl_cache", lbox_cfg_set_vinyl_cache},
{"cfg_set_vinyl_timeout", lbox_cfg_set_vinyl_timeout},
+ {"cfg_set_election_is_enabled", lbox_cfg_set_election_is_enabled},
+ {"cfg_set_election_is_candidate", lbox_cfg_set_election_is_candidate},
+ {"cfg_set_election_timeout", lbox_cfg_set_election_timeout},
{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
{"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum},
{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 92347a9fd..d558e7ac9 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -87,6 +87,9 @@ local default_cfg = {
checkpoint_wal_threshold = 1e18,
checkpoint_count = 2,
worker_pool_threads = 4,
+ election_is_enabled = false,
+ election_is_candidate = true,
+ election_timeout = 5,
replication_timeout = 1,
replication_sync_lag = 10,
replication_sync_timeout = 300,
@@ -165,6 +168,9 @@ local template_cfg = {
hot_standby = 'boolean',
memtx_use_mvcc_engine = 'boolean',
worker_pool_threads = 'number',
+ election_is_enabled = 'boolean',
+ election_is_candidate = 'boolean',
+ election_timeout = 'number',
replication_timeout = 'number',
replication_sync_lag = 'number',
replication_sync_timeout = 'number',
@@ -281,6 +287,9 @@ local dynamic_cfg = {
require('title').update(box.cfg.custom_proc_title)
end,
force_recovery = function() end,
+ election_is_enabled = private.cfg_set_election_is_enabled,
+ election_is_candidate = private.cfg_set_election_is_candidate,
+ election_timeout = private.cfg_set_election_timeout,
replication_timeout = private.cfg_set_replication_timeout,
replication_connect_timeout = private.cfg_set_replication_connect_timeout,
replication_connect_quorum = private.cfg_set_replication_connect_quorum,
@@ -335,6 +344,9 @@ local dynamic_cfg_order = {
-- the new one. This should be fixed when box.cfg is able to
-- apply some parameters together and atomically.
replication_anon = 250,
+ election_is_enabled = 300,
+ election_is_candidate = 310,
+ election_timeout = 320,
}
local function sort_cfg_cb(l, r)
@@ -352,6 +364,9 @@ local dynamic_cfg_skip_at_load = {
vinyl_cache = true,
vinyl_timeout = true,
too_long_threshold = true,
+ election_is_enabled = true,
+ election_is_candidate = true,
+ election_timeout = true,
replication = true,
replication_timeout = true,
replication_connect_timeout = true,
diff --git a/src/box/raft.c b/src/box/raft.c
index 511fe42f5..ee54d02b7 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -37,6 +37,8 @@
/** Raft state of this instance. */
struct raft raft = {
+ .is_enabled = false,
+ .is_candidate = false,
.term = 1,
.vote = 0,
};
@@ -63,3 +65,31 @@ raft_serialize_for_disk(struct raft_request *req)
req->term = raft.term;
req->vote = raft.vote;
}
+
+void
+raft_cfg_is_enabled(bool is_enabled)
+{
+ raft.is_enabled = is_enabled;
+}
+
+void
+raft_cfg_is_candidate(bool is_candidate)
+{
+ raft.is_candidate = is_candidate;
+}
+
+void
+raft_cfg_election_timeout(double timeout)
+{
+ raft.election_timeout = timeout;
+}
+
+void
+raft_cfg_election_quorum(void)
+{
+}
+
+void
+raft_cfg_death_timeout(void)
+{
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index 31f7becdb..f27222752 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -30,6 +30,7 @@
* SUCH DAMAGE.
*/
#include <stdint.h>
+#include <stdbool.h>
#if defined(__cplusplus)
extern "C" {
@@ -38,8 +39,11 @@ extern "C" {
struct raft_request;
struct raft {
+ bool is_enabled;
+ bool is_candidate;
uint64_t term;
uint32_t vote;
+ double election_timeout;
};
extern struct raft raft;
@@ -48,6 +52,37 @@ extern struct raft raft;
void
raft_process_recovery(const struct raft_request *req);
+/** Configure whether Raft is enabled. */
+void
+raft_cfg_is_enabled(bool is_enabled);
+
+/**
+ * Configure whether the instance can be elected as Raft leader. Even if false,
+ * the node still can vote, when Raft is enabled.
+ */
+void
+raft_cfg_is_candidate(bool is_candidate);
+
+/** Configure Raft leader election timeout. */
+void
+raft_cfg_election_timeout(double timeout);
+
+/**
+ * Configure Raft leader election quorum. There is no separate option.
+ * Instead, the synchronous replication quorum is used, since Raft is tightly
+ * bound to synchronous replication.
+ */
+void
+raft_cfg_election_quorum(void);
+
+/**
+ * Configure Raft leader death timeout, i.e. the number of seconds without
+ * heartbeats from the leader after which it is considered dead. There is no
+ * separate option. Raft uses the replication timeout for that.
+ */
+void
+raft_cfg_death_timeout(void);
+
/**
* Save complete Raft state into a request to be sent to other instances of the
* cluster. It is allowed to save anything here, not only persistent state.
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index c8974d708..d8969278b 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -8,6 +8,9 @@ checkpoint_count:2
checkpoint_interval:3600
checkpoint_wal_threshold:1e+18
coredump:false
+election_is_candidate:true
+election_is_enabled:false
+election_timeout:5
feedback_enabled:true
feedback_host:https://feedback.tarantool.io
feedback_interval:3600
diff --git a/test/box/admin.result b/test/box/admin.result
index d1540a71e..52b62356f 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -37,6 +37,12 @@ cfg_filter(box.cfg)
- 1000000000000000000
- - coredump
- false
+ - - election_is_candidate
+ - true
+ - - election_is_enabled
+ - false
+ - - election_timeout
+ - 5
- - feedback_enabled
- true
- - feedback_host
diff --git a/test/box/cfg.result b/test/box/cfg.result
index fcfc64b22..f19f4bff7 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -25,6 +25,12 @@ cfg_filter(box.cfg)
| - 1000000000000000000
| - - coredump
| - false
+ | - - election_is_candidate
+ | - true
+ | - - election_is_enabled
+ | - false
+ | - - election_timeout
+ | - 5
| - - feedback_enabled
| - true
| - - feedback_host
@@ -134,6 +140,12 @@ cfg_filter(box.cfg)
| - 1000000000000000000
| - - coredump
| - false
+ | - - election_is_candidate
+ | - true
+ | - - election_is_enabled
+ | - false
+ | - - election_timeout
+ | - 5
| - - feedback_enabled
| - true
| - - feedback_host
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 07/10] raft: relay status updates to followers
2020-09-29 22:11 [Tarantool-patches] [PATCH v3 00/10] Raft Vladislav Shpilevoy
` (6 preceding siblings ...)
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 06/10] raft: introduce box.cfg.election_* options Vladislav Shpilevoy
@ 2020-09-29 22:11 ` Vladislav Shpilevoy
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 08/10] raft: introduce state machine Vladislav Shpilevoy
` (3 subsequent siblings)
11 siblings, 0 replies; 13+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
From: sergepetrenko <sergepetrenko@tarantool.org>
The patch introduces a new type of system message used to notify the
followers of the instance's Raft status updates.
It is the relay's responsibility to deliver the new system rows to its peers.
The notification system reuses and extends the same row type used to
persist Raft state in WAL and snapshot.
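To make the "reuses and extends" point concrete, here is a minimal C sketch of how one request structure can serve both purposes: the on-disk form carries only term and vote, while the network form also carries the state and, for candidates only, a vclock. All sketch_* names and the tiny fixed-size vclock are hypothetical stand-ins invented for this example. The real code uses struct raft_request, struct vclock, raft_serialize_for_disk() and raft_serialize_for_network() as shown in the diff below.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for enum raft_state; values follow the patch. */
enum sketch_raft_state {
	SKETCH_FOLLOWER = 1,
	SKETCH_CANDIDATE = 2,
	SKETCH_LEADER = 3,
};

/* Hypothetical, simplified vclock: a few LSN components. */
struct sketch_vclock {
	int64_t lsn[4];
};

/* Stand-in for the request: zero or NULL means "field not set". */
struct sketch_request {
	uint64_t term;
	uint32_t vote;
	uint32_t state;
	const struct sketch_vclock *vclock;
};

/* Stand-in for the global Raft state of an instance. */
struct sketch_raft {
	uint64_t term;
	uint32_t vote;
	enum sketch_raft_state state;
};

/* Disk form: only the persistent part, term and vote. */
static void
sketch_serialize_for_disk(const struct sketch_raft *raft,
			  struct sketch_request *req)
{
	assert(raft != NULL && req != NULL);
	memset(req, 0, sizeof(*req));
	req->term = raft->term;
	req->vote = raft->vote;
}

/*
 * Network form: term, vote and state. The vclock is attached only
 * when the instance is a candidate; it is copied into caller-owned
 * storage, because the Raft state does not own a vclock.
 */
static void
sketch_serialize_for_network(const struct sketch_raft *raft,
			     const struct sketch_vclock *local,
			     struct sketch_vclock *storage,
			     struct sketch_request *req)
{
	assert(raft != NULL && local != NULL);
	assert(storage != NULL && req != NULL);
	memset(req, 0, sizeof(*req));
	req->term = raft->term;
	req->vote = raft->vote;
	req->state = raft->state;
	if (raft->state == SKETCH_CANDIDATE) {
		*storage = *local;
		req->vclock = storage;
	}
}
```

Zeroing the request first means a reader of the request can tell an absent state or vclock from a present one, which is what the assertions in raft_process_recovery() rely on for rows read from disk.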
Part of #1146
Part of #5204
---
src/box/applier.cc | 28 ++++++-
src/box/box.cc | 17 ++++-
src/box/iproto_constants.h | 2 +
src/box/memtx_engine.c | 3 +-
src/box/raft.c | 73 +++++++++++++++++-
src/box/raft.h | 35 ++++++++-
src/box/relay.cc | 150 ++++++++++++++++++++++++++++++++++++-
src/box/relay.h | 7 ++
src/box/xrow.c | 87 +++++++++++++++++----
src/box/xrow.h | 5 +-
10 files changed, 382 insertions(+), 25 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1a0b55640..9fed3c071 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
#include "scoped_guard.h"
#include "txn_limbo.h"
#include "journal.h"
+#include "raft.h"
STRS(applier_state, applier_STATE);
@@ -878,6 +879,18 @@ err:
return -1;
}
+static int
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
+{
+ assert(iproto_type_is_raft_request(row->type));
+
+ struct raft_request req;
+ struct vclock candidate_clock;
+ if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
+ return -1;
+ return raft_process_msg(&req, applier->instance_id);
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -1222,11 +1235,20 @@ applier_subscribe(struct applier *applier)
* In case of an heartbeat message wake a writer up
* and check applier state.
*/
- if (stailq_first_entry(&rows, struct applier_tx_row,
- next)->row.lsn == 0)
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (applier_handle_raft(applier,
+ first_row) != 0)
+ diag_raise();
+ }
applier_signal_ack(applier);
- else if (applier_apply_tx(&rows) != 0)
+ } else if (applier_apply_tx(&rows) != 0) {
diag_raise();
+ }
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index 99a15bfd0..a8542cb38 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
}
if (iproto_type_is_raft_request(row->type)) {
struct raft_request raft_req;
- if (xrow_decode_raft(row, &raft_req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &raft_req, NULL) != 0)
diag_raise();
raft_process_recovery(&raft_req);
return;
@@ -2146,7 +2147,19 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
say_info("remote vclock %s local vclock %s",
vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+ if (raft_is_enabled()) {
+ /*
+ * Send out the current raft state of the instance. Don't do
+ * that if Raft is disabled. It can be that a part of the
+ * cluster still contains old versions, which can't handle Raft
+ * messages. So when it is disabled, its network footprint
+ * should be 0.
+ */
+ struct raft_request req;
+ raft_serialize_for_network(&req, &vclock);
+ xrow_encode_raft(&row, &fiber()->gc, &req);
+ coio_write_xrow(io, &row);
+ }
/*
* Replica clock is used in gc state and recovery
* initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index f9347f555..d3738c705 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -264,6 +264,8 @@ extern const char *iproto_type_strs[];
enum iproto_raft_keys {
IPROTO_RAFT_TERM = 0,
IPROTO_RAFT_VOTE = 1,
+ IPROTO_RAFT_STATE = 2,
+ IPROTO_RAFT_VCLOCK = 3,
};
/**
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 2f38c2647..8147557f6 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -207,7 +207,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT);
struct raft_request req;
- if (xrow_decode_raft(row, &req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &req, NULL) != 0)
return -1;
raft_process_recovery(&req);
return 0;
diff --git a/src/box/raft.c b/src/box/raft.c
index ee54d02b7..024433369 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,9 +34,20 @@
#include "journal.h"
#include "xrow.h"
#include "small/region.h"
+#include "replication.h"
+#include "relay.h"
+
+const char *raft_state_strs[] = {
+ NULL,
+ "follower",
+ "candidate",
+ "leader",
+};
/** Raft state of this instance. */
struct raft raft = {
+ .leader = 0,
+ .state = RAFT_STATE_FOLLOWER,
.is_enabled = false,
.is_candidate = false,
.term = 1,
@@ -50,18 +61,71 @@ raft_process_recovery(const struct raft_request *req)
raft.term = req->term;
if (req->vote != 0)
raft.vote = req->vote;
+ /*
+ * Role is never persisted. If recovery is happening, the
+ * node was restarted, and the former role may no longer be
+ * valid anyway.
+ */
+ assert(req->state == 0);
+ /*
+ * Vclock is always persisted by some other subsystem - WAL, snapshot.
+ * It is used only to decide to whom to give the vote during election,
+ * as a part of the volatile state.
+ */
+ assert(req->vclock == NULL);
+ /* Raft is not enabled until recovery is finished. */
+ assert(!raft_is_enabled());
+}
+
+int
+raft_process_msg(const struct raft_request *req, uint32_t source)
+{
+ (void)source;
+ if (req->term > raft.term) {
+ // Update term.
+ // The logic will be similar, but the code
+ // below is for testing purposes.
+ raft.term = req->term;
+ }
+ if (req->vote > 0) {
+ // Check whether the vote's for us.
+ }
+ switch (req->state) {
+ case RAFT_STATE_FOLLOWER:
+ break;
+ case RAFT_STATE_CANDIDATE:
+ // Perform voting logic.
+ break;
+ case RAFT_STATE_LEADER:
+ // Switch to a new leader.
+ break;
+ default:
+ break;
+ }
+ return 0;
}
void
-raft_serialize_for_network(struct raft_request *req)
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
{
+ memset(req, 0, sizeof(*req));
req->term = raft.term;
req->vote = raft.vote;
+ req->state = raft.state;
+ /*
+ * Raft does not own vclock, so it always expects it passed externally.
+ * Vclock is sent out only by candidate instances.
+ */
+ if (req->state == RAFT_STATE_CANDIDATE) {
+ req->vclock = vclock;
+ vclock_copy(vclock, &replicaset.vclock);
+ }
}
void
raft_serialize_for_disk(struct raft_request *req)
{
+ memset(req, 0, sizeof(*req));
req->term = raft.term;
req->vote = raft.vote;
}
@@ -93,3 +157,10 @@ void
raft_cfg_death_timeout(void)
{
}
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+ replicaset_foreach(replica)
+ relay_push_raft(replica->relay, req);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index f27222752..8abde4f4c 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,8 +37,19 @@ extern "C" {
#endif
struct raft_request;
+struct vclock;
+
+enum raft_state {
+ RAFT_STATE_FOLLOWER = 1,
+ RAFT_STATE_CANDIDATE = 2,
+ RAFT_STATE_LEADER = 3,
+};
+
+extern const char *raft_state_strs[];
struct raft {
+ uint32_t leader;
+ enum raft_state state;
bool is_enabled;
bool is_candidate;
uint64_t term;
@@ -48,10 +59,25 @@ struct raft {
extern struct raft raft;
+/** Check if Raft is enabled. */
+static inline bool
+raft_is_enabled(void)
+{
+ return raft.is_enabled;
+}
+
/** Process a raft entry stored in WAL/snapshot. */
void
raft_process_recovery(const struct raft_request *req);
+/**
+ * Process a raft status message coming from the network.
+ * @param req Raft request.
+ * @param source Instance ID of the message sender.
+ */
+int
+raft_process_msg(const struct raft_request *req, uint32_t source);
+
/** Configure whether Raft is enabled. */
void
raft_cfg_is_enabled(bool is_enabled);
@@ -88,7 +114,7 @@ raft_cfg_death_timeout(void);
* cluster. It is allowed to save anything here, not only persistent state.
*/
void
-raft_serialize_for_network(struct raft_request *req);
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
/**
* Save complete Raft state into a request to be persisted on disk. Only term
@@ -97,6 +123,13 @@ raft_serialize_for_network(struct raft_request *req);
void
raft_serialize_for_disk(struct raft_request *req);
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 124b0f52f..76430caa6 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
#include "xstream.h"
#include "wal.h"
#include "txn_limbo.h"
+#include "raft.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -145,6 +146,12 @@ struct relay {
alignas(CACHELINE_SIZE)
/** Known relay vclock. */
struct vclock vclock;
+ /**
+ * True if the relay needs Raft updates. It can live fine
+ * without sending Raft updates, if it is a relay to an
+ * anonymous replica, for example.
+ */
+ bool is_raft_enabled;
} tx;
};
@@ -572,6 +579,74 @@ relay_send_heartbeat(struct relay *relay)
}
}
+/** A message to set Raft enabled flag in TX thread from a relay thread. */
+struct relay_is_raft_enabled_msg {
+ /** Base cbus message. */
+ struct cmsg base;
+ /**
+ * First hop - TX thread, second hop - a relay thread, to notify about
+ * the flag being set.
+ */
+ struct cmsg_hop route[2];
+ /** Relay pointer to set the flag in. */
+ struct relay *relay;
+ /** New flag value. */
+ bool value;
+ /** Flag to wait for the flag being set, in a relay thread. */
+ bool is_finished;
+};
+
+/** TX thread part of the Raft flag setting, first hop. */
+static void
+tx_set_is_raft_enabled(struct cmsg *base)
+{
+ struct relay_is_raft_enabled_msg *msg =
+ (struct relay_is_raft_enabled_msg *)base;
+ msg->relay->tx.is_raft_enabled = msg->value;
+}
+
+/** Relay thread part of the Raft flag setting, second hop. */
+static void
+relay_set_is_raft_enabled(struct cmsg *base)
+{
+ struct relay_is_raft_enabled_msg *msg =
+ (struct relay_is_raft_enabled_msg *)base;
+ msg->is_finished = true;
+}
+
+/**
+ * Set relay Raft enabled flag from a relay thread to be accessed by the TX
+ * thread.
+ */
+static void
+relay_send_is_raft_enabled(struct relay *relay,
+ struct relay_is_raft_enabled_msg *msg, bool value)
+{
+ msg->route[0].f = tx_set_is_raft_enabled;
+ msg->route[0].pipe = &relay->relay_pipe;
+ msg->route[1].f = relay_set_is_raft_enabled;
+ msg->route[1].pipe = NULL;
+ msg->relay = relay;
+ msg->value = value;
+ msg->is_finished = false;
+ cmsg_init(&msg->base, msg->route);
+ cpipe_push(&relay->tx_pipe, &msg->base);
+ /*
+ * cbus_call() can't be used, because it works only if the sender thread
+ * is a simple cbus_process() loop. But the relay thread is not -
+ * instead it calls cbus_process() manually when ready. And the thread
+ * loop consists of the main fiber wakeup. So cbus_call() would just
+ * hang, because cbus_process() wouldn't be called by the scheduler
+ * fiber.
+ */
+ while (!msg->is_finished) {
+ cbus_process(&relay->endpoint);
+ if (msg->is_finished)
+ break;
+ fiber_yield();
+ }
+}
+
/**
* A libev callback invoked when a relay client socket is ready
* for read. This currently only happens when the client closes
@@ -592,6 +667,10 @@ relay_subscribe_f(va_list ap)
cbus_pair("tx", relay->endpoint.name, &relay->tx_pipe,
&relay->relay_pipe, NULL, NULL, cbus_process);
+ struct relay_is_raft_enabled_msg raft_enabler;
+ if (!relay->replica->anon)
+ relay_send_is_raft_enabled(relay, &raft_enabler, true);
+
/*
* Setup garbage collection trigger.
* Not needed for anonymous replicas, since they
@@ -671,6 +750,9 @@ relay_subscribe_f(va_list ap)
cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
}
+ if (!relay->replica->anon)
+ relay_send_is_raft_enabled(relay, &raft_enabler, false);
+
/*
* Log the error that caused the relay to break the loop.
* Don't clear the error for status reporting.
@@ -770,13 +852,75 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
+struct relay_raft_msg {
+ struct cmsg base;
+ struct cmsg_hop route;
+ struct raft_request req;
+ struct vclock vclock;
+ struct relay *relay;
+};
+
+static void
+relay_raft_msg_push(struct cmsg *base)
+{
+ struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+ struct xrow_header row;
+ xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+ try {
+ relay_send(msg->relay, &row);
+ } catch (Exception *e) {
+ relay_set_error(msg->relay, e);
+ fiber_cancel(fiber());
+ }
+ free(msg);
+}
+
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req)
+{
+ /*
+ * Raft updates don't stack. They are thrown away if they can't be
+ * pushed now. This is fine as long as relays live much longer than the
+ * timeouts set in Raft.
+ */
+ if (!relay->tx.is_raft_enabled)
+ return;
+ /*
+ * XXX: the message should be preallocated. It should
+ * work like Kharon in IProto. Relay should have 2 raft
+ * messages rotating. When one is sent, the other can be
+ * updated and a flag is set. When the first message is
+ * sent, the control returns to TX thread, sees the set
+ * flag, rotates the buffers, and sends it again. And so
+ * on. This is how it can work in future, with 0 heap
+ * allocations. Current solution with alloc-per-update is
+ * good enough as a start. Another option - wait until all
+ * is moved to WAL thread, where this will all happen
+ * in one thread and will be much simpler.
+ */
+ struct relay_raft_msg *msg =
+ (struct relay_raft_msg *)malloc(sizeof(*msg));
+ if (msg == NULL) {
+ panic("Couldn't allocate raft message");
+ return;
+ }
+ msg->req = *req;
+ if (req->vclock != NULL) {
+ msg->req.vclock = &msg->vclock;
+ vclock_copy(&msg->vclock, req->vclock);
+ }
+ msg->route.f = relay_raft_msg_push;
+ msg->route.pipe = NULL;
+ cmsg_init(&msg->base, &msg->route);
+ msg->relay = relay;
+ cpipe_push(&relay->relay_pipe, &msg->base);
+}
+
/** Send a single row to the client. */
static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
- assert(iproto_type_is_dml(packet->type) ||
- iproto_type_is_synchro_request(packet->type));
if (packet->group_id == GROUP_LOCAL) {
/*
* We do not relay replica-local rows to other
@@ -793,6 +937,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
packet->group_id = GROUP_DEFAULT;
packet->bodycnt = 0;
}
+ assert(iproto_type_is_dml(packet->type) ||
+ iproto_type_is_synchro_request(packet->type));
/* Check if the rows from the instance are filtered. */
if ((1 << packet->replica_id & relay->id_filter) != 0)
return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..b32e2ea2a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
double
relay_last_row_time(const struct relay *relay);
+/**
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
+ */
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index b9bbb19a0..da5c6ffae 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -962,11 +962,30 @@ int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r)
{
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_RAFT_TERM) +
- mp_sizeof_uint(r->term) +
- mp_sizeof_uint(IPROTO_RAFT_VOTE) +
- mp_sizeof_uint(r->vote);
+ /*
+ * Term is always encoded. Sometimes the rest can even be ignored if
+ * the term is too old.
+ */
+ int map_size = 1;
+ size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term);
+ if (r->vote != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ }
+ if (r->state != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+ mp_sizeof_uint(r->state);
+ }
+ if (r->vclock != NULL) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
+ mp_sizeof_vclock_ignore0(r->vclock);
+ }
+ size += mp_sizeof_map(map_size);
+
char *buf = region_alloc(region, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -975,43 +994,83 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
memset(row, 0, sizeof(*row));
row->type = IPROTO_RAFT;
row->body[0].iov_base = buf;
- row->body[0].iov_len = size;
row->group_id = GROUP_LOCAL;
row->bodycnt = 1;
- buf = mp_encode_map(buf, 2);
+ const char *begin = buf;
+
+ buf = mp_encode_map(buf, map_size);
buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
buf = mp_encode_uint(buf, r->term);
- buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
- buf = mp_encode_uint(buf, r->vote);
+ if (r->vote != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ }
+ if (r->state != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+ buf = mp_encode_uint(buf, r->state);
+ }
+ if (r->vclock != NULL) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
+ buf = mp_encode_vclock_ignore0(buf, r->vclock);
+ }
+ row->body[0].iov_len = buf - begin;
return 0;
}
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock)
{
- /* TODO: handle bad format. */
assert(row->type == IPROTO_RAFT);
- assert(row->bodycnt == 1);
- assert(row->group_id == GROUP_LOCAL);
+ if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "malformed raft request");
+ return -1;
+ }
memset(r, 0, sizeof(*r));
- const char *pos = row->body[0].iov_base;
+
+ const char *begin = row->body[0].iov_base;
+ const char *end = begin + row->body[0].iov_len;
+ const char *pos = begin;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
{
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
uint64_t key = mp_decode_uint(&pos);
switch (key) {
case IPROTO_RAFT_TERM:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->term = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VOTE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->vote = mp_decode_uint(&pos);
break;
+ case IPROTO_RAFT_STATE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
+ r->state = mp_decode_uint(&pos);
+ break;
+ case IPROTO_RAFT_VCLOCK:
+ r->vclock = vclock;
+ if (r->vclock == NULL)
+ mp_next(&pos);
+ else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+ goto bad_msgpack;
+ break;
default:
mp_next(&pos);
break;
}
}
return 0;
+
+bad_msgpack:
+ xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+ return -1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 1740df614..25985ad7f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
struct raft_request {
uint64_t term;
uint32_t vote;
+ uint32_t state;
+ struct vclock *vclock;
};
int
@@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r);
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock);
/**
* CALL/EVAL request.
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH v3 08/10] raft: introduce state machine
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
This commit is the core part of the Raft implementation. It
introduces the Raft state machine and its integration into the
instance's life cycle.
The implementation follows the protocol to the letter except for a
few important details.
Firstly, the original Raft assumes that all nodes share the same
log record numbers. In Tarantool they are called LSNs. But in
Tarantool each node has its own LSN in its own component of
vclock. That makes the election messages a bit heavier, because
the nodes need to send and compare complete vclocks of each other
instead of a single number like in the original Raft. But the
logic becomes simpler, because the original Raft is uncertain
about what to do with records of an old leader right after a new
leader is elected. They could be rolled back or confirmed
depending on circumstances. The issue disappears when vclock is
used.
Secondly, leader election works differently during cluster
bootstrap, until the number of bootstrapped replicas becomes >=
election quorum. That arises from the specifics of replica
bootstrap and the order of system initialization. In short: during
bootstrap a leader election may use a smaller election quorum than
the configured one. See more details in the code.
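A minimal sketch of the quorum truncation, assuming the election
quorum is simply capped by the number of already registered replicas.
The function name and parameters are hypothetical; the real code
reads these values from global state.

```c
#include <assert.h>

/*
 * Effective election quorum: the configured synchronous quorum, but
 * never more than the number of replicas registered so far.
 */
static int
sketch_election_quorum(int synchro_quorum, int registered_count)
{
	assert(synchro_quorum > 0 && registered_count >= 0);
	return synchro_quorum < registered_count ?
	       synchro_quorum : registered_count;
}
```

With a configured quorum of 2 and only one registered instance, the
first node can elect itself. Once a second replica is registered, the
configured value applies again.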
Part of #1146
---
src/box/applier.cc | 23 +-
src/box/box.cc | 19 +-
src/box/raft.c | 897 +++++++++++++++++++++++++++++++++++++++++++--
src/box/raft.h | 135 ++++++-
src/box/relay.cc | 24 ++
5 files changed, 1062 insertions(+), 36 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 9fed3c071..7686d6cbc 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -883,6 +883,11 @@ static int
applier_handle_raft(struct applier *applier, struct xrow_header *row)
{
assert(iproto_type_is_raft_request(row->type));
+ if (applier->instance_id == 0) {
+ diag_set(ClientError, ER_PROTOCOL, "Can't apply a Raft request "
+ "from an instance without an ID");
+ return -1;
+ }
struct raft_request req;
struct vclock candidate_clock;
@@ -897,8 +902,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
* Return 0 for success or -1 in case of an error.
*/
static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
{
+ /*
+ * Rows received not directly from a leader are ignored. That is a
+ * protection against the case when an old leader keeps sending data
+ * around not knowing yet that it is not a leader anymore.
+ *
+ * XXX: it may be fine to apply leader transactions by checking
+ * whether their replica_id field is equal to the leader ID. That can
+ * be investigated as an 'optimization', even though it may not give
+ * anything, because it won't change the total number of rows sent over
+ * the network anyway.
+ */
+ if (!raft_is_source_allowed(applier->instance_id))
+ return 0;
struct xrow_header *first_row = &stailq_first_entry(rows,
struct applier_tx_row, next)->row;
struct xrow_header *last_row;
@@ -1238,6 +1256,7 @@ applier_subscribe(struct applier *applier)
struct xrow_header *first_row =
&stailq_first_entry(&rows, struct applier_tx_row,
next)->row;
+ raft_process_heartbeat(applier->instance_id);
if (first_row->lsn == 0) {
if (unlikely(iproto_type_is_raft_request(
first_row->type))) {
@@ -1246,7 +1265,7 @@ applier_subscribe(struct applier *applier)
diag_raise();
}
applier_signal_ack(applier);
- } else if (applier_apply_tx(&rows) != 0) {
+ } else if (applier_apply_tx(applier, &rows) != 0) {
diag_raise();
}
diff --git a/src/box/box.cc b/src/box/box.cc
index a8542cb38..6ec813c12 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
box_update_ro_summary(void)
{
bool old_is_ro_summary = is_ro_summary;
- is_ro_summary = is_ro || is_orphan;
+ is_ro_summary = is_ro || is_orphan || raft_is_ro();
/* In 99% nothing changes. Filter this out first. */
if (is_ro_summary == old_is_ro_summary)
return;
@@ -171,6 +171,10 @@ static int
box_check_writable(void)
{
if (is_ro_summary) {
+ /*
+ * XXX: return a special error when the node is not a leader to
+ * reroute to the leader node.
+ */
diag_set(ClientError, ER_READONLY);
diag_log();
return -1;
@@ -2648,6 +2652,7 @@ box_init(void)
txn_limbo_init();
sequence_init();
+ raft_init();
}
bool
@@ -2795,8 +2800,18 @@ box_cfg_xc(void)
title("running");
say_info("ready to accept requests");
- if (!is_bootstrap_leader)
+ if (!is_bootstrap_leader) {
replicaset_sync();
+ } else {
+ /*
+ * When the cluster is just bootstrapped and this instance is a
+ * leader, it makes no sense to wait for a leader appearance.
+ * There is no one. Moreover this node *is* a leader, so it
+ * should take the control over the situation and start a new
+ * term immediately.
+ */
+ raft_new_term();
+ }
/* box.cfg.read_only is not read yet. */
assert(box_is_ro());
diff --git a/src/box/raft.c b/src/box/raft.c
index 024433369..e88e5adb6 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,13 @@
#include "small/region.h"
#include "replication.h"
#include "relay.h"
+#include "box.h"
+#include "tt_static.h"
+
+/**
+ * Maximal random deviation of the election timeout, as a fraction of it.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
const char *raft_state_strs[] = {
NULL,
@@ -48,19 +55,264 @@ const char *raft_state_strs[] = {
struct raft raft = {
.leader = 0,
.state = RAFT_STATE_FOLLOWER,
+ .volatile_term = 1,
+ .volatile_vote = 0,
.is_enabled = false,
.is_candidate = false,
+ .is_cfg_candidate = false,
+ .is_write_in_progress = false,
+ .is_broadcast_scheduled = false,
.term = 1,
.vote = 0,
+ .vote_mask = 0,
+ .vote_count = 0,
+ .worker = NULL,
+ .election_timeout = 5,
};
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has unflushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new unflushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted for self, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+ return raft.volatile_term == raft.term &&
+ raft.volatile_vote == raft.vote;
+}
+
+/**
+ * Raft protocol says that election timeout should be a bit randomized so
+ * that the nodes don't start elections at the same time and end up with no
+ * quorum for anybody. This implementation randomizes the election timeout by
+ * adding {election timeout * random factor} value, where max value of the
+ * factor is a constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+ double timeout = raft.election_timeout;
+ /* Translate to ms. Integer is needed to be able to use mod below. */
+ uint32_t rand_part =
+ (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+ if (rand_part == 0)
+ rand_part = 1;
+ /*
+ * XXX: this is not giving a good distribution, but it is not so trivial
+ * to implement a correct random value generator. There is a task to
+ * unify all such places. Not critical here.
+ */
+ rand_part = rand() % (rand_part + 1);
+ return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during election a node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but longer log. In case of Tarantool it
+ * means the node2 vclock should be >= node1 vclock, in all components. It is
+ * not enough to compare only one component, if only because there may be no
+ * previous leader when the election happens for the first time. Or a node
+ * could restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+ int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+ return cmp == 0 || cmp == 1;
+}
+
+/**
+ * Election quorum is not strictly equal to synchronous replication quorum.
+ * Sometimes it can be lowered. That is about bootstrap.
+ *
+ * The problem with bootstrap is that when the replicaset boots, all the
+ * instances can't write to WAL and can't recover from their initial snapshot.
+ * They need one node which will boot first, and then they will replicate from
+ * it.
+ *
+ * This one node should boot from its zero snapshot, create replicaset UUID,
+ * register self with ID 1 in _cluster space, and then register all the other
+ * instances here. To do that the node must be writable. It should have
+ * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
+ * is enabled.
+ *
+ * To be elected a Raft leader it needs to perform election. But it can't be
+ * done before at least synchronous quorum of the replicas is bootstrapped. And
+ * they can't be bootstrapped because they wait for a leader to initialize _cluster.
+ * Cyclic dependency.
+ *
+ * This is resolved by truncation of the election quorum to the number of
+ * registered replicas, if their count is less than synchronous quorum. That
+ * helps to elect a first leader.
+ *
+ * It may seem that the first node could just declare itself a leader and then
+ * strictly follow the protocol from now on, but that won't work, because if the
+ * first node restarts after it is booted, but before a quorum of replicas is
+ * booted, the cluster will get stuck again.
+ *
+ * The current solution is totally safe because
+ *
+ * - eventually the cluster will have node count >= quorum, if the user used a
+ * correct config (God help him if he didn't);
+ *
+ * - synchronous replication quorum is untouched - it is not truncated. Only
+ * leader election quorum is affected. So synchronous data won't be lost.
+ */
+static inline int
+raft_election_quorum(void)
+{
+ return MIN(replication_synchro_quorum, replicaset.registered_count);
+}
+
+/** Schedule broadcast of the complete Raft state to all the followers. */
+static void
+raft_schedule_broadcast(void);
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine. Now until Raft is re-enabled,
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for death of the current leader to start new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * Wait for the leader death timeout until a leader lets the node know he is
+ * alive. Otherwise the node will start a new term. Can be useful when it is not
+ * known whether the leader is alive, but it is undesirable to start a new term
+ * immediately. Because in case the leader is alive, a new term would stun him
+ * and therefore would stun DB write requests. Usually happens when a follower
+ * restarts and may need some time to hear something from the leader.
+ */
+static void
+raft_sm_wait_leader_found(void);
+
+/**
+ * If election is started by this node, or it voted for some other node which
+ * started the election, and it can be a leader itself, it will wait until the
+ * current election times out. When it happens, the node will start new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated, unless something newer happens meanwhile.
+ */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of Raft state machine - start new election when the current
+ * leader dies, or when there is no leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+ int events);
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void);
+
+static void
+raft_sm_become_leader(void);
+
+static void
+raft_sm_follow_leader(uint32_t leader);
+
+static void
+raft_sm_become_candidate(void);
+
+static const char *
+raft_request_to_string(const struct raft_request *req)
+{
+ assert(req->term != 0);
+ int size = 1024;
+ char buf[1024];
+ char *pos = buf;
+ int rc = snprintf(pos, size, "{term: %llu", (unsigned long long)req->term);
+ assert(rc >= 0);
+ pos += rc;
+ size -= rc;
+ if (req->vote != 0) {
+ rc = snprintf(pos, size, ", vote: %u", req->vote);
+ assert(rc >= 0);
+ pos += rc;
+ size -= rc;
+ }
+ if (req->state != 0) {
+ rc = snprintf(pos, size, ", state: %s",
+ raft_state_strs[req->state]);
+ assert(rc >= 0);
+ pos += rc;
+ size -= rc;
+ }
+ if (req->vclock != NULL) {
+ rc = snprintf(pos, size, ", vclock: %s",
+ vclock_to_string(req->vclock));
+ assert(rc >= 0);
+ pos += rc;
+ size -= rc;
+ }
+ rc = snprintf(pos, size, "}");
+ assert(rc >= 0);
+ pos += rc;
+ return tt_cstr(buf, pos - buf);
+}
+
void
raft_process_recovery(const struct raft_request *req)
{
- if (req->term != 0)
+ say_verbose("RAFT: recover %s", raft_request_to_string(req));
+ if (req->term != 0) {
raft.term = req->term;
- if (req->vote != 0)
+ raft.volatile_term = req->term;
+ }
+ if (req->vote != 0) {
raft.vote = req->vote;
+ raft.volatile_vote = req->vote;
+ }
/*
* Role is never persisted. If recovery is happening, the
* node was restarted, and the former role can be false
@@ -80,35 +332,550 @@ raft_process_recovery(const struct raft_request *req)
int
raft_process_msg(const struct raft_request *req, uint32_t source)
{
- (void)source;
- if (req->term > raft.term) {
- // Update term.
- // The logic will be similar, but the code
- // below is for testing purposes.
- raft.term = req->term;
+ say_info("RAFT: message %s from %u", raft_request_to_string(req),
+ source);
+ assert(source > 0);
+ assert(source != instance_id);
+ if (req->term == 0 || req->state == 0) {
+ diag_set(ClientError, ER_PROTOCOL, "Raft term and state can't "
+ "be zero");
+ return -1;
+ }
+ if (req->state == RAFT_STATE_CANDIDATE &&
+ (req->vote != source || req->vclock == NULL)) {
+ diag_set(ClientError, ER_PROTOCOL, "Candidate should always "
+ "vote for self and provide its vclock");
+ return -1;
+ }
+ /* Outdated request. */
+ if (req->term < raft.volatile_term) {
+ say_info("RAFT: the message is ignored due to outdated term - "
+ "current term is %llu", (unsigned long long)raft.volatile_term);
+ return 0;
+ }
+
+ /* Term bump. */
+ if (req->term > raft.volatile_term)
+ raft_sm_schedule_new_term(req->term);
+ /*
+ * Either a vote request during an on-going election. Or an old vote
+ * persisted a long time ago and still being broadcast. Or a vote response.
+ */
+ if (req->vote != 0) {
+ switch (raft.state) {
+ case RAFT_STATE_FOLLOWER:
+ case RAFT_STATE_LEADER:
+ if (!raft.is_enabled) {
+ say_info("RAFT: vote request is skipped - RAFT "
+ "is disabled");
+ break;
+ }
+ if (raft.leader != 0) {
+ say_info("RAFT: vote request is skipped - the "
+ "leader is already known - %u",
+ raft.leader);
+ break;
+ }
+ if (req->vote == instance_id) {
+ /*
+ * This is entirely valid. This instance could
+ * request a vote, then become a follower or
+ * leader, and then get the response.
+ */
+ say_info("RAFT: vote request is skipped - "
+ "can't accept vote for self if not a "
+ "candidate");
+ break;
+ }
+ if (req->state != RAFT_STATE_CANDIDATE) {
+ say_info("RAFT: vote request is skipped - "
+ "this is a notification about a vote "
+ "for a third node, not a request");
+ break;
+ }
+ if (raft.volatile_vote != 0) {
+ say_info("RAFT: vote request is skipped - "
+ "already voted in this term");
+ break;
+ }
+ /* Vclock is not NULL, validated above. */
+ if (!raft_can_vote_for(req->vclock)) {
+ say_info("RAFT: vote request is skipped - the "
+ "vclock is not acceptable");
+ break;
+ }
+ /*
+ * Either the term is new, or didn't vote in the current
+ * term yet. Anyway can vote now.
+ */
+ raft_sm_schedule_new_vote(req->vote);
+ break;
+ case RAFT_STATE_CANDIDATE:
+ /* Check if this is a vote for a competing candidate. */
+ if (req->vote != instance_id) {
+ say_info("RAFT: vote request is skipped - "
+ "competing candidate");
+ break;
+ }
+ /*
+ * Vote for self was requested earlier in this round,
+ * and now was answered by some other instance.
+ */
+ assert(raft.volatile_vote == instance_id);
+ int quorum = raft_election_quorum();
+ bool was_set = bit_set(&raft.vote_mask, source);
+ raft.vote_count += !was_set;
+ if (raft.vote_count < quorum) {
+ say_info("RAFT: accepted vote for self, vote "
+ "count is %d/%d", raft.vote_count,
+ quorum);
+ break;
+ }
+ raft_sm_become_leader();
+ break;
+ default:
+ unreachable();
+ }
+ }
+ if (req->state != RAFT_STATE_LEADER) {
+ if (source == raft.leader) {
+ say_info("RAFT: the node %u has resigned from the "
+ "leader role", raft.leader);
+ raft_sm_schedule_new_election();
+ }
+ return 0;
+ }
+ /* The node is a leader, but it is already known. */
+ if (source == raft.leader)
+ return 0;
+ /*
+ * XXX: A message from a conflicting leader. Split brain, basically.
+ * Need to decide what to do. Current solution is to do nothing. In
+ * future either this node should try to become a leader, or should stop
+ * all writes and require manual intervention.
+ */
+ if (raft.leader != 0) {
+ say_warn("RAFT: conflicting leader detected in one term - "
+ "known is %u, received %u", raft.leader, source);
+ return 0;
+ }
+
+ /* New leader was elected. */
+ raft_sm_follow_leader(source);
+ return 0;
+}
+
+void
+raft_process_heartbeat(uint32_t source)
+{
+ /*
+ * Raft handles heartbeats from all instances, including anon instances
+ * which don't participate in Raft.
+ */
+ if (source == 0)
+ return;
+ /*
+ * When not a candidate - don't wait for anything. Therefore do not care
+ * about the leader being dead.
+ */
+ if (!raft.is_candidate)
+ return;
+ /* Don't care about heartbeats when this node is a leader itself. */
+ if (raft.state == RAFT_STATE_LEADER)
+ return;
+ /* Not interested in heartbeats from a non-leader. */
+ if (raft.leader != source)
+ return;
+ /*
+ * The instance currently is busy with writing something on disk. Can't
+ * react to heartbeats.
+ */
+ if (raft.is_write_in_progress)
+ return;
+ /*
+ * XXX: it may be expensive to reset the timer like that. It may be less
+ * expensive to let the timer work, and remember last timestamp when
+ * anything was heard from the leader. Then in the timer callback check
+ * the timestamp, and restart the timer, if it is fine.
+ */
+ assert(ev_is_active(&raft.timer));
+ ev_timer_stop(loop(), &raft.timer);
+ raft_sm_wait_leader_dead();
+}
+
+/** Wakeup Raft state writer fiber waiting for WAL write end. */
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+ fiber_wakeup(entry->complete_data);
+}
+
+/** Synchronously write a Raft request into WAL. */
+static void
+raft_write_request(const struct raft_request *req)
+{
+ assert(raft.is_write_in_progress);
+ /*
+ * Vclock is never persisted by Raft. It is only sent over the
+ * network when voting for self.
+ */
+ assert(req->vclock == NULL);
+ /*
+ * State is not persisted. That would be strictly against Raft protocol.
+ * The reason is that it does not make much sense - even if the node is
+ * a leader now, by the time the node is restarted another leader will
+ * most likely have been elected.
+ */
+ assert(req->state == 0);
+ struct region *region = &fiber()->gc;
+ uint32_t svp = region_used(region);
+ struct xrow_header row;
+ char buf[sizeof(struct journal_entry) +
+ sizeof(struct xrow_header *)];
+ struct journal_entry *entry = (struct journal_entry *)buf;
+ entry->rows[0] = &row;
+
+ if (xrow_encode_raft(&row, region, req) != 0)
+ goto fail;
+ journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+ fiber());
+
+ if (journal_write(entry) != 0 || entry->res < 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ goto fail;
}
- if (req->vote > 0) {
- // Check whether the vote's for us.
- }
- switch (req->state) {
- case RAFT_STATE_FOLLOWER:
- break;
- case RAFT_STATE_CANDIDATE:
- // Perform voting logic.
- break;
- case RAFT_STATE_LEADER:
- // Switch to a new leader.
- break;
- default:
- break;
+
+ region_truncate(region, svp);
+ return;
+fail:
+ /*
+ * XXX: the stub is supposed to be removed once it is defined what to do
+ * when a raft request WAL write fails.
+ */
+ panic("Could not write a raft request to WAL\n");
+}
+
+/* Dump Raft state to WAL in a blocking way. */
+static void
+raft_worker_handle_io(void)
+{
+ assert(raft.is_write_in_progress);
+ /* During write Raft can't be anything but a follower. */
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ struct raft_request req;
+
+ if (raft_is_fully_on_disk()) {
+end_dump:
+ raft.is_write_in_progress = false;
+ /*
+ * The state machine is stable. Can see now, to what state to
+ * go.
+ */
+ if (!raft.is_candidate) {
+ /*
+ * If not a candidate, can't do anything except vote for
+ * somebody (if Raft is enabled). Nothing to do except
+ * staying a follower without timeouts.
+ */
+ } else if (raft.leader != 0) {
+ /* There is a known leader. Wait until it is dead. */
+ raft_sm_wait_leader_dead();
+ } else if (raft.vote == instance_id) {
+ /* Just wrote own vote. */
+ if (raft_election_quorum() == 1)
+ raft_sm_become_leader();
+ else
+ raft_sm_become_candidate();
+ } else if (raft.vote != 0) {
+ /*
+ * Voted for some other node. Wait to see if it manages to
+ * become a leader.
+ */
+ raft_sm_wait_election_end();
+ } else {
+ /* No leaders, no votes. */
+ raft_sm_schedule_new_vote(instance_id);
+ }
+ } else {
+ memset(&req, 0, sizeof(req));
+ assert(raft.volatile_term >= raft.term);
+ req.term = raft.volatile_term;
+ req.vote = raft.volatile_vote;
+
+ raft_write_request(&req);
+ say_info("RAFT: persisted state %s",
+ raft_request_to_string(&req));
+
+ assert(req.term >= raft.term);
+ raft.term = req.term;
+ raft.vote = req.vote;
+ /*
+ * Persistent state is visible, and it was changed - broadcast.
+ */
+ raft_schedule_broadcast();
+ if (raft_is_fully_on_disk())
+ goto end_dump;
+ }
+}
+
+/* Broadcast Raft complete state to the followers. */
+static void
+raft_worker_handle_broadcast(void)
+{
+ assert(raft.is_broadcast_scheduled);
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.term = raft.term;
+ req.vote = raft.vote;
+ req.state = raft.state;
+ if (req.state == RAFT_STATE_CANDIDATE) {
+ assert(raft.vote == instance_id);
+ req.vclock = &replicaset.vclock;
+ }
+ replicaset_foreach(replica)
+ relay_push_raft(replica->relay, &req);
+ raft.is_broadcast_scheduled = false;
+}
+
+static int
+raft_worker_f(va_list args)
+{
+ (void)args;
+ bool is_idle;
+ while (!fiber_is_cancelled()) {
+ is_idle = true;
+ if (raft.is_write_in_progress) {
+ raft_worker_handle_io();
+ is_idle = false;
+ }
+ if (raft.is_broadcast_scheduled) {
+ raft_worker_handle_broadcast();
+ is_idle = false;
+ }
+ fiber_sleep(0);
+ if (!is_idle)
+ continue;
+ assert(raft_is_fully_on_disk());
+ fiber_yield();
}
return 0;
}
+static void
+raft_sm_pause_and_dump(void)
+{
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ if (raft.is_write_in_progress)
+ return;
+ ev_timer_stop(loop(), &raft.timer);
+ raft.is_write_in_progress = true;
+ if (raft.worker == NULL)
+ raft.worker = fiber_new("raft_worker", raft_worker_f);
+ fiber_wakeup(raft.worker);
+}
+
+static void
+raft_sm_become_leader(void)
+{
+ assert(raft.state != RAFT_STATE_LEADER);
+ say_info("RAFT: enter leader state with quorum %d",
+ raft_election_quorum());
+ assert(raft.leader == 0);
+ assert(raft.is_candidate);
+ assert(!raft.is_write_in_progress);
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ ev_timer_stop(loop(), &raft.timer);
+ /* Make read-write (if other subsystems allow that). */
+ box_update_ro_summary();
+ /* State is visible and it is changed - broadcast. */
+ raft_schedule_broadcast();
+}
+
+static void
+raft_sm_follow_leader(uint32_t leader)
+{
+ say_info("RAFT: leader is %u, follow", leader);
+ assert(raft.state != RAFT_STATE_LEADER);
+ assert(raft.leader == 0);
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft.leader = leader;
+ if (!raft.is_write_in_progress && raft.is_candidate) {
+ ev_timer_stop(loop(), &raft.timer);
+ raft_sm_wait_leader_dead();
+ }
+ /* State is visible and it is changed - broadcast. */
+ raft_schedule_broadcast();
+}
+
+static void
+raft_sm_become_candidate(void)
+{
+ say_info("RAFT: enter candidate state with 1 self vote");
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ assert(raft.leader == 0);
+ assert(raft.vote == instance_id);
+ assert(raft.is_candidate);
+ assert(!raft.is_write_in_progress);
+ assert(raft_election_quorum() > 1);
+ raft.state = RAFT_STATE_CANDIDATE;
+ raft.vote_count = 1;
+ raft.vote_mask = 0;
+ bit_set(&raft.vote_mask, instance_id);
+ raft_sm_wait_election_end();
+ /* State is visible and it is changed - broadcast. */
+ raft_schedule_broadcast();
+}
+
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+ say_info("RAFT: bump term to %llu, follow",
+ (unsigned long long)new_term);
+ assert(new_term > raft.volatile_term);
+ assert(raft.volatile_term >= raft.term);
+ raft.volatile_term = new_term;
+ /* A new term means a completely new Raft state. */
+ raft.volatile_vote = 0;
+ raft.leader = 0;
+ raft.state = RAFT_STATE_FOLLOWER;
+ box_update_ro_summary();
+ raft_sm_pause_and_dump();
+ /*
+ * State is visible and it is changed - broadcast. Term is also visible,
+ * but only persistent term. Volatile term is not broadcast until
+ * saved to disk.
+ */
+ raft_schedule_broadcast();
+}
+
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+ say_info("RAFT: vote for %u, follow", new_vote);
+ assert(raft.volatile_vote == 0);
+ assert(raft.leader == 0);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ raft.volatile_vote = new_vote;
+ raft_sm_pause_and_dump();
+ /* Nothing visible is changed - no broadcast. */
+}
+
+static void
+raft_sm_schedule_new_election(void)
+{
+ say_info("RAFT: begin new election round");
+ assert(raft_is_fully_on_disk());
+ assert(raft.is_candidate);
+ /* Everyone is a follower until its vote for self is persisted. */
+ raft_sm_schedule_new_term(raft.term + 1);
+ raft_sm_schedule_new_vote(instance_id);
+ box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+ int events)
+{
+ assert(timer == &raft.timer);
+ (void)events;
+ ev_timer_stop(loop, timer);
+ raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ assert(raft.leader != 0);
+ double death_timeout = replication_disconnect_timeout();
+ ev_timer_set(&raft.timer, death_timeout, death_timeout);
+ ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_wait_leader_found(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ assert(raft.leader == 0);
+ double death_timeout = replication_disconnect_timeout();
+ ev_timer_set(&raft.timer, death_timeout, death_timeout);
+ ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_wait_election_end(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER ||
+ (raft.state == RAFT_STATE_CANDIDATE &&
+ raft.volatile_vote == instance_id));
+ assert(raft.leader == 0);
+ double election_timeout = raft.election_timeout +
+ raft_new_random_election_shift();
+ ev_timer_set(&raft.timer, election_timeout, election_timeout);
+ ev_timer_start(loop(), &raft.timer);
+}
+
+static void
+raft_sm_start(void)
+{
+ say_info("RAFT: start state machine");
+ assert(!ev_is_active(&raft.timer));
+ assert(!raft.is_write_in_progress);
+ assert(!raft.is_enabled);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ raft.is_enabled = true;
+ raft.is_candidate = raft.is_cfg_candidate;
+ if (!raft.is_candidate) {
+ /* Nop. */;
+ } else if (raft.leader != 0) {
+ raft_sm_wait_leader_dead();
+ } else {
+ /*
+ * Don't start a new election. The situation most likely
+ * happened because this node was restarted. Instance restarts
+ * may happen in the cluster, and each restart shouldn't
+ * disturb the current leader. Give it time to notify this node
+ * that there is a leader.
+ */
+ raft_sm_wait_leader_found();
+ }
+ box_update_ro_summary();
+}
+
+static void
+raft_sm_stop(void)
+{
+ say_info("RAFT: stop state machine");
+ assert(raft.is_enabled);
+ raft.is_enabled = false;
+ raft.is_candidate = false;
+ if (raft.state == RAFT_STATE_LEADER)
+ raft.leader = 0;
+ raft.state = RAFT_STATE_FOLLOWER;
+ ev_timer_stop(loop(), &raft.timer);
+ box_update_ro_summary();
+ /* State is visible and changed - broadcast. */
+ raft_schedule_broadcast();
+}
+
void
raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
{
memset(req, 0, sizeof(*req));
+ /*
+ * Volatile state is never used for any communications.
+ * Use only persisted state.
+ */
req->term = raft.term;
req->vote = raft.vote;
req->state = raft.state;
@@ -133,34 +900,110 @@ raft_serialize_for_disk(struct raft_request *req)
void
raft_cfg_is_enabled(bool is_enabled)
{
- raft.is_enabled = is_enabled;
+ if (is_enabled == raft.is_enabled)
+ return;
+
+ if (!is_enabled)
+ raft_sm_stop();
+ else
+ raft_sm_start();
}
void
raft_cfg_is_candidate(bool is_candidate)
{
- raft.is_candidate = is_candidate;
+ bool old_is_candidate = raft.is_candidate;
+ raft.is_cfg_candidate = is_candidate;
+ raft.is_candidate = is_candidate && raft.is_enabled;
+ if (raft.is_candidate == old_is_candidate)
+ return;
+
+ if (raft.is_candidate) {
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ /*
+ * If there is an on-going WAL write, it means there was some
+ * node who sent newer data to this node.
+ */
+ if (raft.leader == 0 && raft_is_fully_on_disk())
+ raft_sm_wait_leader_found();
+ } else if (raft.state != RAFT_STATE_FOLLOWER) {
+ if (raft.state == RAFT_STATE_LEADER)
+ raft.leader = 0;
+ raft.state = RAFT_STATE_FOLLOWER;
+ /* State is visible and changed - broadcast. */
+ raft_schedule_broadcast();
+ }
+ box_update_ro_summary();
}
void
raft_cfg_election_timeout(double timeout)
{
+ if (timeout == raft.election_timeout)
+ return;
+
raft.election_timeout = timeout;
+ if (raft.vote != 0 && raft.leader == 0 && raft.is_candidate) {
+ assert(ev_is_active(&raft.timer));
+ double timeout = ev_timer_remaining(loop(), &raft.timer) -
+ raft.timer.at + raft.election_timeout;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_timer_set(&raft.timer, timeout, timeout);
+ ev_timer_start(loop(), &raft.timer);
+ }
}
void
raft_cfg_election_quorum(void)
{
+ if (raft.state != RAFT_STATE_CANDIDATE)
+ return;
+ if (raft.vote_count < raft_election_quorum())
+ return;
+ raft_sm_become_leader();
}
void
raft_cfg_death_timeout(void)
{
+ if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
+ raft.leader != 0) {
+ assert(ev_is_active(&raft.timer));
+ double death_timeout = replication_disconnect_timeout();
+ double timeout = ev_timer_remaining(loop(), &raft.timer) -
+ raft.timer.at + death_timeout;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_timer_set(&raft.timer, timeout, timeout);
+ ev_timer_start(loop(), &raft.timer);
+ }
}
void
-raft_broadcast(const struct raft_request *req)
+raft_new_term(void)
{
- replicaset_foreach(replica)
- relay_push_raft(replica->relay, req);
+ if (raft.is_enabled)
+ raft_sm_schedule_new_term(raft.volatile_term + 1);
+}
+
+static void
+raft_schedule_broadcast(void)
+{
+ raft.is_broadcast_scheduled = true;
+ /*
+ * Don't wake the fiber if it is writing something. Otherwise it would be
+ * a spurious wakeup, which would break the WAL write because it is not
+ * adapted to this.
+ */
+ if (raft.is_write_in_progress)
+ return;
+ if (raft.worker == NULL)
+ raft.worker = fiber_new("raft_worker", raft_worker_f);
+ if (raft.worker != fiber())
+ fiber_wakeup(raft.worker);
+}
+
+void
+raft_init(void)
+{
+ ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
}
diff --git a/src/box/raft.h b/src/box/raft.h
index 8abde4f4c..be77a5473 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,34 +31,147 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "tarantool_ev.h"
#if defined(__cplusplus)
extern "C" {
#endif
+/**
+ * This is an implementation of Raft leader election protocol, separated from
+ * synchronous replication part.
+ *
+ * The protocol describes an algorithm which helps to elect a single leader in
+ * the cluster, which is supposed to handle write requests, and to re-elect a
+ * new leader when the current leader dies.
+ *
+ * The implementation follows the protocol to the letter except a few important
+ * details.
+ *
+ * Firstly, the original Raft assumes that all nodes share the same log record
+ * numbers. In Tarantool they are called LSNs. But in Tarantool each node has
+ * its own LSN in its own component of vclock. That makes the election
+ * messages a bit heavier, because the nodes need to send and compare complete
+ * vclocks of each other instead of a single number like in the original Raft.
+ * But the logic becomes simpler, because in the original Raft there is a
+ * problem of uncertainty about what to do with records of an old leader right
+ * after a new leader is elected. They could be rolled back or confirmed
+ * depending on circumstances. The issue disappears when vclock is used.
+ *
+ * Secondly, leader election works differently during cluster bootstrap, until
+ * number of bootstrapped replicas becomes >= election quorum. That arises from
+ * specifics of replicas bootstrap and order of systems initialization. In
+ * short: during bootstrap a leader election may use a smaller election quorum
+ * than the configured one. See more details in the code.
+ */
+
+struct fiber;
struct raft_request;
struct vclock;
enum raft_state {
+ /**
+ * Can't write. Can only accept data from a leader. Node in this state
+ * either monitors an existing leader, or there is an on-going election
+ * and the node voted for another node, or it can't be a candidate and
+ * does not do anything.
+ */
RAFT_STATE_FOLLOWER = 1,
+ /**
+ * The node can't write. There is an active election, in which the node
+ * voted for self. Now it waits for election outcome.
+ */
RAFT_STATE_CANDIDATE = 2,
+ /** Election was successful. The node accepts write requests. */
RAFT_STATE_LEADER = 3,
};
extern const char *raft_state_strs[];
struct raft {
+ /** Instance ID of leader of the current term. */
uint32_t leader;
+ /** State of the instance. */
enum raft_state state;
+ /**
+ * Volatile part of the Raft state, whose WAL write may be still
+ * in-progress, and yet the state may be already used. Volatile state is
+ * never sent anywhere, but the state machine makes decisions based
+ * on it. That is vital.
+ * As an example, volatile vote needs to be used to reject votes inside
+ * a term, where the instance already voted (even if the vote WAL write
+ * is not finished yet). Otherwise the instance would try to write
+ * several votes inside one term.
+ */
+ uint64_t volatile_term;
+ uint32_t volatile_vote;
+ /**
+ * Flag whether Raft is enabled. When disabled, it still persists terms
+ * so as to quickly enroll into the cluster when (if) it is enabled. In
+ * everything else, a disabled Raft does not affect the instance's work.
+ */
bool is_enabled;
+ /**
+ * Flag whether the node can become a leader. It is an accumulated value
+ * of configuration options Raft enabled and Raft candidate. If at least
+ * one is false - the instance is not a candidate.
+ */
bool is_candidate;
+ /** Flag whether the instance is allowed to be a leader. */
+ bool is_cfg_candidate;
+ /**
+ * Flag whether Raft currently tries to write something into WAL. It
+ * happens asynchronously, not right after Raft state is updated.
+ */
+ bool is_write_in_progress;
+ /**
+ * Flag whether Raft wants to broadcast its state. It is done
+ * asynchronously in the worker fiber. That allows collecting multiple
+ * updates into one batch if they happen in one event loop iteration.
+ * Usually even in one function.
+ */
+ bool is_broadcast_scheduled;
+ /**
+ * Persisted Raft state. These values are used when the current Raft
+ * state needs to be told to other nodes.
+ */
uint64_t term;
uint32_t vote;
+ /**
+ * Bit 1 on position N means that a vote from instance with ID = N was
+ * obtained.
+ */
+ uint32_t vote_mask;
+ /** Number of votes for this instance. Valid only in candidate state. */
+ int vote_count;
+ /** State machine timed event trigger. */
+ struct ev_timer timer;
+ /** Worker fiber to execute blocking tasks like IO. */
+ struct fiber *worker;
+ /** Configured election timeout in seconds. */
double election_timeout;
};
extern struct raft raft;
+/**
+ * A flag whether the instance is read-only according to Raft. Even if Raft
+ * allows writes though, it does not mean the instance is writable. It can be
+ * affected by box.cfg.read_only, connection quorum.
+ */
+static inline bool
+raft_is_ro(void)
+{
+ return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+/** See if the instance can accept rows from an instance with the given ID. */
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+ return !raft.is_enabled || raft.leader == source_id;
+}
+
/** Check if Raft is enabled. */
static inline bool
raft_is_enabled(void)
@@ -78,6 +191,13 @@ raft_process_recovery(const struct raft_request *req);
int
raft_process_msg(const struct raft_request *req, uint32_t source);
+/**
+ * Process a heartbeat message from an instance with the given ID. It is used to
+ * watch leader's health and start election when necessary.
+ */
+void
+raft_process_heartbeat(uint32_t source);
+
/** Configure whether Raft is enabled. */
void
raft_cfg_is_enabled(bool is_enabled);
@@ -109,6 +229,14 @@ raft_cfg_election_quorum(void);
void
raft_cfg_death_timeout(void);
+/**
+ * Bump the term. When it is persisted, the node checks if there is a leader,
+ * and if there is not, a new election is started. That said, this function can
+ * be used as a tool to forcefully start a new election, or restart an existing
+ * one.
+ */
+void
+raft_new_term(void);
+
/**
* Save complete Raft state into a request to be sent to other instances of the
* cluster. It is allowed to save anything here, not only persistent state.
@@ -123,12 +251,9 @@ raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
void
raft_serialize_for_disk(struct raft_request *req);
-/**
- * Broadcast the changes in this instance's raft status to all
- * the followers.
- */
+/** Initialize Raft global data structures. */
void
-raft_broadcast(const struct raft_request *req);
+raft_init(void);
#if defined(__cplusplus)
}
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 76430caa6..096f455a1 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -852,6 +852,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
+/**
+ * Recreate recovery cursor from the last confirmed point. That is
+ * used by Raft, when the node becomes a leader. It may happen,
+ * that it already sent some data to other nodes as a follower,
+ * and they ignored the data. Now when the node is a leader, it
+ * should send the unconfirmed data again. Otherwise the cluster
+ * will get stuck, or worse - the newer data would be sent without the
+ * older sent but ignored data.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+ recovery_delete(relay->r);
+ relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+ recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
struct relay_raft_msg {
struct cmsg base;
struct cmsg_hop route;
@@ -867,7 +884,14 @@ relay_raft_msg_push(struct cmsg *base)
struct xrow_header row;
xrow_encode_raft(&row, &fiber()->gc, &msg->req);
try {
+ /*
+ * Send the message before restarting the recovery. Otherwise
+ * all the rows would be sent from under a non-leader role and
+ * would be ignored again.
+ */
relay_send(msg->relay, &row);
+ if (msg->req.state == RAFT_STATE_LEADER)
+ relay_restart_recovery(msg->relay);
} catch (Exception *e) {
relay_set_error(msg->relay, e);
fiber_cancel(fiber());
--
2.21.1 (Apple Git-122.3)
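The vote_mask and vote_count fields added to struct raft in the patch above can be illustrated with a small standalone sketch. It is not the code from the patch: struct vote_tracker and both helpers are hypothetical names, instance IDs are assumed to be below 32 so that they fit the 32-bit mask, and ignoring a repeated vote from the same instance is an assumption about how the mask is meant to be used.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the vote-related fields of struct raft. */
struct vote_tracker {
	/* Bit N is set when a vote from instance N was obtained. */
	uint32_t vote_mask;
	/* Number of bits set in vote_mask. */
	int vote_count;
};

/* Start an election round with a single vote for self. */
static void
vote_tracker_start(struct vote_tracker *t, uint32_t self_id)
{
	assert(self_id < 32);
	t->vote_mask = UINT32_C(1) << self_id;
	t->vote_count = 1;
}

/*
 * Register a vote from instance `source`. A repeated vote from the same
 * instance is not counted twice (assumption). Returns true when the
 * number of collected votes reaches the quorum.
 */
static bool
vote_tracker_add(struct vote_tracker *t, uint32_t source, int quorum)
{
	assert(source < 32);
	uint32_t bit = UINT32_C(1) << source;
	if ((t->vote_mask & bit) == 0) {
		t->vote_mask |= bit;
		t->vote_count++;
	}
	return t->vote_count >= quorum;
}
```

Keeping the counter next to the mask makes the quorum check a plain integer comparison, which is the same kind of check raft_cfg_election_quorum() does with raft.vote_count.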
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [PATCH v3 09/10] raft: introduce box.info.election
2020-09-29 22:11 [Tarantool-patches] [PATCH v3 00/10] Raft Vladislav Shpilevoy
` (8 preceding siblings ...)
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 08/10] raft: introduce state machine Vladislav Shpilevoy
@ 2020-09-29 22:11 ` Vladislav Shpilevoy
2020-09-30 7:06 ` [Tarantool-patches] [PATCH v3 00/10] Raft Serge Petrenko
2020-09-30 11:04 ` Kirill Yukhin
11 siblings, 0 replies; 13+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-29 22:11 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko
box.info.election returns a table of the form:
{
state: <string>,
term: <number>,
vote: <instance ID>,
leader: <instance ID>
}
The fields correspond one to one to the Raft concepts of the same name.
This info dump is supposed to help with the tests, first of all,
and with investigation of problems in a real cluster.
The API doesn't mention 'Raft' on purpose, to keep it from
depending specifically on Raft, and not to confuse users who
don't know anything about Raft (even that it is about leader
election and synchronous replication).
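As a rough illustration of the table described above, here is a standalone C sketch that renders the same four fields as text. It does not use the Lua C API and is not the code from this patch: struct election_info, election_info_format() and the contents of state_strs are assumptions made for the example. In the patch itself the term and vote fields are filled from raft.volatile_term and raft.volatile_vote.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

enum raft_state {
	RAFT_STATE_FOLLOWER = 1,
	RAFT_STATE_CANDIDATE = 2,
	RAFT_STATE_LEADER = 3,
};

/* Assumed names; the enum starts at 1, so slot 0 stays unused. */
static const char *state_strs[] = {NULL, "follower", "candidate", "leader"};

/* Hypothetical plain-C mirror of the table fields. */
struct election_info {
	enum raft_state state;
	uint64_t term;
	uint32_t vote;
	uint32_t leader;
};

/* Render the info into buf. Returns what snprintf() returns. */
static int
election_info_format(const struct election_info *info, char *buf, size_t size)
{
	assert(info->state >= RAFT_STATE_FOLLOWER &&
	       info->state <= RAFT_STATE_LEADER);
	return snprintf(buf, size,
			"{state: %s, term: %" PRIu64 ", vote: %" PRIu32
			", leader: %" PRIu32 "}",
			state_strs[info->state], info->term, info->vote,
			info->leader);
}
```

For an instance with ID 1 that voted for itself and became a leader in term 3, the sketch renders {state: leader, term: 3, vote: 1, leader: 1}.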
Part of #1146
---
src/box/lua/info.c | 17 +++++++++++++++++
test/box/info.result | 1 +
2 files changed, 18 insertions(+)
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 1c131caec..cac3fd475 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -49,6 +49,7 @@
#include "main.h"
#include "version.h"
#include "box/box.h"
+#include "box/raft.h"
#include "lua/utils.h"
#include "fiber.h"
#include "tt_static.h"
@@ -577,6 +578,21 @@ lbox_info_listen(struct lua_State *L)
return 1;
}
+static int
+lbox_info_election(struct lua_State *L)
+{
+ lua_createtable(L, 0, 4);
+ lua_pushstring(L, raft_state_strs[raft.state]);
+ lua_setfield(L, -2, "state");
+ luaL_pushuint64(L, raft.volatile_term);
+ lua_setfield(L, -2, "term");
+ lua_pushinteger(L, raft.volatile_vote);
+ lua_setfield(L, -2, "vote");
+ lua_pushinteger(L, raft.leader);
+ lua_setfield(L, -2, "leader");
+ return 1;
+}
+
static const struct luaL_Reg lbox_info_dynamic_meta[] = {
{"id", lbox_info_id},
{"uuid", lbox_info_uuid},
@@ -595,6 +611,7 @@ static const struct luaL_Reg lbox_info_dynamic_meta[] = {
{"vinyl", lbox_info_vinyl},
{"sql", lbox_info_sql},
{"listen", lbox_info_listen},
+ {"election", lbox_info_election},
{NULL, NULL}
};
diff --git a/test/box/info.result b/test/box/info.result
index 40eeae069..c8037818b 100644
--- a/test/box/info.result
+++ b/test/box/info.result
@@ -75,6 +75,7 @@ table.sort(t)
t
---
- - cluster
+ - election
- gc
- id
- listen
--
2.21.1 (Apple Git-122.3)
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [Tarantool-patches] [PATCH v3 00/10] Raft
2020-09-29 22:11 [Tarantool-patches] [PATCH v3 00/10] Raft Vladislav Shpilevoy
` (9 preceding siblings ...)
2020-09-29 22:11 ` [Tarantool-patches] [PATCH v3 09/10] raft: introduce box.info.election Vladislav Shpilevoy
@ 2020-09-30 7:06 ` Serge Petrenko
2020-09-30 11:04 ` Kirill Yukhin
11 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-09-30 7:06 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches, Kirill Yukhin
30.09.2020 01:11, Vladislav Shpilevoy wrote:
> The patchset is a first approach to implementing Raft leader election.
> Synchronous replication is already implemented, and the leader election is
> supposed to complement it. Although this is still far from being production
> ready. Main purpose of the patch is to get Raft in 2.6.1 so as we could work on
> its stabilization and testing in 2.6.x releases, not making the users wait for
> 2.7.
Hi! Thanks for the patchset!
LGTM.
> Branch: http://github.com/tarantool/tarantool/tree/gh-1146-raft
> Issue: https://github.com/tarantool/tarantool/issues/1146
>
> @ChangeLog
> * Automated leader election via Raft algorithm (gh-1146).
>
> Changed from v2:
> - Lots of minor bugs fixed;
> - New naming in the public API - 'election' instead of 'raft'.
>
> Vladislav Shpilevoy (9):
> applier: store instance_id in struct applier
> box: introduce summary RO flag
> wal: don't touch box.cfg.wal_dir more than once
> replication: track registered replica count
> raft: introduce persistent raft state
> raft: introduce box.cfg.election_* options
> raft: introduce state machine
> raft: introduce box.info.election
> raft: add tests
>
> sergepetrenko (1):
> raft: relay status updates to followers
>
> src/box/CMakeLists.txt | 1 +
> src/box/applier.cc | 50 +-
> src/box/applier.h | 2 +
> src/box/box.cc | 185 +++-
> src/box/box.h | 9 +
> src/box/iproto_constants.h | 15 +
> src/box/lua/cfg.cc | 27 +
> src/box/lua/info.c | 17 +
> src/box/lua/load_cfg.lua | 15 +
> src/box/lua/misc.cc | 1 +
> src/box/memtx_engine.c | 36 +
> src/box/raft.c | 1009 ++++++++++++++++++++++
> src/box/raft.h | 260 ++++++
> src/box/relay.cc | 181 +++-
> src/box/relay.h | 7 +
> src/box/replication.cc | 4 +-
> src/box/replication.h | 7 +
> src/box/wal.c | 6 +
> src/box/wal.h | 7 +
> src/box/xrow.c | 115 +++
> src/box/xrow.h | 15 +
> test/app-tap/init_script.result | 3 +
> test/box/admin.result | 6 +
> test/box/cfg.result | 12 +
> test/box/info.result | 1 +
> test/replication/election_basic.result | 278 ++++++
> test/replication/election_basic.test.lua | 117 +++
> test/replication/election_replica.lua | 30 +
> test/replication/election_replica1.lua | 1 +
> test/replication/election_replica2.lua | 1 +
> test/replication/election_replica3.lua | 1 +
> 31 files changed, 2383 insertions(+), 36 deletions(-)
> create mode 100644 src/box/raft.c
> create mode 100644 src/box/raft.h
> create mode 100644 test/replication/election_basic.result
> create mode 100644 test/replication/election_basic.test.lua
> create mode 100644 test/replication/election_replica.lua
> create mode 120000 test/replication/election_replica1.lua
> create mode 120000 test/replication/election_replica2.lua
> create mode 120000 test/replication/election_replica3.lua
>
--
Serge Petrenko
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [Tarantool-patches] [PATCH v3 00/10] Raft
2020-09-29 22:11 [Tarantool-patches] [PATCH v3 00/10] Raft Vladislav Shpilevoy
` (10 preceding siblings ...)
2020-09-30 7:06 ` [Tarantool-patches] [PATCH v3 00/10] Raft Serge Petrenko
@ 2020-09-30 11:04 ` Kirill Yukhin
11 siblings, 0 replies; 13+ messages in thread
From: Kirill Yukhin @ 2020-09-30 11:04 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
Hello,
On 30 Sep 00:11, Vladislav Shpilevoy wrote:
> The patchset is a first approach to implementing Raft leader election.
> Synchronous replication is already implemented, and the leader election is
> supposed to complement it. Although this is still far from being production
> ready. Main purpose of the patch is to get Raft in 2.6.1 so as we could work on
> its stabilization and testing in 2.6.x releases, not making the users wait for
> 2.7.
>
> Branch: http://github.com/tarantool/tarantool/tree/gh-1146-raft
> Issue: https://github.com/tarantool/tarantool/issues/1146
>
> @ChangeLog
> * Automated leader election via Raft algorithm (gh-1146).
I've checked your patchset into master.
--
Regards, Kirill Yukhin
^ permalink raw reply [flat|nested] 13+ messages in thread