* [Tarantool-patches] [PATCH 1/8] applier: store instance_id in struct applier
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 2/8] box: introduce summary RO flag Vladislav Shpilevoy
` (7 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
An applier is going to need the numeric ID of its remote instance
in order to tell the future Raft module who sent a Raft message.
An alternative would be to add the sender ID to each Raft message,
but that looks like a crutch. Moreover, the applier still needs to
know this numeric ID in order to notify Raft about heartbeats from
the peer node.
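As a rough illustration of the lookup described above, here is a
minimal standalone sketch. All demo_* names are hypothetical
stand-ins (for struct tt_uuid, struct replica and replica_by_uuid());
the linear search is an assumption made for brevity and is not how
the real replica set is organized.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins for struct tt_uuid, replica and applier. */
struct demo_uuid { unsigned char bytes[16]; };
struct demo_replica { struct demo_uuid uuid; uint32_t id; };
struct demo_applier { struct demo_uuid uuid; uint32_t instance_id; };

/* Find a registered replica by UUID; NULL means it is not registered. */
const struct demo_replica *
demo_replica_by_uuid(const struct demo_replica *set, size_t count,
                     const struct demo_uuid *uuid)
{
    for (size_t i = 0; i < count; i++) {
        if (memcmp(&set[i].uuid, uuid, sizeof(*uuid)) == 0)
            return &set[i];
    }
    return NULL;
}

/*
 * Cache the peer's numeric ID in the applier. An unregistered peer
 * (e.g. an anonymous replica) keeps the zero ID.
 */
void
demo_applier_assign_instance_id(struct demo_applier *applier,
                                const struct demo_replica *set,
                                size_t count)
{
    const struct demo_replica *replica =
        demo_replica_by_uuid(set, count, &applier->uuid);
    if (replica != NULL)
        applier->instance_id = replica->id;
    else
        assert(applier->instance_id == 0);
}
```

Once the ID is cached this way, a consumer such as the future Raft
module can read applier->instance_id directly instead of carrying a
sender ID in every message.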
Needed for #1146
---
src/box/applier.cc | 19 +++++++++++++++++++
src/box/applier.h | 2 ++
2 files changed, 21 insertions(+)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index c1d07ca54..699b5a683 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -67,6 +67,23 @@ applier_set_state(struct applier *applier, enum applier_state state)
trigger_run_xc(&applier->on_state, applier);
}
+static inline void
+applier_assign_instance_id(struct applier *applier)
+{
+ /*
+ * After final join, the applier already received latest
+ * records from _cluster, including the record about
+ * source instance. It can be absent in case the source is
+ * an anonymous replica.
+ */
+ assert(applier->state == APPLIER_JOINED);
+ struct replica *replica = replica_by_uuid(&applier->uuid);
+ if (replica != NULL)
+ applier->instance_id = replica->id;
+ else
+ assert(applier->instance_id == 0);
+}
+
/**
* Write a nice error message to log file on SocketError or ClientError
* in applier_f().
@@ -603,6 +620,7 @@ applier_join(struct applier *applier)
say_info("final data received");
applier_set_state(applier, APPLIER_JOINED);
+ applier_assign_instance_id(applier);
applier_set_state(applier, APPLIER_READY);
}
@@ -1207,6 +1225,7 @@ applier_subscribe(struct applier *applier)
instance_id != REPLICA_ID_NIL) {
say_info("final data received");
applier_set_state(applier, APPLIER_JOINED);
+ applier_assign_instance_id(applier);
applier_set_state(applier, APPLIER_READY);
applier_set_state(applier, APPLIER_FOLLOW);
}
diff --git a/src/box/applier.h b/src/box/applier.h
index 6e979a806..15ca1fcfd 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -95,6 +95,8 @@ struct applier {
ev_tstamp lag;
/** The last box_error_code() logged to avoid log flooding */
uint32_t last_logged_errcode;
+ /** Remote instance ID. */
+ uint32_t instance_id;
/** Remote instance UUID */
struct tt_uuid uuid;
/** Remote URI (string) */
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 2/8] box: introduce summary RO flag
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 1/8] applier: store instance_id in struct applier Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 3/8] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy
` (6 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
An instance is writable if box.cfg.read_only is false and it is
not an orphan. An update of the final read-only state of the
instance needs to fire the read-only update triggers and notify
the engines.
These 2 flags were easy and cheap to check on each operation, and
the triggers were easy to use since both flags are stored and
updated inside box.cc.
That is going to change when Raft is introduced. Raft will add 2
more checks:
- A flag if Raft is enabled on the node. If it is not, then Raft
state won't affect whether the instance is writable;
- When Raft is enabled, it will allow writes on a leader only.
It means a check for being read-only would look like this:
is_ro || is_orphan || (raft_is_enabled() && !raft_is_leader())
This is significantly slower. Besides, Raft somehow needs to
access the read-only triggers and engine API - this looks wrong.
The patch introduces a new flag is_ro_summary. The flag
incorporates all the read-only conditions into one flag. When some
subsystem may change read-only state of the instance, it needs to
call box_update_ro_summary(), and the function takes care of
updating the summary flag, running the triggers, and notifying the
engines.
Raft will use this function when its state or config changes.
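The idea can be sketched in isolation as follows. This is only an
illustration under assumptions: the demo_* names are hypothetical,
and a plain counter stands in for the engine notification and the
ro_cond broadcast done by the real function.

```c
#include <assert.h>
#include <stdbool.h>

/* Individual conditions; each one alone makes the instance read-only. */
static bool demo_is_ro = false;
static bool demo_is_orphan = true;
/* The single flag checked on the hot path. */
static bool demo_is_ro_summary = true;
/* Stand-in for triggers and engine notifications. */
static int demo_notifications = 0;

/* Recompute the summary; notify only when it actually changes. */
void
demo_update_ro_summary(void)
{
    bool old_summary = demo_is_ro_summary;
    demo_is_ro_summary = demo_is_ro || demo_is_orphan;
    if (demo_is_ro_summary == old_summary)
        return;
    demo_notifications++;
}

void
demo_set_ro(bool ro)
{
    demo_is_ro = ro;
    demo_update_ro_summary();
}

void
demo_set_orphan(bool orphan)
{
    demo_is_orphan = orphan;
    demo_update_ro_summary();
}

/* The per-operation check reads one flag only. */
bool
demo_is_writable(void)
{
    return !demo_is_ro_summary;
}

int
demo_notification_count(void)
{
    return demo_notifications;
}
```

A new subsystem then only has to add its condition to the expression
in the update function and call it whenever that condition may have
changed; the per-operation check stays a single flag read.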
Needed for #1146
---
src/box/box.cc | 44 +++++++++++++++++++++++++++-----------------
src/box/box.h | 6 ++++++
2 files changed, 33 insertions(+), 17 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index faffd5769..0813603c0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -129,6 +129,14 @@ static bool is_local_recovery = false;
*/
static bool is_orphan;
+/**
+ * Summary flag incorporating all the instance attributes,
+ * affecting ability to write. Currently these are:
+ * - is_ro;
+ * - is_orphan;
+ */
+static bool is_ro_summary = true;
+
/**
* The pool of fibers in the transaction processor thread
* working on incoming messages from net, wal and other
@@ -144,11 +152,24 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+void
+box_update_ro_summary(void)
+{
+ bool old_is_ro_summary = is_ro_summary;
+ is_ro_summary = is_ro || is_orphan;
+ /* In 99% nothing changes. Filter this out first. */
+ if (is_ro_summary == old_is_ro_summary)
+ return;
+
+ if (is_ro_summary)
+ engine_switch_to_ro();
+ fiber_cond_broadcast(&ro_cond);
+}
+
static int
box_check_writable(void)
{
- /* box is only writable if box.cfg.read_only == false and */
- if (is_ro || is_orphan) {
+ if (is_ro_summary) {
diag_set(ClientError, ER_READONLY);
diag_log();
return -1;
@@ -253,20 +274,14 @@ box_check_ro(void);
void
box_set_ro(void)
{
- bool ro = box_check_ro();
- if (ro == is_ro)
- return; /* nothing to do */
- if (ro)
- engine_switch_to_ro();
-
- is_ro = ro;
- fiber_cond_broadcast(&ro_cond);
+ is_ro = box_check_ro();
+ box_update_ro_summary();
}
bool
box_is_ro(void)
{
- return is_ro || is_orphan;
+ return is_ro_summary;
}
bool
@@ -293,13 +308,8 @@ box_wait_ro(bool ro, double timeout)
void
box_do_set_orphan(bool orphan)
{
- if (is_orphan == orphan)
- return; /* nothing to do */
- if (orphan)
- engine_switch_to_ro();
-
is_orphan = orphan;
- fiber_cond_broadcast(&ro_cond);
+ box_update_ro_summary();
}
void
diff --git a/src/box/box.h b/src/box/box.h
index f9bd8b98d..5988264a5 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -137,6 +137,12 @@ box_set_orphan(bool orphan);
void
box_do_set_orphan(bool orphan);
+/**
+ * Update the final RO flag based on the instance flags and state.
+ */
+void
+box_update_ro_summary(void);
+
/**
* Iterate over all spaces and save them to the
* snapshot file.
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 3/8] wal: don't touch box.cfg.wal_dir more than once
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 1/8] applier: store instance_id in struct applier Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 2/8] box: introduce summary RO flag Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 4/8] raft: introduce persistent raft state Vladislav Shpilevoy
` (5 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
relay.cc and box.cc obtained the box.cfg.wal_dir value with a
cfg_gets() call in order to initialize WAL and create struct
recovery objects.
That is not only a bit dangerous (cfg_gets() uses the Lua API and
can throw a Lua error) and slow, but also unnecessary: the wal_dir
parameter is constant, it can't be changed after instance start.
This means the value can be stored once and then used without Lua.
The main motivation is that the WAL directory path will be needed
inside relay threads to restart their recovery iterators in the
Raft patch. They can't use cfg_gets(), because Lua lives in the TX
thread, but they can access a constant global variable. The
variable existed before; this patch adds a function to get it.
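A minimal sketch of the store-once, read-anywhere pattern is given
below. The demo_* names and the fixed 256-byte buffer are
assumptions for illustration only; the real code keeps the path
inside the WAL writer's directory object.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the WAL writer singleton. */
struct demo_wal_writer {
    char dirname[256];
};

static struct demo_wal_writer demo_wal_writer_singleton;

/*
 * Store the directory path once at startup. Returns -1 if the path
 * does not fit into the buffer.
 */
int
demo_wal_init(const char *dirname)
{
    size_t len = strlen(dirname);
    if (len >= sizeof(demo_wal_writer_singleton.dirname))
        return -1;
    memcpy(demo_wal_writer_singleton.dirname, dirname, len + 1);
    return 0;
}

/*
 * Plain getter. Since the value is never modified after init, reading
 * it needs no Lua state and no locking.
 */
const char *
demo_wal_dir(void)
{
    return demo_wal_writer_singleton.dirname;
}
```

The getter is safe from other threads only under the assumption
stated in the commit message: the value is written once, before
those threads start reading it.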
Needed for #1146
---
src/box/box.cc | 9 ++++-----
src/box/relay.cc | 7 ++-----
src/box/wal.c | 6 ++++++
src/box/wal.h | 7 +++++++
4 files changed, 19 insertions(+), 10 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 0813603c0..eeb00d5e2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2395,8 +2395,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
wal_stream_create(&wal_stream);
struct recovery *recovery;
- recovery = recovery_new(cfg_gets("wal_dir"),
- cfg_geti("force_recovery"),
+ recovery = recovery_new(wal_dir(), cfg_geti("force_recovery"),
checkpoint_vclock);
/*
@@ -2469,7 +2468,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
cfg_getd("wal_dir_rescan_delay"));
while (true) {
- if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock))
+ if (path_lock(wal_dir(), &wal_dir_lock))
diag_raise();
if (wal_dir_lock >= 0)
break;
@@ -2616,7 +2615,7 @@ box_cfg_xc(void)
* Lock the write ahead log directory to avoid multiple
* instances running in the same dir.
*/
- if (path_lock(cfg_gets("wal_dir"), &wal_dir_lock) < 0)
+ if (path_lock(wal_dir(), &wal_dir_lock) < 0)
diag_raise();
if (wal_dir_lock < 0) {
/**
@@ -2625,7 +2624,7 @@ box_cfg_xc(void)
* WAL dir must contain at least one xlog.
*/
if (!cfg_geti("hot_standby") || checkpoint == NULL)
- tnt_raise(ClientError, ER_ALREADY_RUNNING, cfg_gets("wal_dir"));
+ tnt_raise(ClientError, ER_ALREADY_RUNNING, wal_dir());
}
struct journal bootstrap_journal;
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a7843a8c2..124b0f52f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -34,7 +34,6 @@
#include "tt_static.h"
#include "scoped_guard.h"
#include "cbus.h"
-#include "cfg.h"
#include "errinj.h"
#include "fiber.h"
#include "say.h"
@@ -369,8 +368,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
relay_delete(relay);
});
- relay->r = recovery_new(cfg_gets("wal_dir"), false,
- start_vclock);
+ relay->r = recovery_new(wal_dir(), false, start_vclock);
vclock_copy(&relay->stop_vclock, stop_vclock);
int rc = cord_costart(&relay->cord, "final_join",
@@ -731,8 +729,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
});
vclock_copy(&relay->local_vclock_at_subscribe, &replicaset.vclock);
- relay->r = recovery_new(cfg_gets("wal_dir"), false,
- replica_clock);
+ relay->r = recovery_new(wal_dir(), false, replica_clock);
vclock_copy(&relay->tx.vclock, replica_clock);
relay->version_id = replica_version_id;
diff --git a/src/box/wal.c b/src/box/wal.c
index 045006b60..e181e58d9 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -201,6 +201,12 @@ wal_mode(void)
return wal_writer_singleton.wal_mode;
}
+const char *
+wal_dir(void)
+{
+ return wal_writer_singleton.wal_dir.dirname;
+}
+
static void
wal_write_to_disk(struct cmsg *msg);
diff --git a/src/box/wal.h b/src/box/wal.h
index 9d0cada46..581306fe9 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -98,6 +98,13 @@ wal_enable(void);
void
wal_free(void);
+/**
+ * Get WAL directory path. The value never changes after box is
+ * configured first time. Safe to use from multiple threads.
+ */
+const char *
+wal_dir(void);
+
struct wal_watcher_msg {
struct cmsg cmsg;
struct wal_watcher *watcher;
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 4/8] raft: introduce persistent raft state
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
` (2 preceding siblings ...)
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 3/8] wal: don't touch box.cfg.wal_dir more than once Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 5/8] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy
` (4 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
The box.internal.raft_*() helper functions are introduced to test
persistence. Any state change is saved into the WAL and into the
snapshot.
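How recovery merges the saved records can be shown with a small
standalone sketch. It mirrors the convention used in this patch that
a zero field in a request means "not set"; the demo_* names are
hypothetical and the state is passed explicitly instead of living in
a global.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical mirrors of struct raft_request and struct raft. */
struct demo_raft_request { uint64_t term; uint32_t vote; };
struct demo_raft { uint64_t term; uint32_t vote; };

/*
 * Apply one recovered record. Each record may carry only a term or
 * only a vote; zero fields leave the current state untouched.
 */
void
demo_raft_process_recovery(struct demo_raft *raft,
                           const struct demo_raft_request *req)
{
    if (req->term != 0)
        raft->term = req->term;
    if (req->vote != 0)
        raft->vote = req->vote;
}

/* Dump the full state into one record, e.g. for a snapshot. */
void
demo_raft_serialize(const struct demo_raft *raft,
                    struct demo_raft_request *req)
{
    req->term = raft->term;
    req->vote = raft->vote;
}
```

Replaying a term-only record followed by a vote-only record yields
the same state as reading a single snapshot record that contains
both fields.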
Part of #1146
---
src/box/CMakeLists.txt | 1 +
src/box/box.cc | 8 +++
src/box/iproto_constants.h | 13 ++++
src/box/lua/misc.cc | 35 +++++++++++
src/box/memtx_engine.c | 35 +++++++++++
src/box/raft.c | 120 +++++++++++++++++++++++++++++++++++++
src/box/raft.h | 61 +++++++++++++++++++
src/box/xrow.c | 56 +++++++++++++++++
src/box/xrow.h | 12 ++++
9 files changed, 341 insertions(+)
create mode 100644 src/box/raft.c
create mode 100644 src/box/raft.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index b8b2689d2..29c3bfe79 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -170,6 +170,7 @@ add_library(box STATIC
port.c
txn.c
txn_limbo.c
+ raft.c
box.cc
gc.c
checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index eeb00d5e2..281917af2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
#include "sequence.h"
#include "sql_stmt_cache.h"
#include "msgpack.h"
+#include "raft.h"
#include "trivia/util.h"
static char status[64] = "unknown";
@@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
diag_raise();
return;
}
+ if (iproto_type_is_raft_request(row->type)) {
+ struct raft_request raft_req;
+ if (xrow_decode_raft(row, &raft_req) != 0)
+ diag_raise();
+ raft_process_recovery(&raft_req);
+ return;
+ }
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 4f5a2b195..8a11626b3 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,8 @@ enum iproto_type {
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
+ IPROTO_RAFT = 30,
+
/** A confirmation message for synchronous transactions. */
IPROTO_CONFIRM = 40,
/** A rollback message for synchronous transactions. */
@@ -258,6 +260,11 @@ enum iproto_type {
/** IPROTO type name by code */
extern const char *iproto_type_strs[];
+enum iproto_raft_keys {
+ IPROTO_RAFT_TERM = 0,
+ IPROTO_RAFT_VOTE = 1,
+};
+
/**
* Returns IPROTO type name by @a type code.
* @param type IPROTO type.
@@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
}
+static inline bool
+iproto_type_is_raft_request(uint32_t type)
+{
+ return type == IPROTO_RAFT;
+}
+
/** This is an error. */
static inline bool
iproto_type_is_error(uint32_t type)
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 5da84b35a..98e98abe2 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,6 +40,8 @@
#include "box/tuple.h"
#include "box/tuple_format.h"
#include "box/lua/tuple.h"
+#include "box/raft.h"
+#include "box/xrow.h"
#include "mpstream/mpstream.h"
static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
@@ -246,12 +248,45 @@ lbox_tuple_format_new(struct lua_State *L)
/* }}} */
+static int
+lbox_raft_new_term(struct lua_State *L)
+{
+ uint64_t min_term = luaL_checkuint64(L, 1);
+ raft_new_term(min_term);
+ return 0;
+}
+
+static int
+lbox_raft_vote(struct lua_State *L)
+{
+ uint64_t vote_for = luaL_checkuint64(L, 1);
+ if (vote_for > UINT32_MAX)
+ return luaL_error(L, "Invalid vote");
+ raft_vote(vote_for);
+ return 0;
+}
+
+static int
+lbox_raft_get(struct lua_State *L)
+{
+ lua_createtable(L, 0, 2);
+ luaL_pushuint64(L, raft.term);
+ lua_setfield(L, -2, "term");
+ luaL_pushuint64(L, raft.vote);
+ lua_setfield(L, -2, "vote");
+ return 1;
+}
+
void
box_lua_misc_init(struct lua_State *L)
{
static const struct luaL_Reg boxlib_internal[] = {
{"select", lbox_select},
{"new_tuple_format", lbox_tuple_format_new},
+ /* Temporary helpers to sanity test raft persistency. */
+ {"raft_new_term", lbox_raft_new_term},
+ {"raft_vote", lbox_raft_vote},
+ {"raft_get", lbox_raft_get},
{NULL, NULL}
};
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index dfd6fce6e..31a8c260a 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
+#include "raft.h"
/* sync snapshot every 16MB */
#define SNAP_SYNC_INTERVAL (1 << 24)
@@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
return 0;
}
+static int
+memtx_engine_recover_raft(const struct xrow_header *row)
+{
+ assert(row->type == IPROTO_RAFT);
+ struct raft_request req;
+ if (xrow_decode_raft(row, &req) != 0)
+ return -1;
+ raft_process_recovery(&req);
+ return 0;
+}
+
static int
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct xrow_header *row)
{
assert(row->bodycnt == 1); /* always 1 for read */
if (row->type != IPROTO_INSERT) {
+ if (row->type == IPROTO_RAFT)
+ return memtx_engine_recover_raft(row);
diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row->type);
return -1;
@@ -477,6 +491,7 @@ struct checkpoint {
/** The vclock of the snapshot file. */
struct vclock vclock;
struct xdir dir;
+ struct raft_request raft;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
+ raft_serialize(&ckpt->raft);
ckpt->touch = false;
return ckpt;
}
@@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
return 0;
};
+static int
+checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
+{
+ struct xrow_header row;
+ struct region *region = &fiber()->gc;
+ uint32_t svp = region_used(region);
+ int rc = -1;
+ if (xrow_encode_raft(&row, region, req) != 0)
+ goto finish;
+ if (checkpoint_write_row(l, &row) != 0)
+ goto finish;
+ rc = 0;
+finish:
+ region_truncate(region, svp);
+ return rc;
+}
+
static int
checkpoint_f(va_list ap)
{
@@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
if (rc != 0)
goto fail;
}
+ if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
+ goto fail;
if (xlog_flush(&snap) < 0)
goto fail;
diff --git a/src/box/raft.c b/src/box/raft.c
new file mode 100644
index 000000000..eb2fc7cf3
--- /dev/null
+++ b/src/box/raft.c
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "raft.h"
+
+#include "error.h"
+#include "journal.h"
+#include "xrow.h"
+#include "small/region.h"
+
+/** Raft state of this instance. */
+struct raft raft = {
+ .term = 1,
+ .vote = 0,
+};
+
+void
+raft_process_recovery(const struct raft_request *req)
+{
+ if (req->term != 0)
+ raft.term = req->term;
+ if (req->vote != 0)
+ raft.vote = req->vote;
+}
+
+void
+raft_serialize(struct raft_request *req)
+{
+ req->term = raft.term;
+ req->vote = raft.vote;
+}
+
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+ fiber_wakeup(entry->complete_data);
+}
+
+static void
+raft_write_request(const struct raft_request *req)
+{
+ struct region *region = &fiber()->gc;
+ uint32_t svp = region_used(region);
+ struct xrow_header row;
+ char buf[sizeof(struct journal_entry) +
+ sizeof(struct xrow_header *)];
+ struct journal_entry *entry = (struct journal_entry *)buf;
+ entry->rows[0] = &row;
+
+ if (xrow_encode_raft(&row, region, req) != 0)
+ goto fail;
+ journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+ fiber());
+
+ if (journal_write(entry) != 0 || entry->res < 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ goto fail;
+ }
+ region_truncate(region, svp);
+ return;
+fail:
+ /*
+ * XXX: the stub is supposed to be removed once it is defined what to do
+ * when a raft request WAL write fails.
+ */
+ panic("Could not write a raft request to WAL\n");
+}
+
+void
+raft_new_term(uint64_t min_new_term)
+{
+ if (raft.term < min_new_term)
+ raft.term = min_new_term + 1;
+ else
+ ++raft.term;
+
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.term = raft.term;
+ raft_write_request(&req);
+}
+
+void
+raft_vote(uint32_t vote_for)
+{
+ raft.vote = vote_for;
+
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.vote = vote_for;
+ raft_write_request(&req);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
new file mode 100644
index 000000000..a5070788d
--- /dev/null
+++ b/src/box/raft.h
@@ -0,0 +1,61 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct raft_request;
+
+struct raft {
+ uint64_t term;
+ uint32_t vote;
+};
+
+extern struct raft raft;
+
+void
+raft_new_term(uint64_t min_new_term);
+
+void
+raft_vote(uint32_t vote_for);
+
+void
+raft_process_recovery(const struct raft_request *req);
+
+void
+raft_serialize(struct raft_request *req);
+
+#if defined(__cplusplus)
+}
+#endif
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 95ddb1fe7..1923bacfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
return 0;
}
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r)
+{
+ size_t size = mp_sizeof_map(2) +
+ mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term) +
+ mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ char *buf = region_alloc(region, size);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, size, "region_alloc", "buf");
+ return -1;
+ }
+ memset(row, 0, sizeof(*row));
+ row->type = IPROTO_RAFT;
+ row->body[0].iov_base = buf;
+ row->body[0].iov_len = size;
+ row->group_id = GROUP_LOCAL;
+ row->bodycnt = 1;
+ buf = mp_encode_map(buf, 2);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
+ buf = mp_encode_uint(buf, r->term);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ return 0;
+}
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+{
+ /* TODO: handle bad format. */
+ assert(row->type == IPROTO_RAFT);
+ assert(row->bodycnt == 1);
+ assert(row->group_id == GROUP_LOCAL);
+ memset(r, 0, sizeof(*r));
+ const char *pos = row->body[0].iov_base;
+ uint32_t map_size = mp_decode_map(&pos);
+ for (uint32_t i = 0; i < map_size; ++i)
+ {
+ uint64_t key = mp_decode_uint(&pos);
+ switch (key) {
+ case IPROTO_RAFT_TERM:
+ r->term = mp_decode_uint(&pos);
+ break;
+ case IPROTO_RAFT_VOTE:
+ r->vote = mp_decode_uint(&pos);
+ break;
+ default:
+ mp_next(&pos);
+ break;
+ }
+ }
+ return 0;
+}
+
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 58d47b12d..c234f6f88 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
+struct raft_request {
+ uint64_t term;
+ uint32_t vote;
+};
+
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r);
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+
/**
* CALL/EVAL request.
*/
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 5/8] raft: introduce box.cfg.raft_* options
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
` (3 preceding siblings ...)
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 4/8] raft: introduce persistent raft state Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 6/8] raft: relay status updates to followers Vladislav Shpilevoy
` (3 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
The new options are:
- raft_is_enabled - enable/disable Raft. When disabled, the node
is supposed to work as if Raft did not exist, as it did before;
- raft_is_candidate - a flag whether the instance can try to
become a leader. Note that it can vote for other nodes regardless
of the value of this option;
- raft_election_timeout - how long to wait for an election to
end, in seconds.
The options don't do anything yet. They are added separately in
order to keep such mundane changes out of the main Raft commit and
simplify its review.
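The intended check-then-apply flow for these options can be sketched
roughly as below. The demo_* names are hypothetical; the only rules
taken from the patch are that the election timeout must be a
positive number and that Raft is enabled last, after the other
parameters are installed.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical holder for the three new options. */
struct demo_raft_cfg {
    bool is_enabled;
    bool is_candidate;
    double election_timeout;
};

/* The timeout is in seconds and must be strictly positive. */
int
demo_check_election_timeout(double value)
{
    return value > 0 ? 0 : -1;
}

/*
 * Validate first, then install. Nothing is changed on a validation
 * failure. The enable flag is written last so that the other
 * parameters are already in place when Raft is switched on.
 */
int
demo_raft_cfg_apply(struct demo_raft_cfg *cfg, bool is_enabled,
                    bool is_candidate, double election_timeout)
{
    if (demo_check_election_timeout(election_timeout) != 0)
        return -1;
    cfg->is_candidate = is_candidate;
    cfg->election_timeout = election_timeout;
    cfg->is_enabled = is_enabled;
    return 0;
}
```

With the defaults from load_cfg.lua in this patch (disabled,
candidate, 5 seconds), a call with a zero or negative timeout is
rejected and leaves the stored configuration as it was.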
Part of #1146
---
src/box/box.cc | 91 +++++++++++++++++++++++++++++++++
src/box/box.h | 3 ++
src/box/lua/cfg.cc | 27 ++++++++++
src/box/lua/load_cfg.lua | 15 ++++++
src/box/raft.c | 30 +++++++++++
src/box/raft.h | 20 ++++++++
test/app-tap/init_script.result | 3 ++
test/box/admin.result | 6 +++
test/box/cfg.result | 12 +++++
9 files changed, 207 insertions(+)
diff --git a/src/box/box.cc b/src/box/box.cc
index 281917af2..5f04a1a78 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -472,6 +472,40 @@ box_check_uri(const char *source, const char *option_name)
}
}
+static int
+box_check_raft_is_enabled(void)
+{
+ int b = cfg_getb("raft_is_enabled");
+ if (b < 0) {
+ diag_set(ClientError, ER_CFG, "raft_is_enabled",
+ "the value must be a boolean");
+ }
+ return b;
+}
+
+static int
+box_check_raft_is_candidate(void)
+{
+ int b = cfg_getb("raft_is_candidate");
+ if (b < 0) {
+ diag_set(ClientError, ER_CFG, "raft_is_candidate",
+ "the value must be a boolean");
+ }
+ return b;
+}
+
+static double
+box_check_raft_election_timeout(void)
+{
+ double d = cfg_getd("raft_election_timeout");
+ if (d <= 0) {
+ diag_set(ClientError, ER_CFG, "raft_election_timeout",
+ "the value must be a positive number");
+ return -1;
+ }
+ return d;
+}
+
static void
box_check_replication(void)
{
@@ -729,6 +763,12 @@ box_check_config(void)
box_check_uri(cfg_gets("listen"), "listen");
box_check_instance_uuid(&uuid);
box_check_replicaset_uuid(&uuid);
+ if (box_check_raft_is_enabled() < 0)
+ diag_raise();
+ if (box_check_raft_is_candidate() < 0)
+ diag_raise();
+ if (box_check_raft_election_timeout() < 0)
+ diag_raise();
box_check_replication();
box_check_replication_timeout();
box_check_replication_connect_timeout();
@@ -751,6 +791,36 @@ box_check_config(void)
diag_raise();
}
+int
+box_set_raft_is_enabled(void)
+{
+ int b = box_check_raft_is_enabled();
+ if (b < 0)
+ return -1;
+ raft_cfg_is_enabled(b);
+ return 0;
+}
+
+int
+box_set_raft_is_candidate(void)
+{
+ int b = box_check_raft_is_candidate();
+ if (b < 0)
+ return -1;
+ raft_cfg_is_candidate(b);
+ return 0;
+}
+
+int
+box_set_raft_election_timeout(void)
+{
+ double d = box_check_raft_election_timeout();
+ if (d < 0)
+ return -1;
+ raft_cfg_election_timeout(d);
+ return 0;
+}
+
/*
* Parse box.cfg.replication and create appliers.
*/
@@ -835,6 +905,7 @@ void
box_set_replication_timeout(void)
{
replication_timeout = box_check_replication_timeout();
+ raft_cfg_death_timeout();
}
void
@@ -865,6 +936,7 @@ box_set_replication_synchro_quorum(void)
return -1;
replication_synchro_quorum = value;
txn_limbo_on_parameters_change(&txn_limbo);
+ raft_cfg_election_quorum();
return 0;
}
@@ -2680,6 +2752,25 @@ box_cfg_xc(void)
fiber_gc();
is_box_configured = true;
+ /*
+ * Fill in Raft parameters after bootstrap. Before it is not possible -
+ * there may be Raft data to recover from WAL and snapshot. Also until
+ * recovery is done, it is not possible to write new records into WAL.
+ * It is also totally safe, because relaying is not started until the
+ * box is configured. So it can't happen, that this Raft node will try
+ * to relay to another Raft node without Raft enabled leading to
+ * disconnect.
+ */
+ if (box_set_raft_is_candidate() != 0)
+ diag_raise();
+ if (box_set_raft_election_timeout() != 0)
+ diag_raise();
+ /*
+ * Raft is enabled last. So as all the parameters are installed by that
+ * time.
+ */
+ if (box_set_raft_is_enabled() != 0)
+ diag_raise();
title("running");
say_info("ready to accept requests");
diff --git a/src/box/box.h b/src/box/box.h
index 5988264a5..637d10dd3 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -245,6 +245,9 @@ void box_set_vinyl_memory(void);
void box_set_vinyl_max_tuple_size(void);
void box_set_vinyl_cache(void);
void box_set_vinyl_timeout(void);
+int box_set_raft_is_enabled(void);
+int box_set_raft_is_candidate(void);
+int box_set_raft_election_timeout(void);
void box_set_replication_timeout(void);
void box_set_replication_connect_timeout(void);
void box_set_replication_connect_quorum(void);
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index d481155cd..339b85f9d 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -269,6 +269,30 @@ lbox_cfg_set_worker_pool_threads(struct lua_State *L)
return 0;
}
+static int
+lbox_cfg_set_raft_is_enabled(struct lua_State *L)
+{
+ if (box_set_raft_is_enabled() != 0)
+ luaT_error(L);
+ return 0;
+}
+
+static int
+lbox_cfg_set_raft_is_candidate(struct lua_State *L)
+{
+ if (box_set_raft_is_candidate() != 0)
+ luaT_error(L);
+ return 0;
+}
+
+static int
+lbox_cfg_set_raft_election_timeout(struct lua_State *L)
+{
+ if (box_set_raft_election_timeout() != 0)
+ luaT_error(L);
+ return 0;
+}
+
static int
lbox_cfg_set_replication_timeout(struct lua_State *L)
{
@@ -382,6 +406,9 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_vinyl_max_tuple_size", lbox_cfg_set_vinyl_max_tuple_size},
{"cfg_set_vinyl_cache", lbox_cfg_set_vinyl_cache},
{"cfg_set_vinyl_timeout", lbox_cfg_set_vinyl_timeout},
+ {"cfg_set_raft_is_enabled", lbox_cfg_set_raft_is_enabled},
+ {"cfg_set_raft_is_candidate", lbox_cfg_set_raft_is_candidate},
+ {"cfg_set_raft_election_timeout", lbox_cfg_set_raft_election_timeout},
{"cfg_set_replication_timeout", lbox_cfg_set_replication_timeout},
{"cfg_set_replication_connect_quorum", lbox_cfg_set_replication_connect_quorum},
{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 53f572895..2c98fd837 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -86,6 +86,9 @@ local default_cfg = {
checkpoint_wal_threshold = 1e18,
checkpoint_count = 2,
worker_pool_threads = 4,
+ raft_is_enabled = false,
+ raft_is_candidate = true,
+ raft_election_timeout = 5,
replication_timeout = 1,
replication_sync_lag = 10,
replication_sync_timeout = 300,
@@ -163,6 +166,9 @@ local template_cfg = {
read_only = 'boolean',
hot_standby = 'boolean',
worker_pool_threads = 'number',
+ raft_is_enabled = 'boolean',
+ raft_is_candidate = 'boolean',
+ raft_election_timeout = 'number',
replication_timeout = 'number',
replication_sync_lag = 'number',
replication_sync_timeout = 'number',
@@ -279,6 +285,9 @@ local dynamic_cfg = {
require('title').update(box.cfg.custom_proc_title)
end,
force_recovery = function() end,
+ raft_is_enabled = private.cfg_set_raft_is_enabled,
+ raft_is_candidate = private.cfg_set_raft_is_candidate,
+ raft_election_timeout = private.cfg_set_raft_election_timeout,
replication_timeout = private.cfg_set_replication_timeout,
replication_connect_timeout = private.cfg_set_replication_connect_timeout,
replication_connect_quorum = private.cfg_set_replication_connect_quorum,
@@ -333,6 +342,9 @@ local dynamic_cfg_order = {
-- the new one. This should be fixed when box.cfg is able to
-- apply some parameters together and atomically.
replication_anon = 250,
+ raft_is_enabled = 300,
+ raft_is_candidate = 310,
+ raft_election_timeout = 320,
}
local function sort_cfg_cb(l, r)
@@ -350,6 +362,9 @@ local dynamic_cfg_skip_at_load = {
vinyl_cache = true,
vinyl_timeout = true,
too_long_threshold = true,
+ raft_is_enabled = true,
+ raft_is_candidate = true,
+ raft_election_timeout = true,
replication = true,
replication_timeout = true,
replication_connect_timeout = true,
diff --git a/src/box/raft.c b/src/box/raft.c
index eb2fc7cf3..6fd5515f4 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -37,6 +37,8 @@
/** Raft state of this instance. */
struct raft raft = {
+ .is_enabled = false,
+ .is_candidate = false,
.term = 1,
.vote = 0,
};
@@ -108,6 +110,34 @@ raft_new_term(uint64_t min_new_term)
raft_write_request(&req);
}
+void
+raft_cfg_is_enabled(bool is_enabled)
+{
+ raft.is_enabled = is_enabled;
+}
+
+void
+raft_cfg_is_candidate(bool is_candidate)
+{
+ raft.is_candidate = is_candidate;
+}
+
+void
+raft_cfg_election_timeout(double timeout)
+{
+ raft.election_timeout = timeout;
+}
+
+void
+raft_cfg_election_quorum(void)
+{
+}
+
+void
+raft_cfg_death_timeout(void)
+{
+}
+
void
raft_vote(uint32_t vote_for)
{
diff --git a/src/box/raft.h b/src/box/raft.h
index a5070788d..2c4b5036c 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -30,6 +30,7 @@
* SUCH DAMAGE.
*/
#include <stdint.h>
+#include <stdbool.h>
#if defined(__cplusplus)
extern "C" {
@@ -38,8 +39,11 @@ extern "C" {
struct raft_request;
struct raft {
+ bool is_enabled;
+ bool is_candidate;
uint64_t term;
uint32_t vote;
+ double election_timeout;
};
extern struct raft raft;
@@ -53,6 +57,22 @@ raft_vote(uint32_t vote_for);
void
raft_process_recovery(const struct raft_request *req);
+void
+raft_cfg_is_enabled(bool is_enabled);
+
+void
+raft_cfg_is_candidate(bool is_candidate);
+
+void
+raft_cfg_election_timeout(double timeout);
+
+void
+raft_cfg_election_quorum(void);
+
+void
+raft_cfg_death_timeout(void);
+
+/** Save complete Raft state into the request. */
void
raft_serialize(struct raft_request *req);
diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result
index 857f0c95f..1d191987a 100644
--- a/test/app-tap/init_script.result
+++ b/test/app-tap/init_script.result
@@ -23,6 +23,9 @@ memtx_memory:107374182
memtx_min_tuple_size:16
net_msg_max:768
pid_file:box.pid
+raft_election_timeout:5
+raft_is_candidate:true
+raft_is_enabled:false
read_only:false
readahead:16320
replication_anon:false
diff --git a/test/box/admin.result b/test/box/admin.result
index ab3e80a97..13536a318 100644
--- a/test/box/admin.result
+++ b/test/box/admin.result
@@ -67,6 +67,12 @@ cfg_filter(box.cfg)
- 768
- - pid_file
- <hidden>
+ - - raft_election_timeout
+ - 5
+ - - raft_is_candidate
+ - true
+ - - raft_is_enabled
+ - false
- - read_only
- false
- - readahead
diff --git a/test/box/cfg.result b/test/box/cfg.result
index bdd210b09..11358b2cd 100644
--- a/test/box/cfg.result
+++ b/test/box/cfg.result
@@ -55,6 +55,12 @@ cfg_filter(box.cfg)
| - 768
| - - pid_file
| - <hidden>
+ | - - raft_election_timeout
+ | - 5
+ | - - raft_is_candidate
+ | - true
+ | - - raft_is_enabled
+ | - false
| - - read_only
| - false
| - - readahead
@@ -162,6 +168,12 @@ cfg_filter(box.cfg)
| - 768
| - - pid_file
| - <hidden>
+ | - - raft_election_timeout
+ | - 5
+ | - - raft_is_candidate
+ | - true
+ | - - raft_is_enabled
+ | - false
| - - read_only
| - false
| - - readahead
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 6/8] raft: relay status updates to followers
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
` (4 preceding siblings ...)
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 5/8] raft: introduce box.cfg.raft_* options Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 7/8] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy
` (2 subsequent siblings)
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
From: sergepetrenko <sergepetrenko@tarantool.org>
The patch introduces a new type of system message used to notify the
followers of the instance's raft status updates.
It is the relay's responsibility to deliver the new system rows to its peers.
The notification system reuses and extends the same row type used to
persist raft state in WAL and snapshot.
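
As a rough illustration of how one row type can serve both persistence and
notifications, the sketch below encodes a MessagePack map whose size depends
on which fields are set: the term is always written, while vote and state are
skipped when zero. It is not the patch's code. The names demo_raft_msg,
demo_encode_uint, demo_encode_raft and the DEMO_KEY_* constants are
hypothetical stand-ins, and the vclock field is left out for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the IPROTO_RAFT_* keys. */
enum { DEMO_KEY_TERM = 0, DEMO_KEY_VOTE = 1, DEMO_KEY_STATE = 2 };

/* Hypothetical, reduced request: zero means "field is absent". */
struct demo_raft_msg {
	uint64_t term;
	uint32_t vote;
	uint32_t state;
};

/* Encode an unsigned integer as MessagePack, return the new position. */
static uint8_t *
demo_encode_uint(uint8_t *p, uint64_t v)
{
	int bytes;
	if (v <= 0x7f) {
		/* Positive fixint: the value is the byte itself. */
		*p++ = (uint8_t)v;
		return p;
	} else if (v <= 0xff) {
		*p++ = 0xcc;
		bytes = 1;
	} else if (v <= 0xffff) {
		*p++ = 0xcd;
		bytes = 2;
	} else if (v <= 0xffffffff) {
		*p++ = 0xce;
		bytes = 4;
	} else {
		*p++ = 0xcf;
		bytes = 8;
	}
	/* MessagePack integers are big-endian. */
	for (int i = bytes - 1; i >= 0; i--)
		*p++ = (uint8_t)(v >> (8 * i));
	return p;
}

/* Encode the message into buf, return the number of bytes written. */
static size_t
demo_encode_raft(uint8_t *buf, const struct demo_raft_msg *m)
{
	assert(buf != NULL && m != NULL);
	/* The term is always present, the other keys are optional. */
	uint8_t map_size = 1 + (m->vote != 0) + (m->state != 0);
	uint8_t *p = buf;
	*p++ = (uint8_t)(0x80 | map_size); /* fixmap header */
	p = demo_encode_uint(p, DEMO_KEY_TERM);
	p = demo_encode_uint(p, m->term);
	if (m->vote != 0) {
		p = demo_encode_uint(p, DEMO_KEY_VOTE);
		p = demo_encode_uint(p, m->vote);
	}
	if (m->state != 0) {
		p = demo_encode_uint(p, DEMO_KEY_STATE);
		p = demo_encode_uint(p, m->state);
	}
	return (size_t)(p - buf);
}
```

A row written to WAL would then carry only term and vote, while a network
notification could carry the state as well, and a decoder that skips
unknown or missing keys would accept both.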
Part of #1146
Part of #5204
---
src/box/applier.cc | 35 ++++++++++++++--
src/box/box.cc | 21 +++++++++-
src/box/iproto_constants.h | 2 +
src/box/memtx_engine.c | 9 ++++-
src/box/raft.c | 73 ++++++++++++++++++++++++++++++++-
src/box/raft.h | 35 +++++++++++++++-
src/box/relay.cc | 62 +++++++++++++++++++++++++++-
src/box/relay.h | 7 ++++
src/box/xrow.c | 83 ++++++++++++++++++++++++++++++++------
src/box/xrow.h | 5 ++-
10 files changed, 306 insertions(+), 26 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 699b5a683..17e3ce1ae 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
#include "scoped_guard.h"
#include "txn_limbo.h"
#include "journal.h"
+#include "raft.h"
STRS(applier_state, applier_STATE);
@@ -315,6 +316,8 @@ apply_final_join_row(struct xrow_header *row)
*/
if (iproto_type_is_synchro_request(row->type))
return 0;
+ if (iproto_type_is_raft_request(row->type))
+ return 0;
struct txn *txn = txn_begin();
if (txn == NULL)
return -1;
@@ -894,6 +897,21 @@ err:
return -1;
}
+static int
+apply_raft_row(struct xrow_header *row)
+{
+ assert(iproto_type_is_raft_request(row->type));
+
+ struct raft_request req;
+ struct vclock candidate_clock;
+ if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
+ return -1;
+
+ raft_process_msg(&req);
+
+ return 0;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -1238,11 +1256,20 @@ applier_subscribe(struct applier *applier)
* In case of an heartbeat message wake a writer up
* and check applier state.
*/
- if (stailq_first_entry(&rows, struct applier_tx_row,
- next)->row.lsn == 0)
- applier_signal_ack(applier);
- else if (applier_apply_tx(&rows) != 0)
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (apply_raft_row(first_row) != 0)
+ diag_raise();
+ } else {
+ applier_signal_ack(applier);
+ }
+ } else if (applier_apply_tx(&rows) != 0) {
diag_raise();
+ }
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index 5f04a1a78..427b771b3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -387,7 +387,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
}
if (iproto_type_is_raft_request(row->type)) {
struct raft_request raft_req;
- if (xrow_decode_raft(row, &raft_req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &raft_req, NULL) != 0)
diag_raise();
raft_process_recovery(&raft_req);
return;
@@ -2132,7 +2133,23 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
say_info("remote vclock %s local vclock %s",
vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+ if (raft_is_enabled()) {
+ /*
+ * Send out the current raft state of the instance. Don't do
+ * that if Raft is disabled. It can be that a part of the
+ * cluster still contains old versions, which can't handle Raft
+ * messages. So when it is disabled, its network footprint
+ * should be 0.
+ */
+ struct raft_request req;
+ /*
+ * Omit the candidate vclock, since we've just
+ * sent it in subscribe response.
+ */
+ raft_serialize(&req, NULL);
+ xrow_encode_raft(&row, &fiber()->gc, &req);
+ coio_write_xrow(io, &row);
+ }
/*
* Replica clock is used in gc state and recovery
* initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8a11626b3..3ec397d3c 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,8 @@ extern const char *iproto_type_strs[];
enum iproto_raft_keys {
IPROTO_RAFT_TERM = 0,
IPROTO_RAFT_VOTE = 1,
+ IPROTO_RAFT_STATE = 2,
+ IPROTO_RAFT_VCLOCK = 3,
};
/**
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 31a8c260a..b0b744db8 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT);
struct raft_request req;
- if (xrow_decode_raft(row, &req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &req, NULL) != 0)
return -1;
raft_process_recovery(&req);
return 0;
@@ -516,7 +517,11 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
- raft_serialize(&ckpt->raft);
+ /*
+ * Don't encode vclock, because it is stored in the snapshot header
+ * anyway.
+ */
+ raft_serialize(&ckpt->raft, NULL);
ckpt->touch = false;
return ckpt;
}
diff --git a/src/box/raft.c b/src/box/raft.c
index 6fd5515f4..fd8853d6e 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,9 +34,20 @@
#include "journal.h"
#include "xrow.h"
#include "small/region.h"
+#include "replication.h"
+#include "relay.h"
+
+const char *raft_state_strs[] = {
+ NULL,
+ "follower",
+ "candidate",
+ "leader",
+};
/** Raft state of this instance. */
struct raft raft = {
+ .leader = 0,
+ .state = RAFT_STATE_FOLLOWER,
.is_enabled = false,
.is_candidate = false,
.term = 1,
@@ -50,13 +61,59 @@ raft_process_recovery(const struct raft_request *req)
raft.term = req->term;
if (req->vote != 0)
raft.vote = req->vote;
+ /*
+ * Role is never persisted. If recovery is happening, the
+ * node was restarted, and the former role may no longer be
+ * valid anyway.
+ */
+ assert(req->state == 0);
+ /*
+ * Vclock is always persisted by some other subsystem - WAL, snapshot.
+ * It is used only to decide to whom to give the vote during election,
+ * as a part of the volatile state.
+ */
+ assert(req->vclock == NULL);
+ /* Raft is not enabled until recovery is finished. */
+ assert(!raft_is_enabled());
+}
+
+void
+raft_process_msg(const struct raft_request *req)
+{
+ if (req->term > raft.term) {
+ // Update term.
+ // The logic will be similar, but the code
+ // below is for testing purposes.
+ raft.term = req->term;
+ }
+ if (req->vote > 0) {
+ // Check whether the vote's for us.
+ }
+ switch (req->state) {
+ case RAFT_STATE_FOLLOWER:
+ break;
+ case RAFT_STATE_CANDIDATE:
+ // Perform voting logic.
+ break;
+ case RAFT_STATE_LEADER:
+ // Switch to a new leader.
+ break;
+ default:
+ break;
+ }
}
void
-raft_serialize(struct raft_request *req)
+raft_serialize(struct raft_request *req, struct vclock *vclock)
{
+ memset(req, 0, sizeof(*req));
req->term = raft.term;
req->vote = raft.vote;
+ req->state = raft.state;
+ /*
+ * Raft does not own vclock, so it always expects it passed externally.
+ */
+ req->vclock = vclock;
}
static void
@@ -86,6 +143,9 @@ raft_write_request(const struct raft_request *req)
diag_log();
goto fail;
}
+
+ raft_broadcast(req);
+
region_truncate(region, svp);
return;
fail:
@@ -148,3 +208,14 @@ raft_vote(uint32_t vote_for)
req.vote = vote_for;
raft_write_request(&req);
}
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+ replicaset_foreach(replica) {
+ if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
+ relay_get_state(replica->relay) == RELAY_FOLLOW) {
+ relay_push_raft(replica->relay, req);
+ }
+ }
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index 2c4b5036c..0bf87e64b 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,8 +37,19 @@ extern "C" {
#endif
struct raft_request;
+struct vclock;
+
+enum raft_state {
+ RAFT_STATE_FOLLOWER = 1,
+ RAFT_STATE_CANDIDATE = 2,
+ RAFT_STATE_LEADER = 3,
+};
+
+extern const char *raft_state_strs[];
struct raft {
+ uint32_t leader;
+ enum raft_state state;
bool is_enabled;
bool is_candidate;
uint64_t term;
@@ -54,9 +65,24 @@ raft_new_term(uint64_t min_new_term);
void
raft_vote(uint32_t vote_for);
+static inline bool
+raft_is_enabled(void)
+{
+ return raft.is_enabled;
+}
+
+/** Process a raft entry stored in WAL/snapshot. */
void
raft_process_recovery(const struct raft_request *req);
+/** Process a raft status message coming from the network. */
+void
+raft_process_msg(const struct raft_request *req);
+
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
void
raft_cfg_is_enabled(bool is_enabled);
@@ -74,7 +100,14 @@ raft_cfg_death_timeout(void);
/** Save complete Raft state into the request. */
void
-raft_serialize(struct raft_request *req);
+raft_serialize(struct raft_request *req, struct vclock *vclock);
+
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
#if defined(__cplusplus)
}
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 124b0f52f..74581db9c 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -53,6 +53,7 @@
#include "xstream.h"
#include "wal.h"
#include "txn_limbo.h"
+#include "raft.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -770,13 +771,68 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
+struct relay_raft_msg {
+ struct cmsg base;
+ struct cmsg_hop route;
+ struct raft_request req;
+ struct vclock vclock;
+ struct relay *relay;
+};
+
+static void
+relay_raft_msg_push(struct cmsg *base)
+{
+ struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+ struct xrow_header row;
+ xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+ try {
+ relay_send(msg->relay, &row);
+ } catch (Exception *e) {
+ relay_set_error(msg->relay, e);
+ fiber_cancel(fiber());
+ }
+ free(msg);
+}
+
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req)
+{
+ /*
+ * XXX: the message should be preallocated. It should
+ * work like Kharon in IProto. Relay should have 2 raft
+ * messages rotating. When one is sent, the other can be
+ * updated and a flag is set. When the first message is
+ * sent, the control returns to TX thread, sees the set
+ * flag, rotates the buffers, and sends it again. And so
+ * on. This is how it can work in future, with 0 heap
+ * allocations. Current solution with alloc-per-update is
+ * good enough as a start. Another option - wait until all
+ * is moved to WAL thread, where this will all happen
+ * in one thread and will be much simpler.
+ */
+ struct relay_raft_msg *msg =
+ (struct relay_raft_msg *)malloc(sizeof(*msg));
+ if (msg == NULL) {
+ panic("Couldn't allocate raft message");
+ return;
+ }
+ msg->req = *req;
+ if (req->vclock != NULL) {
+ msg->req.vclock = &msg->vclock;
+ vclock_copy(&msg->vclock, req->vclock);
+ }
+ msg->route.f = relay_raft_msg_push;
+ msg->route.pipe = NULL;
+ cmsg_init(&msg->base, &msg->route);
+ msg->relay = relay;
+ cpipe_push(&relay->relay_pipe, &msg->base);
+}
+
/** Send a single row to the client. */
static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
- assert(iproto_type_is_dml(packet->type) ||
- iproto_type_is_synchro_request(packet->type));
if (packet->group_id == GROUP_LOCAL) {
/*
* We do not relay replica-local rows to other
@@ -793,6 +849,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
packet->group_id = GROUP_DEFAULT;
packet->bodycnt = 0;
}
+ assert(iproto_type_is_dml(packet->type) ||
+ iproto_type_is_synchro_request(packet->type));
/* Check if the rows from the instance are filtered. */
if ((1 << packet->replica_id & relay->id_filter) != 0)
return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..b32e2ea2a 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -93,6 +93,13 @@ relay_vclock(const struct relay *relay);
double
relay_last_row_time(const struct relay *relay);
+/**
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
+ */
+void
+relay_push_raft(struct relay *relay, const struct raft_request *req);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 1923bacfc..11fdacc0d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -958,11 +958,30 @@ int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r)
{
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_RAFT_TERM) +
- mp_sizeof_uint(r->term) +
- mp_sizeof_uint(IPROTO_RAFT_VOTE) +
- mp_sizeof_uint(r->vote);
+ /*
+ * The term is always encoded. Sometimes the rest can even be ignored if
+ * the term is too old.
+ */
+ int map_size = 1;
+ size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term);
+ if (r->vote != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ }
+ if (r->state != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+ mp_sizeof_uint(r->state);
+ }
+ if (r->vclock != NULL) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
+ mp_sizeof_vclock_ignore0(r->vclock);
+ }
+ size += mp_sizeof_map(map_size);
+
char *buf = region_alloc(region, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -974,40 +993,78 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
row->body[0].iov_len = size;
row->group_id = GROUP_LOCAL;
row->bodycnt = 1;
- buf = mp_encode_map(buf, 2);
+ buf = mp_encode_map(buf, map_size);
buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
buf = mp_encode_uint(buf, r->term);
- buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
- buf = mp_encode_uint(buf, r->vote);
+ if (r->vote != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ }
+ if (r->state != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+ buf = mp_encode_uint(buf, r->state);
+ }
+ if (r->vclock != NULL) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
+ buf = mp_encode_vclock_ignore0(buf, r->vclock);
+ }
return 0;
}
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock)
{
- /* TODO: handle bad format. */
assert(row->type == IPROTO_RAFT);
- assert(row->bodycnt == 1);
- assert(row->group_id == GROUP_LOCAL);
+ if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "malformed raft request");
+ return -1;
+ }
memset(r, 0, sizeof(*r));
- const char *pos = row->body[0].iov_base;
+ r->vclock = vclock;
+
+ const char *begin = row->body[0].iov_base;
+ const char *end = begin + row->body[0].iov_len;
+ const char *pos = begin;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
{
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
uint64_t key = mp_decode_uint(&pos);
switch (key) {
case IPROTO_RAFT_TERM:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->term = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VOTE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->vote = mp_decode_uint(&pos);
break;
+ case IPROTO_RAFT_STATE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
+ r->state = mp_decode_uint(&pos);
+ break;
+ case IPROTO_RAFT_VCLOCK:
+ if (r->vclock == NULL)
+ mp_next(&pos);
+ else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+ goto bad_msgpack;
+ break;
default:
mp_next(&pos);
break;
}
}
return 0;
+
+bad_msgpack:
+ xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+ return -1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c234f6f88..c627102dd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -267,6 +267,8 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
struct raft_request {
uint64_t term;
uint32_t vote;
+ uint32_t state;
+ struct vclock *vclock;
};
int
@@ -274,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r);
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock);
/**
* CALL/EVAL request.
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 7/8] [tosquash] raft: pass source instance_id to raft_process_msg()
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
` (5 preceding siblings ...)
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 6/8] raft: relay status updates to followers Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 8/8] raft: state machine Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
The instance ID of the sender is needed in order to:
- be able to vote for it;
- be able to remember its ID as the leader ID when it is a leader.
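
A minimal sketch of the two uses listed above, under simplifying assumptions:
it is not the state machine from this series. The names demo_node, demo_state
and demo_process_msg are hypothetical, and vclock checks and persistence are
deliberately left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical mirror of enum raft_state from the series. */
enum demo_state {
	DEMO_FOLLOWER = 1,
	DEMO_CANDIDATE = 2,
	DEMO_LEADER = 3,
};

/* Hypothetical, reduced node state: not the real struct raft. */
struct demo_node {
	uint64_t term;
	uint32_t vote;   /* Instance ID voted for in this term, 0 if none. */
	uint32_t leader; /* Instance ID of the known leader, 0 if none. */
};

/* Handle a status message sent by the instance with ID 'source'. */
static void
demo_process_msg(struct demo_node *node, uint64_t term,
		 enum demo_state state, uint32_t source)
{
	assert(node != NULL);
	if (term < node->term)
		return; /* Stale message from an old term. */
	if (term > node->term) {
		/* A new term starts with no vote and no leader. */
		node->term = term;
		node->vote = 0;
		node->leader = 0;
	}
	if (state == DEMO_LEADER)
		node->leader = source; /* Remember who the leader is. */
	else if (state == DEMO_CANDIDATE && node->vote == 0)
		node->vote = source;   /* Vote for the sender. */
}
```

Without the source argument neither branch at the end could be written,
because the message body alone would not say who sent it.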
---
src/box/applier.cc | 9 ++++-----
src/box/raft.c | 3 ++-
src/box/raft.h | 8 ++++++--
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 17e3ce1ae..8de2f799b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -898,7 +898,7 @@ err:
}
static int
-apply_raft_row(struct xrow_header *row)
+applier_handle_raft(struct applier *applier, struct xrow_header *row)
{
assert(iproto_type_is_raft_request(row->type));
@@ -906,9 +906,7 @@ apply_raft_row(struct xrow_header *row)
struct vclock candidate_clock;
if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
return -1;
-
- raft_process_msg(&req);
-
+ raft_process_msg(&req, applier->instance_id);
return 0;
}
@@ -1262,7 +1260,8 @@ applier_subscribe(struct applier *applier)
if (first_row->lsn == 0) {
if (unlikely(iproto_type_is_raft_request(
first_row->type))) {
- if (apply_raft_row(first_row) != 0)
+ if (applier_handle_raft(applier,
+ first_row) != 0)
diag_raise();
} else {
applier_signal_ack(applier);
diff --git a/src/box/raft.c b/src/box/raft.c
index fd8853d6e..1acffb677 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -78,8 +78,9 @@ raft_process_recovery(const struct raft_request *req)
}
void
-raft_process_msg(const struct raft_request *req)
+raft_process_msg(const struct raft_request *req, uint32_t source)
{
+ (void)source;
if (req->term > raft.term) {
// Update term.
// The logic will be similar, but the code
diff --git a/src/box/raft.h b/src/box/raft.h
index 0bf87e64b..d875707de 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -75,9 +75,13 @@ raft_is_enabled(void)
void
raft_process_recovery(const struct raft_request *req);
-/** Process a raft status message coming from the network. */
+/**
+ * Process a raft status message coming from the network.
+ * @param req Raft request.
+ * @param source Instance ID of the message sender.
+ */
void
-raft_process_msg(const struct raft_request *req);
+raft_process_msg(const struct raft_request *req, uint32_t source);
/**
* Broadcast the changes in this instance's raft status to all
--
2.21.1 (Apple Git-122.3)
* [Tarantool-patches] [PATCH 8/8] raft: state machine
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
` (6 preceding siblings ...)
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 7/8] [tosquash] raft: pass source instance_id to raft_process_msg() Vladislav Shpilevoy
@ 2020-09-02 23:33 ` Vladislav Shpilevoy
2020-09-03 22:51 ` [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-02 23:33 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
A first look at the Raft state machine implementation.
The commit is a draft. It does not contain any tests and is
unlikely to pass the existing ones, but it gives a picture of where
Raft is going.
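
One piece of the state machine in the diff below is the randomized election
timeout. The standalone sketch here follows the same arithmetic as the
comments in the diff describe: a random shift of at most 10% of the
configured timeout, with millisecond granularity. The demo_* names are
hypothetical, and demo_election_deadline is only an assumption about how such
a shift could be combined with the timeout.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Same value as RAFT_RANDOM_ELECTION_FACTOR in the diff. */
#define DEMO_RANDOM_ELECTION_FACTOR 0.1

/* Return a random shift in seconds, in [0, timeout * factor]. */
static double
demo_random_election_shift(double election_timeout)
{
	assert(election_timeout > 0);
	/* Work in whole milliseconds. */
	uint32_t max_ms = (uint32_t)(election_timeout *
				     DEMO_RANDOM_ELECTION_FACTOR * 1000);
	if (max_ms == 0)
		max_ms = 1;
	/* Modulo bias is accepted here, as in the original comment. */
	uint32_t shift_ms = (uint32_t)rand() % (max_ms + 1);
	return shift_ms / 1000.0;
}

/* Assumed usage: the moment when a new election would be started. */
static double
demo_election_deadline(double now, double election_timeout)
{
	return now + election_timeout +
	       demo_random_election_shift(election_timeout);
}
```

With the default timeout of 5 seconds, each node would wait between 5 and
5.5 seconds, so two nodes that lose the leader at the same moment are
unlikely to start their elections simultaneously.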
---
src/box/applier.cc | 18 +-
src/box/box.cc | 5 +-
src/box/lua/misc.cc | 25 +-
src/box/raft.c | 645 +++++++++++++++++++++++++++++++++++++++++---
src/box/raft.h | 89 ++++++
src/box/relay.cc | 19 ++
6 files changed, 746 insertions(+), 55 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 8de2f799b..7486d9929 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -916,8 +916,21 @@ applier_handle_raft(struct applier *applier, struct xrow_header *row)
* Return 0 for success or -1 in case of an error.
*/
static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct applier *applier, struct stailq *rows)
{
+ /*
+ * Rows received not directly from a leader are ignored. That is a
+ * protection against the case when an old leader keeps sending data
+ * around, not knowing yet that it is not a leader anymore.
+ *
+ * XXX: it may be fine to apply leader transactions by checking that
+ * their replica_id field is equal to the leader id. That can be
+ * investigated as an 'optimization', even though it may not give
+ * anything, because it won't change the total number of rows sent
+ * over the network anyway.
+ */
+ if (!raft_is_source_allowed(applier->instance_id))
+ return 0;
struct xrow_header *first_row = &stailq_first_entry(rows,
struct applier_tx_row, next)->row;
struct xrow_header *last_row;
@@ -1257,6 +1270,7 @@ applier_subscribe(struct applier *applier)
struct xrow_header *first_row =
&stailq_first_entry(&rows, struct applier_tx_row,
next)->row;
+ raft_process_heartbeat(applier->instance_id);
if (first_row->lsn == 0) {
if (unlikely(iproto_type_is_raft_request(
first_row->type))) {
@@ -1266,7 +1280,7 @@ applier_subscribe(struct applier *applier)
} else {
applier_signal_ack(applier);
}
- } else if (applier_apply_tx(&rows) != 0) {
+ } else if (applier_apply_tx(applier, &rows) != 0) {
diag_raise();
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 427b771b3..2e9c90310 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -157,7 +157,7 @@ void
box_update_ro_summary(void)
{
bool old_is_ro_summary = is_ro_summary;
- is_ro_summary = is_ro || is_orphan;
+ is_ro_summary = is_ro || is_orphan || raft_is_ro();
/* In 99% nothing changes. Filter this out first. */
if (is_ro_summary == old_is_ro_summary)
return;
@@ -2646,6 +2646,7 @@ box_init(void)
txn_limbo_init();
sequence_init();
+ raft_init();
}
bool
@@ -2794,6 +2795,8 @@ box_cfg_xc(void)
if (!is_bootstrap_leader)
replicaset_sync();
+ else if (raft_is_enabled())
+ raft_bootstrap_leader();
/* box.cfg.read_only is not read yet. */
assert(box_is_ro());
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 98e98abe2..efbbcfd1f 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -256,24 +256,26 @@ lbox_raft_new_term(struct lua_State *L)
return 0;
}
-static int
-lbox_raft_vote(struct lua_State *L)
-{
- uint64_t vote_for = luaL_checkuint64(L, 1);
- if (vote_for > UINT32_MAX)
- return luaL_error(L, "Invalid vote");
- raft_vote(vote_for);
- return 0;
-}
-
static int
lbox_raft_get(struct lua_State *L)
{
- lua_createtable(L, 0, 2);
+ lua_createtable(L, 0, 8);
luaL_pushuint64(L, raft.term);
lua_setfield(L, -2, "term");
+ luaL_pushuint64(L, raft.volatile_term);
+ lua_setfield(L, -2, "volatile_term");
luaL_pushuint64(L, raft.vote);
lua_setfield(L, -2, "vote");
+ luaL_pushuint64(L, raft.volatile_vote);
+ lua_setfield(L, -2, "volatile_vote");
+ lua_pushstring(L, raft_state_strs[raft.state]);
+ lua_setfield(L, -2, "state");
+ lua_pushinteger(L, raft.vote_count);
+ lua_setfield(L, -2, "vote_count");
+ lua_pushboolean(L, raft.is_write_in_progress);
+ lua_setfield(L, -2, "is_write_in_progress");
+ lua_pushboolean(L, raft.is_candidate);
+ lua_setfield(L, -2, "is_candidate");
return 1;
}
@@ -285,7 +287,6 @@ box_lua_misc_init(struct lua_State *L)
{"new_tuple_format", lbox_tuple_format_new},
/* Temporary helpers to sanity test raft persistency. */
{"raft_new_term", lbox_raft_new_term},
- {"raft_vote", lbox_raft_vote},
{"raft_get", lbox_raft_get},
{NULL, NULL}
};
diff --git a/src/box/raft.c b/src/box/raft.c
index 1acffb677..6f2891291 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -36,6 +36,12 @@
#include "small/region.h"
#include "replication.h"
#include "relay.h"
+#include "box.h"
+
+/**
+ * Max random deviation of the election timeout, as a fraction of its value.
+ */
+#define RAFT_RANDOM_ELECTION_FACTOR 0.1
const char *raft_state_strs[] = {
NULL,
@@ -48,19 +54,163 @@ const char *raft_state_strs[] = {
struct raft raft = {
.leader = 0,
.state = RAFT_STATE_FOLLOWER,
+ .volatile_term = 1,
+ .volatile_vote = 0,
.is_enabled = false,
.is_candidate = false,
+ .is_cfg_candidate = false,
+ .is_write_in_progress = false,
.term = 1,
.vote = 0,
+ .vote_mask = 0,
+ .vote_count = 0,
+ .election_timeout = 5,
};
+/**
+ * Check if Raft is completely synced with disk. Meaning all its critical values
+ * are in WAL. Only in that state the node can become a leader or a candidate.
+ * If the node has unflushed data, it means either the term was bumped, or
+ * a new vote was made.
+ *
+ * In case of term bump it means either there is another node with a newer term,
+ * and this one should be a follower; or this node bumped the term itself along
+ * with making a vote to start a new election - then it is also a follower which
+ * will turn into a candidate when the flush is done.
+ *
+ * In case of a new unflushed vote it means either this node voted for some
+ * other node, and must be a follower; or it voted for itself, and also must be a
+ * follower, but will become a candidate when the flush is done.
+ *
+ * In total - when something is not synced with disk, the instance is a follower
+ * in any case.
+ */
+static bool
+raft_is_fully_on_disk(void)
+{
+ return raft.volatile_term == raft.term &&
+ raft.volatile_vote == raft.vote;
+}
+
+/**
+ * Raft protocol says that election timeout should be a bit randomized so as
+ * the nodes wouldn't start election at the same time and end up with not having
+ * a quorum for anybody. This implementation randomizes the election timeout by
+ * adding {election timeout * random factor} value, where the factor is a
+ * constant floating point value > 0.
+ */
+static inline double
+raft_new_random_election_shift(void)
+{
+ double timeout = raft.election_timeout;
+ /* Translate to ms. */
+ uint32_t rand_part =
+ (uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
+ if (rand_part == 0)
+ rand_part = 1;
+ /*
+ * XXX: this is not giving a good distribution, but it is not so trivial
+ * to implement a correct random value generator. There is a task to
+ * unify all such places. Not critical here.
+ */
+ rand_part = rand() % (rand_part + 1);
+ return rand_part / 1000.0;
+}
+
+/**
+ * Raft says that during election a node1 can vote for node2, if node2 has a
+ * bigger term, or has the same term but longer log. In case of Tarantool it
+ * means the node2 vclock should be >= node1 vclock, in all components. It is
+ * not enough to compare only one component. At least because there may be not
+ * a previous leader when the election happens first time. Or a node could
+ * restart and forget who the previous leader was.
+ */
+static inline bool
+raft_can_vote_for(const struct vclock *v)
+{
+ if (v == NULL)
+ return false;
+ int cmp = vclock_compare_ignore0(v, &replicaset.vclock);
+ return cmp == 0 || cmp == 1;
+}
+
+/** Broadcast to all relays an event that this node changed its state. */
+static inline void
+raft_broadcast_new_state(void)
+{
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.term = raft.term;
+ req.state = raft.state;
+ raft_broadcast(&req);
+}
+
+/** Raft state machine methods. 'sm' stands for State Machine. */
+
+/**
+ * Start the state machine. When it is stopped, Raft state is updated and
+ * goes to WAL when necessary, but it does not affect the instance operation.
+ * For example, when Raft is stopped, the instance role does not affect whether
+ * it is writable.
+ */
+static void
+raft_sm_start(void);
+
+/**
+ * Stop the state machine.
+ * - Raft stops affecting the instance operation;
+ * - this node can't become a leader anymore;
+ * - this node can't vote.
+ */
+static void
+raft_sm_stop(void);
+
+/**
+ * When the instance is a follower but is allowed to be a leader, it will wait
+ * for death of the current leader to start new election.
+ */
+static void
+raft_sm_wait_leader_dead(void);
+
+/**
+ * If election is started by this node, or it voted for some other node started
+ * the election, and it can be a leader itself, it will wait until the current
+ * election times out. When it happens, the node will start new election.
+ */
+static void
+raft_sm_wait_election_end(void);
+
+/** Bump volatile term and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term);
+
+/** Bump volatile vote and schedule its flush to disk. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote);
+
+/** Bump volatile term, vote for self, and schedule their flush to disk. */
+static void
+raft_sm_schedule_new_election(void);
+
+/**
+ * The main trigger of Raft state machine - start new election when the current
+ * leader dies, or when there is no leader and the previous election failed.
+ */
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+ int events);
+
void
raft_process_recovery(const struct raft_request *req)
{
- if (req->term != 0)
+ if (req->term != 0) {
raft.term = req->term;
- if (req->vote != 0)
+ raft.volatile_term = req->term;
+ }
+ if (req->vote != 0) {
raft.vote = req->vote;
+ raft.volatile_vote = req->vote;
+ }
/*
* Role is never persisted. If recovery is happening, the
* node was restarted, and the former role can be false
@@ -80,34 +230,136 @@ raft_process_recovery(const struct raft_request *req)
void
raft_process_msg(const struct raft_request *req, uint32_t source)
{
- (void)source;
- if (req->term > raft.term) {
- // Update term.
- // The logic will be similar, but the code
- // below is for testing purposes.
- raft.term = req->term;
- }
- if (req->vote > 0) {
- // Check whether the vote's for us.
+ assert(source > 0);
+ assert(source != instance_id);
+ /* Outdated request. */
+ if (req->term < raft.volatile_term)
+ return;
+
+ enum raft_state old_state = raft.state;
+
+ /* Term bump. */
+ if (req->term > raft.volatile_term)
+ raft_sm_schedule_new_term(req->term);
+
+ /* Vote request during the on-going election. */
+ if (req->vote != 0) {
+ switch (raft.state) {
+ case RAFT_STATE_FOLLOWER:
+ case RAFT_STATE_LEADER:
+ /*
+ * Can't respond on vote requests when Raft is disabled.
+ */
+ if (!raft.is_enabled)
+ break;
+ /* Check if already voted in this term. */
+ if (raft.volatile_vote != 0)
+ break;
+ /* Not a candidate. Can't accept votes. */
+ if (req->vote == instance_id)
+ break;
+ /* Can't vote for too old or incomparable nodes. */
+ if (!raft_can_vote_for(req->vclock))
+ break;
+ /*
+ * Either the term is new, or didn't vote in the current
+ * term yet. Anyway can vote now.
+ */
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft_sm_schedule_new_vote(req->vote);
+ break;
+ case RAFT_STATE_CANDIDATE:
+ /* Check if this is a vote for a competing candidate. */
+ if (req->vote != instance_id)
+ break;
+ /*
+ * Vote for self was requested earlier in this round,
+ * and now was answered by some other instance.
+ */
+ assert(raft.volatile_vote == instance_id);
+ bool was_set = bit_set(&raft.vote_mask, source);
+ raft.vote_count += !was_set;
+ if (raft.vote_count < replication_synchro_quorum)
+ break;
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ break;
+ default:
+ unreachable();
+ }
}
- switch (req->state) {
- case RAFT_STATE_FOLLOWER:
- break;
- case RAFT_STATE_CANDIDATE:
- // Perform voting logic.
- break;
- case RAFT_STATE_LEADER:
- // Switch to a new leader.
- break;
- default:
- break;
+ /*
+ * If the node does not claim to be a leader, nothing interesting. Terms
+ * and votes are already handled.
+ */
+ if (req->state != RAFT_STATE_LEADER)
+ goto end;
+ /* The node is a leader, but it is already known. */
+ if (source == raft.leader)
+ goto end;
+ /*
+ * XXX: A message from a conflicting leader. Split brain, basically.
+ * Need to decide what to do. Current solution is to do nothing. In
+ * future either this node should try to become a leader, or should stop
+ * all writes and require manual intervention.
+ */
+ if (raft.leader != 0)
+ goto end;
+
+ /* New leader was elected. */
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft.leader = source;
+end:
+ if (raft.state != old_state) {
+ /*
+ * If the node stopped being a leader - should become read-only.
+ * If became a leader - should become read-write (if other
+ * subsystems also allow read-write).
+ */
+ box_update_ro_summary();
+ /*
+ * New term and vote are not broadcasted yet. Firstly their WAL
+ * write should be finished. But the state is volatile. It is ok
+ * to broadcast it now.
+ */
+ raft_broadcast_new_state();
}
}
+void
+raft_process_heartbeat(uint32_t source)
+{
+ /*
+ * When not a candidate - don't wait for anything. Therefore does not
+ * care about the leader being dead.
+ */
+ if (!raft.is_candidate)
+ return;
+ /* Don't care about heartbeats when this node is a leader itself. */
+ if (raft.state == RAFT_STATE_LEADER)
+ return;
+ /* Not interested in heartbeats from not a leader. */
+ if (raft.leader != source)
+ return;
+ /*
+ * XXX: it may be expensive to reset the timer like that. It may be less
+ * expensive to let the timer work, and remember last timestamp when
+ * anything was heard from the leader. Then in the timer callback check
+ * the timestamp, and restart the timer, if it is fine.
+ */
+ assert(ev_is_active(&raft.timer));
+ ev_timer_stop(loop(), &raft.timer);
+ raft_sm_wait_leader_dead();
+}
+
void
raft_serialize(struct raft_request *req, struct vclock *vclock)
{
memset(req, 0, sizeof(*req));
+ /*
+ * Volatile state is never used for any communications.
+ * Use only persisted state.
+ */
req->term = raft.term;
req->vote = raft.vote;
req->state = raft.state;
@@ -117,15 +369,23 @@ raft_serialize(struct raft_request *req, struct vclock *vclock)
req->vclock = vclock;
}
+/** Wakeup Raft state writer fiber waiting for WAL write end. */
static void
raft_write_cb(struct journal_entry *entry)
{
fiber_wakeup(entry->complete_data);
}
+/** Synchronously write a Raft request into WAL. */
static void
raft_write_request(const struct raft_request *req)
{
+ assert(raft.is_write_in_progress);
+ /*
+ * Vclock is never persisted by Raft. It is used only to
+ * be sent to network when vote for self.
+ */
+ assert(req->vclock == NULL);
struct region *region = &fiber()->gc;
uint32_t svp = region_used(region);
struct xrow_header row;
@@ -157,57 +417,343 @@ fail:
panic("Could not write a raft request to WAL\n");
}
+/**
+ * Flush Raft state changes to WAL. The callback resets itself, if during the
+ * write more changes appear.
+ */
+static void
+raft_sm_dump_step(ev_loop *loop, ev_check *watcher, int events)
+{
+ assert(watcher == &raft.io);
+ (void) events;
+ assert(raft.is_write_in_progress);
+ /* During write Raft can't be anything but a follower. */
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ struct raft_request req;
+ uint64_t old_term = raft.term;
+ uint32_t old_vote = raft.vote;
+ enum raft_state old_state = raft.state;
+
+ if (raft_is_fully_on_disk()) {
+end_dump:
+ raft.is_write_in_progress = false;
+ ev_check_stop(loop, watcher);
+ /*
+ * The state machine is stable. Can see now, to what state to
+ * go.
+ */
+ if (!raft.is_candidate) {
+ /*
+ * If not a candidate, can't do anything except vote for
+ * somebody (if Raft is enabled). Nothing to do except
+ * staying a follower without timeouts.
+ */
+ } else if (raft.leader != 0) {
+ /* There is a known leader. Wait until it is dead. */
+ raft_sm_wait_leader_dead();
+ } else if (raft.vote == instance_id) {
+ /* Just wrote own vote. */
+ if (replication_synchro_quorum == 1) {
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ /*
+ * Make read-write (if other subsystems allow
+ * that).
+ */
+ box_update_ro_summary();
+ } else {
+ raft.state = RAFT_STATE_CANDIDATE;
+ raft.vote_count = 1;
+ raft.vote_mask = 0;
+ raft_sm_wait_election_end();
+ }
+ } else if (raft.vote != 0) {
+ /*
+ * Voted for some other node. Wait if it manages to
+ * become a leader.
+ */
+ raft_sm_wait_election_end();
+ } else {
+ /* No leaders, no votes. */
+ raft_sm_schedule_new_election();
+ }
+ } else {
+ memset(&req, 0, sizeof(req));
+ assert(raft.volatile_term >= raft.term);
+ /* Term is written always. */
+ req.term = raft.volatile_term;
+ if (raft.volatile_vote != 0 && raft.volatile_vote != raft.vote)
+ req.vote = raft.volatile_vote;
+
+ raft_write_request(&req);
+
+ assert(req.term >= raft.term);
+ if (req.term > raft.term) {
+ raft.term = req.term;
+ raft.vote = 0;
+ }
+ if (req.vote != 0) {
+ assert(raft.vote == 0);
+ raft.vote = req.vote;
+ }
+ if (raft_is_fully_on_disk())
+ goto end_dump;
+ }
+
+ memset(&req, 0, sizeof(req));
+ /* Term is encoded always. */
+ req.term = raft.term;
+ bool has_changes = old_term != raft.term;
+ if (raft.vote != 0 && old_vote != raft.vote) {
+ req.vote = raft.vote;
+ /*
+ * When vote for self, need to send current vclock too. Two
+ * reasons for that:
+ *
+ * - nodes need to vote for the instance containing the newest
+ * data. So as not to lose it, because some of it may be
+ * confirmed by the synchronous replication;
+ *
+ * - replication is basically stopped during election. Other
+ * nodes can't learn vclock of this instance through regular
+ * replication.
+ */
+ if (raft.vote == instance_id)
+ req.vclock = &replicaset.vclock;
+ has_changes = true;
+ }
+ if (raft.state != old_state) {
+ req.state = raft.state;
+ has_changes = true;
+ }
+ if (has_changes)
+ raft_broadcast(&req);
+}
+
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void)
+{
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ if (raft.is_write_in_progress)
+ return;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_check_start(loop(), &raft.io);
+ raft.is_write_in_progress = true;
+}
+
+/** Bump term, reset Raft state, and persist that fact. */
+static void
+raft_sm_schedule_new_term(uint64_t new_term)
+{
+ assert(new_term > raft.volatile_term);
+ assert(raft.volatile_term >= raft.term);
+ raft.volatile_term = new_term;
+ /* New terms means completely new Raft state. */
+ raft.volatile_vote = 0;
+ raft.leader = 0;
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft_sm_pause_and_dump();
+}
+
+/** Vote in the current term, and persist that fact. */
+static void
+raft_sm_schedule_new_vote(uint32_t new_vote)
+{
+ assert(raft.volatile_vote == 0);
+ raft.volatile_vote = new_vote;
+ raft_sm_pause_and_dump();
+}
+
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated. Unless during that nothing newer happens.
+ */
+static void
+raft_sm_schedule_new_election(void)
+{
+ assert(raft_is_fully_on_disk());
+ assert(raft.is_candidate);
+ assert(raft.leader == 0);
+ /* Everyone is a follower until its vote for self is persisted. */
+ raft_sm_schedule_new_term(raft.term + 1);
+ raft_sm_schedule_new_vote(instance_id);
+ box_update_ro_summary();
+}
+
void
raft_new_term(uint64_t min_new_term)
{
+ uint64_t new_term;
if (raft.term < min_new_term)
- raft.term = min_new_term + 1;
+ new_term = min_new_term + 1;
else
- ++raft.term;
+ new_term = raft.term + 1;
+ enum raft_state old_state = raft.state;
+ raft_sm_schedule_new_term(new_term);
+ if (raft.state != old_state)
+ raft_broadcast_new_state();
+ box_update_ro_summary();
+}
+
+static void
+raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
+ int events)
+{
+ assert(timer == &raft.timer);
+ (void)events;
+ ev_timer_stop(loop, timer);
+ raft_sm_schedule_new_election();
+}
+
+static void
+raft_sm_wait_leader_dead(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!ev_is_active(&raft.io));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ assert(raft.leader != 0);
+ double death_timeout = replication_disconnect_timeout();
+ ev_timer_set(&raft.timer, death_timeout, death_timeout);
+}
+static void
+raft_sm_wait_election_end(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!ev_is_active(&raft.io));
+ assert(!raft.is_write_in_progress);
+ assert(raft.is_candidate);
+ assert(raft.state == RAFT_STATE_FOLLOWER ||
+ (raft.state == RAFT_STATE_CANDIDATE &&
+ raft.volatile_vote == instance_id));
+ assert(raft.leader == 0);
+ double election_timeout = raft.election_timeout +
+ raft_new_random_election_shift();
+ ev_timer_set(&raft.timer, election_timeout, election_timeout);
+}
+
+static void
+raft_sm_start(void)
+{
+ assert(!ev_is_active(&raft.timer));
+ assert(!ev_is_active(&raft.io));
+ assert(!raft.is_write_in_progress);
+ assert(!raft.is_enabled);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ raft.is_enabled = true;
+ raft.is_candidate = raft.is_cfg_candidate;
+ if (!raft.is_candidate)
+ /* Nop. */;
+ else if (raft.leader != 0)
+ raft_sm_wait_leader_dead();
+ else
+ raft_sm_schedule_new_election();
+ box_update_ro_summary();
+ /*
+ * When Raft is enabled, send the complete state. Because
+ * it wasn't sent in disabled state.
+ */
struct raft_request req;
- memset(&req, 0, sizeof(req));
- req.term = raft.term;
- raft_write_request(&req);
+ raft_serialize(&req, NULL);
+ raft_broadcast(&req);
+}
+
+static void
+raft_sm_stop(void)
+{
+ assert(raft.is_enabled);
+ raft.is_enabled = false;
+ raft.is_candidate = false;
+ box_update_ro_summary();
}
void
raft_cfg_is_enabled(bool is_enabled)
{
- raft.is_enabled = is_enabled;
+ if (is_enabled == raft.is_enabled)
+ return;
+
+ if (!is_enabled)
+ raft_sm_stop();
+ else
+ raft_sm_start();
}
void
raft_cfg_is_candidate(bool is_candidate)
{
- raft.is_candidate = is_candidate;
+ bool old_is_candidate = raft.is_candidate;
+ raft.is_cfg_candidate = is_candidate;
+ raft.is_candidate = is_candidate && raft.is_enabled;
+ if (raft.is_candidate == old_is_candidate)
+ return;
+
+ if (raft.is_candidate) {
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ /*
+ * If there is an on-going WAL write, it means
+ * there was some node who sent newer data to this
+ * node.
+ */
+ if (raft.leader == 0 && raft_is_fully_on_disk())
+ raft_sm_schedule_new_election();
+ } else if (raft.state != RAFT_STATE_FOLLOWER) {
+ raft.state = RAFT_STATE_FOLLOWER;
+ raft_broadcast_new_state();
+ }
+ box_update_ro_summary();
}
void
raft_cfg_election_timeout(double timeout)
{
+ if (timeout == raft.election_timeout)
+ return;
+
raft.election_timeout = timeout;
+ if (raft.vote != 0 && raft.leader == 0) {
+ assert(ev_is_active(&raft.timer));
+ double timeout = ev_timer_remaining(loop(), &raft.timer) -
+ raft.timer.at + raft.election_timeout;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_timer_set(&raft.timer, timeout, timeout);
+ }
}
void
raft_cfg_election_quorum(void)
{
+ if (raft.state != RAFT_STATE_CANDIDATE ||
+ raft.state == RAFT_STATE_LEADER)
+ return;
+ if (raft.vote_count < replication_synchro_quorum)
+ return;
+ /*
+ * The node is a candidate. It means its state is fully synced with
+ * disk. Otherwise it would be a follower.
+ */
+ assert(!raft.is_write_in_progress);
+ raft.state = RAFT_STATE_LEADER;
+ raft.leader = instance_id;
+ raft_broadcast_new_state();
+ box_update_ro_summary();
}
void
raft_cfg_death_timeout(void)
{
-}
-
-void
-raft_vote(uint32_t vote_for)
-{
- raft.vote = vote_for;
-
- struct raft_request req;
- memset(&req, 0, sizeof(req));
- req.vote = vote_for;
- raft_write_request(&req);
+ if (raft.state == RAFT_STATE_FOLLOWER && raft.is_candidate &&
+ raft.leader != 0) {
+ assert(ev_is_active(&raft.timer));
+ double death_timeout = replication_disconnect_timeout();
+ double timeout = ev_timer_remaining(loop(), &raft.timer) -
+ raft.timer.at + death_timeout;
+ ev_timer_stop(loop(), &raft.timer);
+ ev_timer_set(&raft.timer, timeout, timeout);
+ }
}
void
@@ -220,3 +766,22 @@ raft_broadcast(const struct raft_request *req)
}
}
}
+
+void
+raft_bootstrap_leader(void)
+{
+ assert(raft.is_enabled);
+ assert(raft.volatile_term == 0);
+ assert(raft.volatile_vote == 0);
+ assert(raft.state == RAFT_STATE_FOLLOWER);
+ raft.state = RAFT_STATE_LEADER;
+ raft_broadcast_new_state();
+ box_update_ro_summary();
+}
+
+void
+raft_init(void)
+{
+ ev_timer_init(&raft.timer, raft_sm_schedule_new_election_cb, 0, 0);
+ ev_check_init(&raft.io, raft_sm_dump_step);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index d875707de..57584bc1b 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -31,6 +31,7 @@
*/
#include <stdint.h>
#include <stdbool.h>
+#include "tarantool_ev.h"
#if defined(__cplusplus)
extern "C" {
@@ -48,12 +49,62 @@ enum raft_state {
extern const char *raft_state_strs[];
struct raft {
+ /** Instance ID of leader of the current term. */
uint32_t leader;
+ /** State of the instance. */
enum raft_state state;
+ /**
+ * Volatile part of the Raft state, whose WAL write may be
+ * still in-progress, and yet the state may be already
+ * used. Volatile state is never sent to anywhere, but the
+ * state machine makes decisions based on it. That is
+ * vital.
+ * As an example, volatile vote needs to be used to reject
+ * votes inside a term, where the instance already voted
+ * (even if the vote WAL write is not finished yet).
+ * Otherwise the instance would try to write several votes
+ * inside one term.
+ */
+ uint64_t volatile_term;
+ uint32_t volatile_vote;
+ /**
+ * Flag whether Raft is enabled. When disabled, it still
+ * persists terms so as to quickly enroll into the cluster
+ * when (if) it is enabled. In everything else disabled
+ * Raft does not affect instance work.
+ */
bool is_enabled;
+ /**
+ * Flag whether the node can become a leader. It is an
+ * accumulated value of configuration options Raft enabled and
+ * Raft candidate. If at least one is false - the instance
+ * is not a candidate.
+ */
bool is_candidate;
+ /** Flag whether the instance is allowed to be a leader. */
+ bool is_cfg_candidate;
+ /**
+ * Flag whether Raft currently tries to write something into WAL. It
+ * happens asynchronously, not right after Raft state is updated.
+ */
+ bool is_write_in_progress;
+ /**
+ * Persisted Raft state. These values are used when need to tell current
+ * Raft state to other nodes.
+ */
uint64_t term;
uint32_t vote;
+ /** Bit 1 means that a vote from that instance was obtained. */
+ uint32_t vote_mask;
+ /** Number of votes for this instance. Valid only in candidate state. */
+ int vote_count;
+ /** State machine timed event trigger. */
+ struct ev_timer timer;
+ /**
+ * Dump of Raft state in the end of event loop, when it is changed.
+ */
+ struct ev_check io;
+ /** Configured election timeout in seconds. */
double election_timeout;
};
@@ -65,6 +116,18 @@ raft_new_term(uint64_t min_new_term);
void
raft_vote(uint32_t vote_for);
+static inline bool
+raft_is_ro(void)
+{
+ return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
+}
+
+static inline bool
+raft_is_source_allowed(uint32_t source_id)
+{
+ return !raft.is_enabled || raft.leader == source_id;
+}
+
static inline bool
raft_is_enabled(void)
{
@@ -83,6 +146,9 @@ raft_process_recovery(const struct raft_request *req);
void
raft_process_msg(const struct raft_request *req, uint32_t source);
+void
+raft_process_heartbeat(uint32_t source);
+
/**
* Broadcast the changes in this instance's raft status to all
* the followers.
@@ -113,6 +179,29 @@ raft_serialize(struct raft_request *req, struct vclock *vclock);
void
raft_broadcast(const struct raft_request *req);
+/**
+ * Bootstrap the current instance as the first leader of the cluster. That is
+ * done bypassing the Raft election protocol, by just assigning this node a
+ * leader role. That is needed, because when the cluster is not bootstrapped, it
+ * is necessary to find a node, which will generate a replicaset UUID, write it
+ * into _cluster space, and register all the other nodes in _cluster.
+ * Until it is done, all nodes but one won't boot. Their WALs won't work. And
+ * therefore they won't be able to participate in leader election. That
+ * effectively makes the cluster dead from the beginning unless the first
+ * bootstrapped node won't declare itself a leader without elections.
+ *
+ * XXX: That does not solve the problem, when the first node boots, creates a
+ * snapshot, and then immediately dies. After recovery it won't declare itself a
+ * leader. Therefore if quorum > 1, the cluster won't proceed to registering
+ * any replicas and becomes completely dead. Perhaps that must be solved by
+ * truncating quorum down to number of records in _cluster.
+ */
+void
+raft_bootstrap_leader(void);
+
+void
+raft_init(void);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74581db9c..4f9bbc0de 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -771,6 +771,23 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
+/**
+ * Recreate recovery cursor from the last confirmed point. That is
+ * used by Raft, when the node becomes a leader. It may happen,
+ * that it already sent some data to other nodes as a follower,
+ * and they ignored the data. Now when the node is a leader, it
+ * should send the not confirmed data again. Otherwise the cluster
+ * will get stuck, or worse - the newer data would be sent without the
+ * older sent but ignored data.
+ */
+static void
+relay_restart_recovery(struct relay *relay)
+{
+ recovery_delete(relay->r);
+ relay->r = recovery_new(wal_dir(), false, &relay->recv_vclock);
+ recover_remaining_wals(relay->r, &relay->stream, NULL, true);
+}
+
struct relay_raft_msg {
struct cmsg base;
struct cmsg_hop route;
@@ -786,6 +803,8 @@ relay_raft_msg_push(struct cmsg *base)
struct xrow_header row;
xrow_encode_raft(&row, &fiber()->gc, &msg->req);
try {
+ if (msg->req.state == RAFT_STATE_LEADER)
+ relay_restart_recovery(msg->relay);
relay_send(msg->relay, &row);
} catch (Exception *e) {
relay_set_error(msg->relay, e);
--
2.21.1 (Apple Git-122.3)
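The candidate branch of raft_process_msg() in the patch above counts every voter at most once by remembering it in vote_mask. A minimal standalone sketch of that bookkeeping follows. The names vote_tally, vote_tally_start and vote_tally_add are hypothetical and not part of the patch, and the sketch assumes instance IDs below 32 so that they fit into a 32-bit mask.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified candidate state; not the patch's struct raft. */
struct vote_tally {
	/* Bit N is set once a vote from instance N has been counted. */
	uint32_t vote_mask;
	/* Number of distinct votes, including the vote for self. */
	int vote_count;
};

/* Start a new round: only the candidate's own vote is counted. */
static void
vote_tally_start(struct vote_tally *t)
{
	t->vote_mask = 0;
	t->vote_count = 1;
}

/*
 * Register a vote from instance 'source'. A repeated vote from the
 * same instance is ignored. Return true when the quorum is reached.
 */
static bool
vote_tally_add(struct vote_tally *t, uint32_t source, int quorum)
{
	/* Assumption of this sketch: IDs fit into the 32-bit mask. */
	assert(source < 32);
	uint32_t bit = (uint32_t)1 << source;
	if ((t->vote_mask & bit) == 0) {
		t->vote_mask |= bit;
		t->vote_count++;
	}
	return t->vote_count >= quorum;
}
```

With a quorum of 3, a candidate that receives the same vote twice still needs one more distinct voter before it may declare itself a leader.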
* Re: [Tarantool-patches] [PATCH 0/8] dRaft
2020-09-02 23:33 [Tarantool-patches] [PATCH 0/8] dRaft Vladislav Shpilevoy
` (7 preceding siblings ...)
2020-09-02 23:33 ` [Tarantool-patches] [PATCH 8/8] raft: state machine Vladislav Shpilevoy
@ 2020-09-03 22:51 ` Vladislav Shpilevoy
8 siblings, 0 replies; 10+ messages in thread
From: Vladislav Shpilevoy @ 2020-09-03 22:51 UTC (permalink / raw)
To: tarantool-patches, sergepetrenko, gorcunov
I made a few changes to the existing commits and added 2 new ones.
Here is the complete diff. The patchset is resent in the v2 thread.
See the diff below with my comments inlined.
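Some of the comment changes in the diff concern raft_new_random_election_shift(). For readers who want the idea in isolation, here is a minimal standalone sketch of the randomization, not the patch code itself. The name election_shift and the max_factor parameter are hypothetical; in the patch the factor is the RAFT_RANDOM_ELECTION_FACTOR constant and the timeout comes from the global raft state.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Return a random shift in seconds in the range
 * [0, timeout * max_factor], with millisecond granularity.
 */
static double
election_shift(double timeout, double max_factor)
{
	assert(timeout >= 0 && max_factor > 0);
	/* Work in integer milliseconds so that modulo can be used. */
	uint32_t max_ms = (uint32_t)(timeout * max_factor * 1000);
	if (max_ms == 0)
		max_ms = 1;
	/* rand() % n is slightly biased; acceptable for a sketch. */
	uint32_t shift_ms = (uint32_t)rand() % (max_ms + 1);
	return shift_ms / 1000.0;
}
```

The effective election timeout would then be timeout + election_shift(timeout, factor), so nodes configured with the same timeout are unlikely to start an election at the same moment.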
====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 7486d9929..c7c486ee4 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1277,9 +1277,8 @@ applier_subscribe(struct applier *applier)
if (applier_handle_raft(applier,
first_row) != 0)
diag_raise();
- } else {
- applier_signal_ack(applier);
}
+ applier_signal_ack(applier);
==================================================
I made the ACK always be sent, because a Raft message might not change the state
of the instance, so it wouldn't cause a WAL write and wouldn't generate an ACK.
==================================================
} else if (applier_apply_tx(applier, &rows) != 0) {
diag_raise();
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 2e9c90310..9d0782fff 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -171,6 +171,10 @@ static int
box_check_writable(void)
{
if (is_ro_summary) {
+ /*
+ * XXX: return a special error when the node is not a leader to
+ * reroute to the leader node.
+ */
diag_set(ClientError, ER_READONLY);
diag_log();
return -1;
@@ -2143,10 +2147,10 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
*/
struct raft_request req;
/*
- * Omit the candidate vclock, since we've just
- * sent it in subscribe response.
+ * Omit the candidate vclock, since we've just sent it in
+ * subscribe response.
==================================================
Many comments were rewrapped from 66 to 80 columns.
==================================================
*/
- raft_serialize(&req, NULL);
+ raft_serialize_for_network(&req, NULL);
xrow_encode_raft(&row, &fiber()->gc, &req);
coio_write_xrow(io, &row);
}
@@ -2795,8 +2799,6 @@ box_cfg_xc(void)
if (!is_bootstrap_leader)
replicaset_sync();
- else if (raft_is_enabled())
- raft_bootstrap_leader();
/* box.cfg.read_only is not read yet. */
assert(box_is_ro());
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 1c131caec..8e1dbd497 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -49,6 +49,7 @@
#include "main.h"
#include "version.h"
#include "box/box.h"
+#include "box/raft.h"
#include "lua/utils.h"
#include "fiber.h"
#include "tt_static.h"
@@ -577,6 +578,21 @@ lbox_info_listen(struct lua_State *L)
return 1;
}
+static int
+lbox_info_raft(struct lua_State *L)
+{
+ lua_createtable(L, 0, 4);
+ lua_pushstring(L, raft_state_strs[raft.state]);
+ lua_setfield(L, -2, "state");
+ luaL_pushuint64(L, raft.volatile_term);
+ lua_setfield(L, -2, "term");
+ lua_pushinteger(L, raft.volatile_vote);
+ lua_setfield(L, -2, "vote");
+ lua_pushinteger(L, raft.leader);
+ lua_setfield(L, -2, "leader");
+ return 1;
+}
+
static const struct luaL_Reg lbox_info_dynamic_meta[] = {
{"id", lbox_info_id},
{"uuid", lbox_info_uuid},
@@ -595,6 +611,7 @@ static const struct luaL_Reg lbox_info_dynamic_meta[] = {
{"vinyl", lbox_info_vinyl},
{"sql", lbox_info_sql},
{"listen", lbox_info_listen},
+ {"raft", lbox_info_raft},
{NULL, NULL}
};
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index efbbcfd1f..e356f2d4b 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,7 +40,6 @@
#include "box/tuple.h"
#include "box/tuple_format.h"
#include "box/lua/tuple.h"
-#include "box/raft.h"
#include "box/xrow.h"
#include "mpstream/mpstream.h"
@@ -248,46 +247,12 @@ lbox_tuple_format_new(struct lua_State *L)
/* }}} */
-static int
-lbox_raft_new_term(struct lua_State *L)
-{
- uint64_t min_term = luaL_checkuint64(L, 1);
- raft_new_term(min_term);
- return 0;
-}
-
-static int
-lbox_raft_get(struct lua_State *L)
-{
- lua_createtable(L, 0, 8);
- luaL_pushuint64(L, raft.term);
- lua_setfield(L, -2, "term");
- luaL_pushuint64(L, raft.volatile_term);
- lua_setfield(L, -2, "volatile_term");
- luaL_pushuint64(L, raft.vote);
- lua_setfield(L, -2, "vote");
- luaL_pushuint64(L, raft.volatile_vote);
- lua_setfield(L, -2, "volatile_vote");
- lua_pushstring(L, raft_state_strs[raft.state]);
- lua_setfield(L, -2, "state");
- lua_pushinteger(L, raft.vote_count);
- lua_setfield(L, -2, "vote_count");
- lua_pushboolean(L, raft.is_write_in_progress);
- lua_setfield(L, -2, "is_write_in_progress");
- lua_pushboolean(L, raft.is_candidate);
- lua_setfield(L, -2, "is_candidate");
- return 1;
-}
==================================================
The first helper functions for a new vote and a new term were removed. Perhaps
the term bump will return as something like box.internal.raft_new_term() for the
tests, but the vote should be controlled by Raft completely.
The information part is moved into box.info.raft in info.c.
==================================================
-
void
box_lua_misc_init(struct lua_State *L)
{
static const struct luaL_Reg boxlib_internal[] = {
{"select", lbox_select},
{"new_tuple_format", lbox_tuple_format_new},
- /* Temporary helpers to sanity test raft persistency. */
- {"raft_new_term", lbox_raft_new_term},
- {"raft_get", lbox_raft_get},
{NULL, NULL}
};
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index b0b744db8..fe7ae9f63 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -517,11 +517,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
- /*
- * Don't encode vclock, because it is stored in the snapshot header
- * anyway.
- */
- raft_serialize(&ckpt->raft, NULL);
+ raft_serialize_for_disk(&ckpt->raft);
ckpt->touch = false;
return ckpt;
}
diff --git a/src/box/raft.c b/src/box/raft.c
index 6f2891291..1c4275cd5 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -79,7 +79,7 @@ struct raft raft = {
* will turn into a candidate when the flush is done.
*
* In case of a new not flushed vote it means either this node voted for some
- * other node, and must be a follower; or it voted or self, and also must be a
+ * other node, and must be a follower; or it voted for self, and also must be a
* follower, but will become a candidate when the flush is done.
*
* In total - when something is not synced with disk, the instance is a follower
@@ -96,14 +96,14 @@ raft_is_fully_on_disk(void)
* Raft protocol says that election timeout should be a bit randomized so as
* the nodes wouldn't start election at the same time and end up with not having
* a quorum for anybody. This implementation randomizes the election timeout by
- * adding {election timeout * random factor} value, where the factor is a
- * constant floating point value > 0.
+ * adding {election timeout * random factor} value, where max value of the
+ * factor is a constant floating point value > 0.
*/
static inline double
raft_new_random_election_shift(void)
{
double timeout = raft.election_timeout;
- /* Translate to ms. */
+ /* Translate to ms. Integer is needed to be able to use mod below. */
uint32_t rand_part =
(uint32_t)(timeout * RAFT_RANDOM_ELECTION_FACTOR * 1000);
if (rand_part == 0)
@@ -134,6 +134,49 @@ raft_can_vote_for(const struct vclock *v)
return cmp == 0 || cmp == 1;
}
+/**
+ * Election quorum is not strictly equal to synchronous replication quorum.
+ * Sometimes it can be lowered. That is about bootstrap.
+ *
+ * The problem with bootstrap is that when the replicaset boots, all the
+ * instances can't write to WAL and can't recover from their initial snapshot.
+ * They need one node which will boot first, and then they will replicate from
+ * it.
+ *
+ * This one node should boot from its zero snapshot, create replicaset UUID,
+ * register self with ID 1 in _cluster space, and then register all the other
+ * instances here. To do that the node must be writable. It should have
+ * read_only = false, connection quorum satisfied, and be a Raft leader if Raft
+ * is enabled.
+ *
+ * To be elected a Raft leader it needs to perform election. But it can't be
+ * done before at least synchronous quorum of the replicas is bootstrapped. And
+ * they can't be bootstrapped because they wait for a leader to initialize
+ * _cluster. Cyclic dependency.
+ *
+ * This is resolved by truncation of the election quorum to the number of
+ * registered replicas, if their count is less than synchronous quorum. That
+ * helps to elect a first leader.
+ *
+ * It may seem that the first node could just declare itself a leader and then
+ * strictly follow the protocol from now on, but that won't work, because if the
+ * first node restarts after it is booted, but before a quorum of replicas is
+ * booted, the cluster will get stuck again.
+ *
+ * The current solution is totally safe because
+ *
+ * - after all the cluster will have node count >= quorum, if user used a
+ * correct config (God help him if he didn't);
+ *
+ * - synchronous replication quorum is untouched - it is not truncated. Only
+ * leader election quorum is affected. So synchronous data won't be lost.
+ */
+static inline int
+raft_election_quorum(void)
+{
+ return MIN(replication_synchro_quorum, replicaset.size);
+}
+
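==================================================
Reviewer-style sketch, not part of the patch: the quorum truncation described
in the comment above can be illustrated in isolation. The names
election_quorum() and has_won() and their plain int parameters are hypothetical
stand-ins for replication_synchro_quorum, replicaset.size and raft.vote_count.

```c
/*
 * Hypothetical stand-in for raft_election_quorum(): the election quorum is
 * the configured synchronous quorum, truncated to the number of registered
 * replicas.
 */
static int
election_quorum(int synchro_quorum, int registered_count)
{
	return synchro_quorum < registered_count ?
	       synchro_quorum : registered_count;
}

/* A candidate wins once its collected votes reach the election quorum. */
static int
has_won(int vote_count, int synchro_quorum, int registered_count)
{
	return vote_count >= election_quorum(synchro_quorum, registered_count);
}
```

With a configured quorum of 2 and only one registered instance, the single
self-vote is enough, which is what lets the first node become a leader during
bootstrap. Once 3 instances are registered, the same single vote no longer
wins.
==================================================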
/** Broadcast an event about this node changed its state to all relays. */
static inline void
raft_broadcast_new_state(void)
@@ -157,9 +200,9 @@ static void
raft_sm_start(void);
/**
- * Stop the state machine.
+ * Stop the state machine. Now until Raft is re-enabled,
* - Raft stops affecting the instance operation;
- * - this node can't become a leader anymore;
+ * - this node can't become a leader;
* - this node can't vote.
*/
static void
@@ -188,7 +231,10 @@ raft_sm_schedule_new_term(uint64_t new_term);
static void
raft_sm_schedule_new_vote(uint32_t new_vote);
-/** Bump volatile term, vote for self, and schedule their flush to disk. */
+/**
+ * Bump term and vote for self immediately. After that is persisted, the
+ * election timeout will be activated, unless something newer happens meanwhile.
+ */
static void
raft_sm_schedule_new_election(void);
@@ -200,6 +246,17 @@ static void
raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
int events);
+/** Start Raft state flush to disk. */
+static void
+raft_sm_pause_and_dump(void);
+
+/**
+ * Flush Raft state changes to WAL. The callback resets itself, if during the
+ * write more changes appear.
+ */
+static void
+raft_sm_dump_step(struct ev_loop *loop, struct ev_check *watcher, int events);
==================================================
I declared these functions in advance even though it wasn't necessary, to keep
all raft_sm_* methods declared in one place.
==================================================
+
void
raft_process_recovery(const struct raft_request *req)
{
@@ -261,6 +318,14 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
/* Can't vote for too old or incomparable nodes. */
if (!raft_can_vote_for(req->vclock))
break;
+ /*
+ * Check if somebody is asking to vote for a third
+ * node - nope. Make votes only when asked directly by
+ * the new candidate. However that restriction may be
+ * relaxed in future, if it can be proven to be safe.
+ */
+ if (req->vote != source)
+ break;
==================================================
I decided not to modify the original Raft here, and not to spread a vote wave.
A node can vote only for candidates asking for a vote directly, not via a third
node.
==================================================
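==================================================
Reviewer-style sketch, not part of the patch: the direct-request rule can be
shown as a standalone predicate. struct vote_request and
is_direct_vote_request() are hypothetical names. Treating vote == 0 as "no
vote requested" is an assumption of this sketch, not something the hunk shows.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical reduced request: only the fields the check needs. */
struct vote_request {
	uint64_t term;
	/* ID of the instance the sender votes for; 0 is assumed to mean none. */
	uint32_t vote;
};

/*
 * True only when the sender asks for a vote for itself. A request relayed
 * on behalf of a third node (vote != source) is ignored.
 */
static bool
is_direct_vote_request(const struct vote_request *req, uint32_t source)
{
	return req->vote != 0 && req->vote == source;
}
```

For example, a message from instance 2 carrying vote = 2 qualifies, while a
message from instance 3 carrying vote = 2 does not.
==================================================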
/*
* Either the term is new, or didn't vote in the current
* term yet. Anyway can vote now.
@@ -279,7 +344,7 @@ raft_process_msg(const struct raft_request *req, uint32_t source)
assert(raft.volatile_vote == instance_id);
bool was_set = bit_set(&raft.vote_mask, source);
raft.vote_count += !was_set;
- if (raft.vote_count < replication_synchro_quorum)
+ if (raft.vote_count < raft_election_quorum())
break;
raft.state = RAFT_STATE_LEADER;
raft.leader = instance_id;
@@ -330,8 +395,8 @@ void
raft_process_heartbeat(uint32_t source)
{
/*
- * When not a candidate - don't wait for anything. Therefore does not
- * care about the leader being dead.
+ * When not a candidate - don't wait for anything. Therefore do not care
+ * about the leader being dead.
*/
if (!raft.is_candidate)
return;
@@ -352,23 +417,6 @@ raft_process_heartbeat(uint32_t source)
raft_sm_wait_leader_dead();
}
-void
-raft_serialize(struct raft_request *req, struct vclock *vclock)
-{
- memset(req, 0, sizeof(*req));
- /*
- * Volatile state is never used for any communications.
- * Use only persisted state.
- */
- req->term = raft.term;
- req->vote = raft.vote;
- req->state = raft.state;
- /*
- * Raft does not own vclock, so it always expects it passed externally.
- */
- req->vclock = vclock;
-}
-
/** Wakeup Raft state writer fiber waiting for WAL write end. */
static void
raft_write_cb(struct journal_entry *entry)
@@ -386,6 +434,13 @@ raft_write_request(const struct raft_request *req)
* be sent to network when vote for self.
*/
assert(req->vclock == NULL);
+ /*
+ * State is not persisted. Persisting it would be strictly against the
+ * Raft protocol, and it would not make much sense anyway: even if the
+ * node is a leader now, another leader will likely be elected by the
+ * time the node is restarted.
+ */
+ assert(req->state == 0);
struct region *region = &fiber()->gc;
uint32_t svp = region_used(region);
struct xrow_header row;
@@ -417,12 +472,8 @@ fail:
panic("Could not write a raft request to WAL\n");
}
-/**
- * Flush Raft state changes to WAL. The callback resets itself, if during the
- * write more changes appear.
- */
static void
-raft_sm_dump_step(ev_loop *loop, ev_check *watcher, int events)
+raft_sm_dump_step(struct ev_loop *loop, struct ev_check *watcher, int events)
{
assert(watcher == &raft.io);
(void) events;
@@ -453,7 +504,7 @@ end_dump:
raft_sm_wait_leader_dead();
} else if (raft.vote == instance_id) {
/* Just wrote own vote. */
- if (replication_synchro_quorum == 1) {
+ if (raft_election_quorum() == 1) {
raft.state = RAFT_STATE_LEADER;
raft.leader = instance_id;
/*
@@ -463,8 +514,10 @@ end_dump:
box_update_ro_summary();
} else {
raft.state = RAFT_STATE_CANDIDATE;
+ /* First vote for self. */
raft.vote_count = 1;
raft.vote_mask = 0;
+ bit_set(&raft.vote_mask, instance_id);
raft_sm_wait_election_end();
}
} else if (raft.vote != 0) {
@@ -530,7 +583,6 @@ end_dump:
raft_broadcast(&req);
}
-/** Start Raft state flush to disk. */
static void
raft_sm_pause_and_dump(void)
{
@@ -542,7 +594,6 @@ raft_sm_pause_and_dump(void)
raft.is_write_in_progress = true;
}
-/** Bump term, reset Raft state, and persist that fact. */
static void
raft_sm_schedule_new_term(uint64_t new_term)
{
@@ -556,7 +607,6 @@ raft_sm_schedule_new_term(uint64_t new_term)
raft_sm_pause_and_dump();
}
-/** Vote in the current term, and persist that fact. */
static void
raft_sm_schedule_new_vote(uint32_t new_vote)
{
@@ -565,10 +615,6 @@ raft_sm_schedule_new_vote(uint32_t new_vote)
raft_sm_pause_and_dump();
}
-/**
- * Bump term and vote for self immediately. After that is persisted, the
- * election timeout will be activated. Unless during that nothing newer happens.
- */
static void
raft_sm_schedule_new_election(void)
{
@@ -581,21 +627,6 @@ raft_sm_schedule_new_election(void)
box_update_ro_summary();
}
-void
-raft_new_term(uint64_t min_new_term)
-{
- uint64_t new_term;
- if (raft.term < min_new_term)
- new_term = min_new_term + 1;
- else
- new_term = raft.term + 1;
- enum raft_state old_state = raft.state;
- raft_sm_schedule_new_term(new_term);
- if (raft.state != old_state)
- raft_broadcast_new_state();
- box_update_ro_summary();
-}
-
static void
raft_sm_schedule_new_election_cb(struct ev_loop *loop, struct ev_timer *timer,
int events)
@@ -657,7 +688,7 @@ raft_sm_start(void)
* it wasn't sent in disabled state.
*/
struct raft_request req;
- raft_serialize(&req, NULL);
+ raft_serialize_for_network(&req, NULL);
raft_broadcast(&req);
}
@@ -670,6 +701,31 @@ raft_sm_stop(void)
box_update_ro_summary();
}
+void
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock)
+{
+ memset(req, 0, sizeof(*req));
+ /*
+ * Volatile state is never used for any communications.
+ * Use only persisted state.
+ */
+ req->term = raft.term;
+ req->vote = raft.vote;
+ req->state = raft.state;
+ /*
+ * Raft does not own vclock, so it always expects it passed externally.
+ */
+ req->vclock = vclock;
+}
+
+void
+raft_serialize_for_disk(struct raft_request *req)
+{
+ memset(req, 0, sizeof(*req));
+ req->term = raft.term;
+ req->vote = raft.vote;
+}
+
void
raft_cfg_is_enabled(bool is_enabled)
{
@@ -694,9 +750,8 @@ raft_cfg_is_candidate(bool is_candidate)
if (raft.is_candidate) {
assert(raft.state == RAFT_STATE_FOLLOWER);
/*
- * If there is an on-going WAL write, it means
- * there was some node who sent newer data to this
- * node.
+ * If there is an on-going WAL write, it means there was some
+ * node who sent newer data to this node.
*/
if (raft.leader == 0 && raft_is_fully_on_disk())
raft_sm_schedule_new_election();
@@ -729,7 +784,7 @@ raft_cfg_election_quorum(void)
if (raft.state != RAFT_STATE_CANDIDATE ||
raft.state == RAFT_STATE_LEADER)
return;
- if (raft.vote_count < replication_synchro_quorum)
+ if (raft.vote_count < raft_election_quorum())
return;
/*
* The node is a candidate. It means its state if fully synced with
@@ -767,18 +822,6 @@ raft_broadcast(const struct raft_request *req)
}
}
-void
-raft_bootstrap_leader(void)
-{
- assert(raft.is_enabled);
- assert(raft.volatile_term == 0);
- assert(raft.volatile_vote == 0);
- assert(raft.state == RAFT_STATE_FOLLOWER);
- raft.state = RAFT_STATE_LEADER;
- raft_broadcast_new_state();
- box_update_ro_summary();
-}
-
void
raft_init(void)
{
diff --git a/src/box/raft.h b/src/box/raft.h
index 57584bc1b..111a9c16e 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -37,12 +37,51 @@
extern "C" {
#endif
+/**
+ * This is an implementation of Raft leader election protocol, separated from
+ * synchronous replication part.
+ *
+ * The protocol describes an algorithm which helps to elect a single leader in
+ * the cluster, which is supposed to handle write requests. And re-elect a new
+ * leader, when the current leader dies.
+ *
+ * The implementation follows the protocol to the letter except a few important
+ * details.
+ *
+ * Firstly, the original Raft assumes, that all nodes share the same log record
+ * numbers. In Tarantool they are called LSNs. But in case of Tarantool each
+ * node has its own LSN in its own component of vclock. That makes the election
+ * messages a bit heavier, because the nodes need to send and compare complete
+ * vclocks of each other instead of a single number like in the original Raft.
+ * But logic becomes simpler. Because in the original Raft there is a problem of
+ * uncertainty about what to do with records of an old leader right after a new
+ * leader is elected. They could be rolled back or confirmed depending on
+ * circumstances. The issue disappears when vclock is used.
+ *
+ * Secondly, leader election works differently during cluster bootstrap, until
+ * number of bootstrapped replicas becomes >= election quorum. That arises from
+ * specifics of replicas bootstrap and order of systems initialization. In
+ * short: during bootstrap a leader election may use a smaller election quorum
+ * than the configured one. See more details in the code.
+ */
==================================================
It feels like the comment is still not detailed enough. It looks suspiciously
small for such a big feature. It would be nice if anyone could add anything. I
will add more info later, if something comes to mind.
==================================================
+
struct raft_request;
struct vclock;
enum raft_state {
+ /**
+ * Can't write. Can only accept data from a leader. Node in this state
+ * either monitors an existing leader, or there is an on-going election
+ * and the node voted for another node, or it can't be a candidate and
+ * does not do anything.
+ */
RAFT_STATE_FOLLOWER = 1,
+ /**
+ * The node can't write. There is an active election, in which the node
+ * voted for self. Now it waits for election outcome.
+ */
RAFT_STATE_CANDIDATE = 2,
+ /** Election was successful. The node accepts write requests. */
RAFT_STATE_LEADER = 3,
};
@@ -54,31 +93,27 @@ struct raft {
/** State of the instance. */
enum raft_state state;
/**
- * Volatile part of the Raft state, whose WAL write may be
- * still in-progress, and yet the state may be already
- * used. Volatile state is never sent to anywhere, but the
- * state machine makes decisions based on it. That is
- * vital.
- * As an example, volatile vote needs to be used to reject
- * votes inside a term, where the instance already voted
- * (even if the vote WAL write is not finished yet).
- * Otherwise the instance would try to write several votes
- * inside one term.
+ * Volatile part of the Raft state, whose WAL write may be still
+ * in-progress, and yet the state may be already used. Volatile state is
+ * never sent to anywhere, but the state machine makes decisions based
+ * on it. That is vital.
+ * As an example, volatile vote needs to be used to reject votes inside
+ * a term, where the instance already voted (even if the vote WAL write
+ * is not finished yet). Otherwise the instance would try to write
+ * several votes inside one term.
*/
uint64_t volatile_term;
uint32_t volatile_vote;
/**
- * Flag whether Raft is enabled. When disabled, it still
- * persists terms so as to quickly enroll into the cluster
- * when (if) it is enabled. In everything else disabled
- * Raft does not affect instance work.
+ * Flag whether Raft is enabled. When disabled, it still persists terms
+ * so as to quickly enroll into the cluster when (if) it is enabled. In
+ * everything else disabled Raft does not affect instance work.
*/
bool is_enabled;
/**
- * Flag whether the node can become a leader. It is an
- * accumulated value of configuration options Raft enabled
- * Raft candidate. If at least one is false - the instance
- * is not a candidate.
+ * Flag whether the node can become a leader. It is an accumulated value
+ * of configuration options Raft enabled and Raft candidate. If at least
+ * one is false - the instance is not a candidate.
*/
bool is_candidate;
/** Flag whether the instance is allowed to be a leader. */
@@ -94,7 +129,10 @@ struct raft {
*/
uint64_t term;
uint32_t vote;
- /** Bit 1 means that a vote from that instance was obtained. */
+ /**
+ * Bit 1 on position N means that a vote from instance with ID = N was
+ * obtained.
+ */
uint32_t vote_mask;
/** Number of votes for this instance. Valid only in candidate state. */
int vote_count;
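==================================================
Reviewer-style sketch, not part of the patch: how vote_mask and vote_count can
work together so that a repeated vote from the same instance is counted once.
struct vote_tally and the tally_*() helpers are hypothetical. The sketch
assumes instance IDs are below 32 so that they fit into a uint32_t mask.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical reduced copy of the vote accounting fields. */
struct vote_tally {
	/* Bit N is set when a vote from the instance with ID = N was seen. */
	uint32_t vote_mask;
	int vote_count;
};

/* Start a new election: the first vote is the vote for self. */
static void
tally_start(struct vote_tally *t, uint32_t self_id)
{
	assert(self_id < 32);
	t->vote_mask = UINT32_C(1) << self_id;
	t->vote_count = 1;
}

/* Account a vote from 'source'; duplicates do not change the count. */
static void
tally_add(struct vote_tally *t, uint32_t source)
{
	assert(source < 32);
	uint32_t bit = UINT32_C(1) << source;
	if ((t->vote_mask & bit) == 0) {
		t->vote_mask |= bit;
		t->vote_count++;
	}
}
```

Setting the own bit at the start mirrors the bit_set() call added to
raft_sm_dump_step() in this patch, so a later message from self can't be
counted as a second vote.
==================================================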
@@ -110,24 +148,25 @@ struct raft {
extern struct raft raft;
-void
-raft_new_term(uint64_t min_new_term);
-
-void
-raft_vote(uint32_t vote_for);
-
+/**
+ * A flag whether the instance is read-only according to Raft. Even if Raft
+ * allows writes though, it does not mean the instance is writable. It can be
+ * affected by box.cfg.read_only and connection quorum.
+ */
static inline bool
raft_is_ro(void)
{
return raft.is_enabled && raft.state != RAFT_STATE_LEADER;
}
+/** See if the instance can accept rows from an instance with the given ID. */
static inline bool
raft_is_source_allowed(uint32_t source_id)
{
return !raft.is_enabled || raft.leader == source_id;
}
+/** Check if Raft is enabled. */
static inline bool
raft_is_enabled(void)
{
@@ -146,59 +185,66 @@ raft_process_recovery(const struct raft_request *req);
void
raft_process_msg(const struct raft_request *req, uint32_t source);
+/**
+ * Process a heartbeat message from an instance with the given ID. It is used to
+ * watch leader's health and start election when necessary.
+ */
void
raft_process_heartbeat(uint32_t source);
-/**
- * Broadcast the changes in this instance's raft status to all
- * the followers.
- */
+/** Configure whether Raft is enabled. */
void
raft_cfg_is_enabled(bool is_enabled);
+/**
+ * Configure whether the instance can be elected as Raft leader. Even if false,
+ * the node still can vote, when Raft is enabled.
+ */
void
raft_cfg_is_candidate(bool is_candidate);
+/** Configure Raft leader election timeout. */
void
raft_cfg_election_timeout(double timeout);
+/**
+ * Configure Raft leader election quorum. There is no separate option.
+ * Instead, synchronous replication quorum is used, since Raft is tightly bound
+ * with synchronous replication.
+ */
void
raft_cfg_election_quorum(void);
+/**
+ * Configure Raft leader death timeout. I.e. number of seconds without
+ * heartbeats from the leader to consider it dead. There is no separate
+ * option. Raft uses replication timeout for that.
+ */
void
raft_cfg_death_timeout(void);
-/** Save complete Raft state into the request. */
+/**
+ * Save complete Raft state into a request to be sent to other instances of the
+ * cluster. It is allowed to save anything here, not only persistent state.
+ */
void
-raft_serialize(struct raft_request *req, struct vclock *vclock);
+raft_serialize_for_network(struct raft_request *req, struct vclock *vclock);
==================================================
Serialization was split into 2 calls, for disk and for network, because for
disk the state and vclock are never needed. It looked ugly to pass a NULL
vclock and manually nullify the request's state after a serialize() call.
==================================================
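==================================================
Reviewer-style sketch, not part of the patch: the difference between the two
serializers on reduced stand-in types. struct sketch_raft, struct
sketch_request and both functions are hypothetical. The real functions read
the global raft object and use struct raft_request and struct vclock.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical reduced persistent part of the Raft state. */
struct sketch_raft {
	uint64_t term;
	uint32_t vote;
	uint32_t state;
};

/* Hypothetical reduced request; vclock is an opaque pointer here. */
struct sketch_request {
	uint64_t term;
	uint32_t vote;
	uint32_t state;
	const void *vclock;
};

/* Network form: term, vote, state, and an externally owned vclock. */
static void
serialize_for_network(const struct sketch_raft *raft,
		      struct sketch_request *req, const void *vclock)
{
	memset(req, 0, sizeof(*req));
	req->term = raft->term;
	req->vote = raft->vote;
	req->state = raft->state;
	req->vclock = vclock;
}

/* Disk form: only term and vote; state and vclock stay zeroed. */
static void
serialize_for_disk(const struct sketch_raft *raft, struct sketch_request *req)
{
	memset(req, 0, sizeof(*req));
	req->term = raft->term;
	req->vote = raft->vote;
}
```

The disk form leaves state at 0 and vclock at NULL, which is what the
assertions in raft_write_request() expect.
==================================================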
/**
- * Broadcast the changes in this instance's raft status to all
- * the followers.
+ * Save complete Raft state into a request to be persisted on disk. Only term
+ * and vote are being persisted.
*/
void
-raft_broadcast(const struct raft_request *req);
+raft_serialize_for_disk(struct raft_request *req);
/**
- * Bootstrap the current instance as the first leader of the cluster. That is
- * done bypassing the Raft election protocol, by just assigning this node a
- * leader role. That is needed, because when the cluster is not bootstrapped, it
- * is necessary to find a node, which will generate a replicaset UUID, write it
- * into _cluster space, and register all the other nodes in _cluster.
- * Until it is done, all nodes but one won't boot. Their WALs won't work. And
- * therefore they won't be able to participate in leader election. That
- * effectively makes the cluster dead from the beginning unless the first
- * bootstrapped node won't declare itself a leader without elections.
- *
- * XXX: That does not solve the problem, when the first node boots, creates a
- * snapshot, and then immediately dies. After recovery it won't declare itself a
- * leader. Therefore if quorum > 1, the cluster won't proceed to registering
- * any replicas and becomes completely dead. Perhaps that must be solved by
- * truncating quorum down to number of records in _cluster.
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
*/
void
-raft_bootstrap_leader(void);
+raft_broadcast(const struct raft_request *req);
+/** Initialize Raft global data structures. */
void
raft_init(void);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index ef0e2411d..20f16206a 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
tt_uuid_str(&replica->uuid));
}
replicaset.replica_by_id[replica_id] = replica;
+ ++replicaset.size;
say_info("assigned id %d to replica %s",
replica->id, tt_uuid_str(&replica->uuid));
@@ -267,6 +268,8 @@ replica_clear_id(struct replica *replica)
* replication.
*/
replicaset.replica_by_id[replica->id] = NULL;
+ assert(replicaset.size > 0);
+ --replicaset.size;
if (replica->id == instance_id) {
/* See replica_check_id(). */
assert(replicaset.is_joining);
diff --git a/src/box/replication.h b/src/box/replication.h
index ddc2bddf4..69cc820c9 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -217,6 +217,13 @@ struct replicaset {
bool is_joining;
/* A number of anonymous replicas following this instance. */
int anon_count;
+ /**
+ * Number of registered replicas. That includes all of them - connected,
+ * disconnected, connected not directly, just present in _cluster. If an
+ * instance has an ID, has the same replicaset UUID, then it is
+ * accounted here.
+ */
+ int size;
/** Applier state. */
struct {
/**
diff --git a/test/box/info.result b/test/box/info.result
index 40eeae069..d0abb634a 100644
--- a/test/box/info.result
+++ b/test/box/info.result
@@ -82,6 +82,7 @@ t
- memory
- package
- pid
+ - raft
- replication
- replication_anon
- ro