Tarantool development patches archive
 help / color / mirror / Atom feed
From: Serge Petrenko <sergepetrenko@tarantool.org>
To: v.shpilevoy@tarantool.org, gorcunov@gmail.com, sergos@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state
Date: Wed, 26 Aug 2020 10:52:33 +0300	[thread overview]
Message-ID: <cff55bc413825bf9237be9e2c01ef308438587aa.1598427905.git.sergepetrenko@tarantool.org> (raw)
In-Reply-To: <cover.1598427905.git.sergepetrenko@tarantool.org>

From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>

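Introduce the persistent part of the Raft state: the term and the
vote. Every change of this state is written to the WAL, and the
state is also saved into the snapshot. Temporary
box.internal.raft_*() helper functions are added to test the
persistence.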
---
 src/box/CMakeLists.txt     |   1 +
 src/box/box.cc             |   8 +++
 src/box/iproto_constants.h |  13 ++++
 src/box/lua/misc.cc        |  35 +++++++++++
 src/box/memtx_engine.c     |  35 +++++++++++
 src/box/raft.c             | 120 +++++++++++++++++++++++++++++++++++++
 src/box/raft.h             |  61 +++++++++++++++++++
 src/box/xrow.c             |  56 +++++++++++++++++
 src/box/xrow.h             |  12 ++++
 9 files changed, 341 insertions(+)
 create mode 100644 src/box/raft.c
 create mode 100644 src/box/raft.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index b8b2689d2..29c3bfe79 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -170,6 +170,7 @@ add_library(box STATIC
     port.c
     txn.c
     txn_limbo.c
+    raft.c
     box.cc
     gc.c
     checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index faffd5769..c0adccc6a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
 #include "sequence.h"
 #include "sql_stmt_cache.h"
 #include "msgpack.h"
+#include "raft.h"
 #include "trivia/util.h"
 
 static char status[64] = "unknown";
@@ -374,6 +375,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
 			diag_raise();
 		return;
 	}
+	if (iproto_type_is_raft_request(row->type)) {
+		struct raft_request raft_req;
+		if (xrow_decode_raft(row, &raft_req) != 0)
+			diag_raise();
+		raft_process(&raft_req);
+		return;
+	}
 	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
 	if (request.type != IPROTO_NOP) {
 		struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 4f5a2b195..8a11626b3 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,8 @@ enum iproto_type {
 	/** The maximum typecode used for box.stat() */
 	IPROTO_TYPE_STAT_MAX,
 
+	IPROTO_RAFT = 30,
+
 	/** A confirmation message for synchronous transactions. */
 	IPROTO_CONFIRM = 40,
 	/** A rollback message for synchronous transactions. */
@@ -258,6 +260,11 @@ enum iproto_type {
 /** IPROTO type name by code */
 extern const char *iproto_type_strs[];
 
+enum iproto_raft_keys {
+	IPROTO_RAFT_TERM = 0,
+	IPROTO_RAFT_VOTE = 1,
+};
+
 /**
  * Returns IPROTO type name by @a type code.
  * @param type IPROTO type.
@@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
 	return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
 }
 
+static inline bool
+iproto_type_is_raft_request(uint32_t type)
+{
+	return type == IPROTO_RAFT;
+}
+
 /** This is an error. */
 static inline bool
 iproto_type_is_error(uint32_t type)
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 5da84b35a..98e98abe2 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,6 +40,8 @@
 #include "box/tuple.h"
 #include "box/tuple_format.h"
 #include "box/lua/tuple.h"
+#include "box/raft.h"
+#include "box/xrow.h"
 #include "mpstream/mpstream.h"
 
 static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
@@ -246,12 +248,45 @@ lbox_tuple_format_new(struct lua_State *L)
 
 /* }}} */
 
+static int
+lbox_raft_new_term(struct lua_State *L)
+{
+	uint64_t min_term = luaL_checkuint64(L, 1);
+	raft_new_term(min_term);
+	return 0;
+}
+
+static int
+lbox_raft_vote(struct lua_State *L)
+{
+	uint64_t vote_for = luaL_checkuint64(L, 1);
+	if (vote_for > UINT32_MAX)
+		return luaL_error(L, "Invalid vote");
+	raft_vote(vote_for);
+	return 0;
+}
+
+static int
+lbox_raft_get(struct lua_State *L)
+{
+	lua_createtable(L, 0, 2);
+	luaL_pushuint64(L, raft.term);
+	lua_setfield(L, -2, "term");
+	luaL_pushuint64(L, raft.vote);
+	lua_setfield(L, -2, "vote");
+	return 1;
+}
+
 void
 box_lua_misc_init(struct lua_State *L)
 {
 	static const struct luaL_Reg boxlib_internal[] = {
 		{"select", lbox_select},
 		{"new_tuple_format", lbox_tuple_format_new},
+		/* Temporary helpers to sanity test raft persistency. */
+		{"raft_new_term", lbox_raft_new_term},
+		{"raft_vote", lbox_raft_vote},
+		{"raft_get", lbox_raft_get},
 		{NULL, NULL}
 	};
 
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index dfd6fce6e..26274de80 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
 #include "replication.h"
 #include "schema.h"
 #include "gc.h"
+#include "raft.h"
 
 /* sync snapshot every 16MB */
 #define SNAP_SYNC_INTERVAL	(1 << 24)
@@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
 	return 0;
 }
 
+static int
+memtx_engine_recover_raft(const struct xrow_header *row)
+{
+	assert(row->type == IPROTO_RAFT);
+	struct raft_request req;
+	if (xrow_decode_raft(row, &req) != 0)
+		return -1;
+	raft_process(&req);
+	return 0;
+}
+
 static int
 memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
 				  struct xrow_header *row)
 {
 	assert(row->bodycnt == 1); /* always 1 for read */
 	if (row->type != IPROTO_INSERT) {
+		if (row->type == IPROTO_RAFT)
+			return memtx_engine_recover_raft(row);
 		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
 			 (uint32_t) row->type);
 		return -1;
@@ -477,6 +491,7 @@ struct checkpoint {
 	/** The vclock of the snapshot file. */
 	struct vclock vclock;
 	struct xdir dir;
+	struct raft_request raft;
 	/**
 	 * Do nothing, just touch the snapshot file - the
 	 * checkpoint already exists.
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
 	opts.free_cache = true;
 	xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
 	vclock_create(&ckpt->vclock);
+	raft_serialize(&ckpt->raft);
 	ckpt->touch = false;
 	return ckpt;
 }
@@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
 	return 0;
 };
 
+static int
+checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
+{
+	struct xrow_header row;
+	struct region *region = &fiber()->gc;
+	uint32_t svp = region_used(region);
+	int rc = -1;
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto finish;
+	if (checkpoint_write_row(l, &row) != 0)
+		goto finish;
+	rc = 0;
+finish:
+	region_truncate(region, svp);
+	return rc;
+}
+
 static int
 checkpoint_f(va_list ap)
 {
@@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
 		if (rc != 0)
 			goto fail;
 	}
+	if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
+		goto fail;
 	if (xlog_flush(&snap) < 0)
 		goto fail;
 
diff --git a/src/box/raft.c b/src/box/raft.c
new file mode 100644
index 000000000..5465f46b6
--- /dev/null
+++ b/src/box/raft.c
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "raft.h"
+
+#include "error.h"
+#include "journal.h"
+#include "xrow.h"
+#include "small/region.h"
+
+/** Raft state of this instance. */
+struct raft raft = {
+	.term = 0,
+	.vote = 0,
+};
+
+void
+raft_process(const struct raft_request *req)
+{
+	if (req->term != 0)
+		raft.term = req->term;
+	if (req->vote != 0)
+		raft.vote = req->vote;
+}
+
+void
+raft_serialize(struct raft_request *req)
+{
+	req->term = raft.term;
+	req->vote = raft.vote;
+}
+
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+	fiber_wakeup(entry->complete_data);
+}
+
+static void
+raft_write_request(const struct raft_request *req)
+{
+	struct region *region = &fiber()->gc;
+	uint32_t svp = region_used(region);
+	struct xrow_header row;
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
+
+	if (xrow_encode_raft(&row, region, req) != 0)
+		goto fail;
+	journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+			     fiber());
+
+	if (journal_write(entry) != 0 || entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		goto fail;
+	}
+	region_truncate(region, svp);
+	return;
+fail:
+	/*
+	 * XXX: temporary stub. Replace the panic with real error handling
+	 * once it is decided what to do when a raft request WAL write fails.
+	 */
+	panic("Could not write a raft request to WAL\n");
+}
+
+void
+raft_new_term(uint64_t min_new_term)
+{
+	if (raft.term < min_new_term)
+		raft.term = min_new_term + 1;
+	else
+		++raft.term;
+
+	struct raft_request req;
+	memset(&req, 0, sizeof(req));
+	req.term = raft.term;
+	raft_write_request(&req);
+}
+
+void
+raft_vote(uint32_t vote_for)
+{
+	raft.vote = vote_for;
+
+	struct raft_request req;
+	memset(&req, 0, sizeof(req));
+	req.vote = vote_for;
+	raft_write_request(&req);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
new file mode 100644
index 000000000..1f392033d
--- /dev/null
+++ b/src/box/raft.h
@@ -0,0 +1,61 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+struct raft_request;
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct raft {
+	uint64_t term;
+	uint32_t vote;
+};
+
+extern struct raft raft;
+
+void
+raft_new_term(uint64_t min_new_term);
+
+void
+raft_vote(uint32_t vote_for);
+
+void
+raft_process(const struct raft_request *req);
+
+void
+raft_serialize(struct raft_request *req);
+
+#if defined(__cplusplus)
+}
+#endif
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 95ddb1fe7..1923bacfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
 	return 0;
 }
 
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+		 const struct raft_request *r)
+{
+	size_t size = mp_sizeof_map(2) +
+		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
+		      mp_sizeof_uint(r->term) +
+		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+		      mp_sizeof_uint(r->vote);
+	char *buf = region_alloc(region, size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, size, "region_alloc", "buf");
+		return -1;
+	}
+	memset(row, 0, sizeof(*row));
+	row->type = IPROTO_RAFT;
+	row->body[0].iov_base = buf;
+	row->body[0].iov_len = size;
+	row->group_id = GROUP_LOCAL;
+	row->bodycnt = 1;
+	buf = mp_encode_map(buf, 2);
+	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
+	buf = mp_encode_uint(buf, r->term);
+	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+	buf = mp_encode_uint(buf, r->vote);
+	return 0;
+}
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+{
+	/* TODO: handle bad format. */
+	assert(row->type == IPROTO_RAFT);
+	assert(row->bodycnt == 1);
+	assert(row->group_id == GROUP_LOCAL);
+	memset(r, 0, sizeof(*r));
+	const char *pos = row->body[0].iov_base;
+	uint32_t map_size = mp_decode_map(&pos);
+	for (uint32_t i = 0; i < map_size; ++i)
+	{
+		uint64_t key = mp_decode_uint(&pos);
+		switch (key) {
+		case IPROTO_RAFT_TERM:
+			r->term = mp_decode_uint(&pos);
+			break;
+		case IPROTO_RAFT_VOTE:
+			r->vote = mp_decode_uint(&pos);
+			break;
+		default:
+			mp_next(&pos);
+			break;
+		}
+	}
+	return 0;
+}
+
 int
 xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 58d47b12d..c234f6f88 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
 int
 xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
 
+struct raft_request {
+	uint64_t term;
+	uint32_t vote;
+};
+
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+		 const struct raft_request *r);
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+
 /**
  * CALL/EVAL request.
  */
-- 
2.20.1 (Apple Git-117)

  reply	other threads:[~2020-08-26  7:52 UTC|newest]

Thread overview: 13+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-08-26  7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
2020-08-26  7:52 ` Serge Petrenko [this message]
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers Serge Petrenko
2020-08-27 20:36   ` Vladislav Shpilevoy
2020-08-28 10:10     ` Sergey Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 03/10] [tosquash] raft: return raft_request to xrow Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 04/10] [tosquash] raft: introduce IPROTO_RAFT_VCLOCK Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 06/10] [tosquash] raft: don't fill raft_request manually Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 07/10] [tosquash] raft: rename curr_leader to leader Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 08/10] [tosquash] raft: rename raft_process to raft_process_recovery Serge Petrenko
2020-08-26  7:52 ` [Tarantool-patches] [RAFT 09/10] [tosquash] applier: handler error at raft row appliance Serge Petrenko
2020-08-26  7:53 ` [Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay Serge Petrenko

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=cff55bc413825bf9237be9e2c01ef308438587aa.1598427905.git.sergepetrenko@tarantool.org \
    --to=sergepetrenko@tarantool.org \
    --cc=gorcunov@gmail.com \
    --cc=sergos@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox