[Tarantool-patches] [PATCH v2 06/11] raft: introduce persistent raft state
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Sep 10 02:16:57 MSK 2020
The patch introduces a skeleton of the Raft module and a method to
persist the Raft state in a snapshot, not bound to any space.
Part of #1146
---
src/box/CMakeLists.txt | 1 +
src/box/box.cc | 8 +++++
src/box/iproto_constants.h | 13 ++++++++
src/box/lua/misc.cc | 1 +
src/box/memtx_engine.c | 35 ++++++++++++++++++++
src/box/raft.c | 65 ++++++++++++++++++++++++++++++++++++
src/box/raft.h | 67 ++++++++++++++++++++++++++++++++++++++
src/box/xrow.c | 56 +++++++++++++++++++++++++++++++
src/box/xrow.h | 12 +++++++
9 files changed, 258 insertions(+)
create mode 100644 src/box/raft.c
create mode 100644 src/box/raft.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index b8b2689d2..29c3bfe79 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -170,6 +170,7 @@ add_library(box STATIC
port.c
txn.c
txn_limbo.c
+ raft.c
box.cc
gc.c
checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index 3214ec340..1a19058b3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
#include "sequence.h"
#include "sql_stmt_cache.h"
#include "msgpack.h"
+#include "raft.h"
#include "trivia/util.h"
static char status[64] = "unknown";
@@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
diag_raise();
return;
}
+ if (iproto_type_is_raft_request(row->type)) {
+ struct raft_request raft_req;
+ if (xrow_decode_raft(row, &raft_req) != 0)
+ diag_raise();
+ raft_process_recovery(&raft_req);
+ return;
+ }
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 4f5a2b195..8a11626b3 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,8 @@ enum iproto_type {
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
+ IPROTO_RAFT = 30,
+
/** A confirmation message for synchronous transactions. */
IPROTO_CONFIRM = 40,
/** A rollback message for synchronous transactions. */
@@ -258,6 +260,11 @@ enum iproto_type {
/** IPROTO type name by code */
extern const char *iproto_type_strs[];
+/** Keys of the body map of an IPROTO_RAFT request. */
+enum iproto_raft_keys {
+ /** Key of the term, encoded as an unsigned integer. */
+ IPROTO_RAFT_TERM = 0,
+ /** Key of the vote, encoded as an unsigned integer. */
+ IPROTO_RAFT_VOTE = 1,
+};
+
/**
* Returns IPROTO type name by @a type code.
* @param type IPROTO type.
@@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
}
+/** Check whether @a type is the typecode of a Raft request. */
+static inline bool
+iproto_type_is_raft_request(uint32_t type)
+{
+	const bool is_raft = (type == IPROTO_RAFT);
+	return is_raft;
+}
+
/** This is an error. */
static inline bool
iproto_type_is_error(uint32_t type)
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 5da84b35a..e356f2d4b 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,6 +40,7 @@
#include "box/tuple.h"
#include "box/tuple_format.h"
#include "box/lua/tuple.h"
+#include "box/xrow.h"
#include "mpstream/mpstream.h"
static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 9f079a6b5..5ab2cf266 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
+#include "raft.h"
/* sync snapshot every 16MB */
#define SNAP_SYNC_INTERVAL (1 << 24)
@@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
return 0;
}
+/**
+ * Recover the Raft state from a snapshot row: decode the row into a Raft
+ * request and pass it to the Raft module.
+ * @param row Snapshot row of IPROTO_RAFT type.
+ * @retval 0 Success.
+ * @retval -1 The row could not be decoded.
+ */
+static int
+memtx_engine_recover_raft(const struct xrow_header *row)
+{
+	struct raft_request request;
+	assert(row->type == IPROTO_RAFT);
+	if (xrow_decode_raft(row, &request) == 0) {
+		raft_process_recovery(&request);
+		return 0;
+	}
+	return -1;
+}
+
static int
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct xrow_header *row)
{
assert(row->bodycnt == 1); /* always 1 for read */
if (row->type != IPROTO_INSERT) {
+ if (row->type == IPROTO_RAFT)
+ return memtx_engine_recover_raft(row);
diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row->type);
return -1;
@@ -477,6 +491,7 @@ struct checkpoint {
/** The vclock of the snapshot file. */
struct vclock vclock;
struct xdir dir;
+ struct raft_request raft;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
+ raft_serialize_for_disk(&ckpt->raft);
ckpt->touch = false;
return ckpt;
}
@@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
return 0;
};
+/**
+ * Write the Raft state into a snapshot file as a single row.
+ * The row body is encoded on the fiber region; the region is truncated back
+ * to its initial size before return, on both success and failure.
+ * @param l Snapshot xlog to write into.
+ * @param req Raft state to write.
+ * @retval 0 Success.
+ * @retval -1 The request could not be encoded or the row was not written.
+ */
+static int
+checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
+{
+	struct xrow_header row;
+	struct region *region = &fiber()->gc;
+	/*
+	 * region_used() returns size_t. Keep the savepoint in the same type
+	 * so that it is not narrowed before region_truncate().
+	 */
+	size_t svp = region_used(region);
+	int rc = -1;
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto finish;
+	if (checkpoint_write_row(l, &row) != 0)
+		goto finish;
+	rc = 0;
+finish:
+	region_truncate(region, svp);
+	return rc;
+}
+
static int
checkpoint_f(va_list ap)
{
@@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
if (rc != 0)
goto fail;
}
+ if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
+ goto fail;
if (xlog_flush(&snap) < 0)
goto fail;
diff --git a/src/box/raft.c b/src/box/raft.c
new file mode 100644
index 000000000..511fe42f5
--- /dev/null
+++ b/src/box/raft.c
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "raft.h"
+
+#include "error.h"
+#include "journal.h"
+#include "xrow.h"
+#include "small/region.h"
+
+/**
+ * Raft state of this instance. The term starts from 1 and the vote from 0.
+ * raft_process_recovery() overwrites them with the non-zero values found in
+ * WAL/snapshot.
+ */
+struct raft raft = {
+ .term = 1,
+ .vote = 0,
+};
+
+/**
+ * Apply a Raft request recovered from WAL/snapshot to the state of this
+ * instance. A zero term or vote in the request is skipped and the current
+ * value is kept.
+ */
+void
+raft_process_recovery(const struct raft_request *req)
+{
+	const uint64_t new_term = req->term;
+	const uint32_t new_vote = req->vote;
+	if (new_vote != 0)
+		raft.vote = new_vote;
+	if (new_term != 0)
+		raft.term = new_term;
+}
+
+/** Copy the term and the vote of this instance into @a req. */
+void
+raft_serialize_for_network(struct raft_request *req)
+{
+	const struct raft *state = &raft;
+	req->vote = state->vote;
+	req->term = state->term;
+}
+
+/** Fill @a req with the persistent state of this instance: term and vote. */
+void
+raft_serialize_for_disk(struct raft_request *req)
+{
+	const struct raft_request persistent = {
+		.term = raft.term,
+		.vote = raft.vote,
+	};
+	*req = persistent;
+}
diff --git a/src/box/raft.h b/src/box/raft.h
new file mode 100644
index 000000000..31f7becdb
--- /dev/null
+++ b/src/box/raft.h
@@ -0,0 +1,67 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct raft_request;
+
+/** Raft state of an instance: the term and the vote. */
+struct raft {
+ /** Term number. Saved into Raft requests and restored from them. */
+ uint64_t term;
+ /**
+  * Vote made by the instance.
+  * NOTE(review): presumably an instance ID, with 0 meaning "no vote" -
+  * confirm.
+  */
+ uint32_t vote;
+};
+
+/** Raft state of this instance. The object is defined in raft.c. */
+extern struct raft raft;
+
+/**
+ * Process a raft entry stored in WAL/snapshot. Non-zero term and vote of the
+ * request replace the current ones; zero fields are ignored.
+ */
+void
+raft_process_recovery(const struct raft_request *req);
+
+/**
+ * Save complete Raft state into a request to be sent to other instances of the
+ * cluster. It is allowed to save anything here, not only persistent state.
+ * @param[out] req Request to fill.
+ */
+void
+raft_serialize_for_network(struct raft_request *req);
+
+/**
+ * Save the persistent Raft state into a request to be written to disk. Only
+ * the term and the vote are persisted.
+ * @param[out] req Request to fill.
+ */
+void
+raft_serialize_for_disk(struct raft_request *req);
+
+#if defined(__cplusplus)
+}
+#endif
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 95ddb1fe7..1923bacfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
return 0;
}
+/**
+ * Encode a Raft request into a local row of IPROTO_RAFT type.
+ * The body is a MessagePack map of two pairs, term and vote, allocated on
+ * @a region.
+ * @param[out] row Row to fill. It is fully overwritten.
+ * @param region Region to allocate the body on.
+ * @param r Request to encode.
+ * @retval 0 Success.
+ * @retval -1 Memory error, diag is set.
+ */
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+		 const struct raft_request *r)
+{
+	size_t body_size = mp_sizeof_map(2);
+	body_size += mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		     mp_sizeof_uint(r->term);
+	body_size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+		     mp_sizeof_uint(r->vote);
+	char *body = region_alloc(region, body_size);
+	if (body == NULL) {
+		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
+		return -1;
+	}
+	/* Fill the body first, then describe it in the row header. */
+	char *pos = mp_encode_map(body, 2);
+	pos = mp_encode_uint(pos, IPROTO_RAFT_TERM);
+	pos = mp_encode_uint(pos, r->term);
+	pos = mp_encode_uint(pos, IPROTO_RAFT_VOTE);
+	pos = mp_encode_uint(pos, r->vote);
+	(void)pos;
+
+	memset(row, 0, sizeof(*row));
+	row->type = IPROTO_RAFT;
+	row->group_id = GROUP_LOCAL;
+	row->bodycnt = 1;
+	row->body[0].iov_base = body;
+	row->body[0].iov_len = body_size;
+	return 0;
+}
+
+/**
+ * Decode a Raft request from the body of an IPROTO_RAFT row.
+ * Unknown keys of the body map are skipped. Keys missing in the body leave
+ * the corresponding fields of the request zeroed.
+ * @param row Row to decode.
+ * @param[out] r Decoded request.
+ * @retval 0 Success.
+ * @retval -1 The row is malformed, diag is set.
+ */
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+{
+	assert(row->type == IPROTO_RAFT);
+	memset(r, 0, sizeof(*r));
+	/*
+	 * The row comes from disk, so its shape is checked at runtime too,
+	 * not only in debug builds.
+	 */
+	if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL)
+		goto bad_msgpack;
+	const char *pos = row->body[0].iov_base;
+	const char *end = pos + row->body[0].iov_len;
+	/*
+	 * Validate the whole body up front so that the decoding below can
+	 * not run past its end.
+	 */
+	const char *check = pos;
+	if (pos == end || mp_typeof(*pos) != MP_MAP ||
+	    mp_check(&check, end) != 0)
+		goto bad_msgpack;
+	uint32_t map_size = mp_decode_map(&pos);
+	for (uint32_t i = 0; i < map_size; ++i) {
+		uint64_t value;
+		if (mp_typeof(*pos) != MP_UINT)
+			goto bad_msgpack;
+		uint64_t key = mp_decode_uint(&pos);
+		switch (key) {
+		case IPROTO_RAFT_TERM:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
+			r->term = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VOTE:
+			if (mp_typeof(*pos) != MP_UINT)
+				goto bad_msgpack;
+			value = mp_decode_uint(&pos);
+			/* The vote is 32-bit, do not truncate silently. */
+			if (value > UINT32_MAX)
+				goto bad_msgpack;
+			r->vote = (uint32_t)value;
+			break;
+		default:
+			/* Skip the value of an unknown key. */
+			mp_next(&pos);
+			break;
+		}
+	}
+	return 0;
+bad_msgpack:
+	diag_set(ClientError, ER_INVALID_MSGPACK, "raft body");
+	return -1;
+}
+
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 58d47b12d..c234f6f88 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
+/**
+ * Raft request. It is encoded into and decoded from the body of an
+ * IPROTO_RAFT row.
+ */
+struct raft_request {
+ /** Value stored under the IPROTO_RAFT_TERM key. */
+ uint64_t term;
+ /** Value stored under the IPROTO_RAFT_VOTE key. */
+ uint32_t vote;
+};
+
+/**
+ * Encode a Raft request into a row of IPROTO_RAFT type.
+ * @param[out] row Row to fill.
+ * @param region Region to allocate the row body on.
+ * @param r Request to encode.
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r);
+
+/**
+ * Decode a Raft request from the body of an IPROTO_RAFT row.
+ * @param row Row to decode.
+ * @param[out] r Decoded request. Fields missing in the body are set to 0.
+ * @retval 0 Success. Callers treat any other value as an error.
+ */
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+
/**
* CALL/EVAL request.
*/
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list