[Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state

Serge Petrenko sergepetrenko at tarantool.org
Fri Sep 4 11:59:25 MSK 2020


04.09.2020 01:51, Vladislav Shpilevoy пишет:
> box.internal.raft_*() helper functions were introduced to test
> persistence. Any state change is saved into the WAL and into
> the snapshot.
>
> Part of #1146
> ---


LGTM.


>   src/box/CMakeLists.txt     |  1 +
>   src/box/box.cc             |  8 +++++
>   src/box/iproto_constants.h | 13 ++++++++
>   src/box/lua/misc.cc        |  1 +
>   src/box/memtx_engine.c     | 35 ++++++++++++++++++++
>   src/box/raft.c             | 65 ++++++++++++++++++++++++++++++++++++
>   src/box/raft.h             | 67 ++++++++++++++++++++++++++++++++++++++
>   src/box/xrow.c             | 56 +++++++++++++++++++++++++++++++
>   src/box/xrow.h             | 12 +++++++
>   9 files changed, 258 insertions(+)
>   create mode 100644 src/box/raft.c
>   create mode 100644 src/box/raft.h
>
> diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
> index b8b2689d2..29c3bfe79 100644
> --- a/src/box/CMakeLists.txt
> +++ b/src/box/CMakeLists.txt
> @@ -170,6 +170,7 @@ add_library(box STATIC
>       port.c
>       txn.c
>       txn_limbo.c
> +    raft.c
>       box.cc
>       gc.c
>       checkpoint_schedule.c
> diff --git a/src/box/box.cc b/src/box/box.cc
> index eeb00d5e2..281917af2 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -78,6 +78,7 @@
>   #include "sequence.h"
>   #include "sql_stmt_cache.h"
>   #include "msgpack.h"
> +#include "raft.h"
>   #include "trivia/util.h"
>   
>   static char status[64] = "unknown";
> @@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
>   			diag_raise();
>   		return;
>   	}
> +	if (iproto_type_is_raft_request(row->type)) {
> +		struct raft_request raft_req;
> +		if (xrow_decode_raft(row, &raft_req) != 0)
> +			diag_raise();
> +		raft_process_recovery(&raft_req);
> +		return;
> +	}
>   	xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>   	if (request.type != IPROTO_NOP) {
>   		struct space *space = space_cache_find_xc(request.space_id);
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 4f5a2b195..8a11626b3 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -219,6 +219,8 @@ enum iproto_type {
>   	/** The maximum typecode used for box.stat() */
>   	IPROTO_TYPE_STAT_MAX,
>   
> +	IPROTO_RAFT = 30,
> +
>   	/** A confirmation message for synchronous transactions. */
>   	IPROTO_CONFIRM = 40,
>   	/** A rollback message for synchronous transactions. */
> @@ -258,6 +260,11 @@ enum iproto_type {
>   /** IPROTO type name by code */
>   extern const char *iproto_type_strs[];
>   
> +enum iproto_raft_keys {
> +	IPROTO_RAFT_TERM = 0,
> +	IPROTO_RAFT_VOTE = 1,
> +};
> +
>   /**
>    * Returns IPROTO type name by @a type code.
>    * @param type IPROTO type.
> @@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
>   	return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
>   }
>   
> +static inline bool
> +iproto_type_is_raft_request(uint32_t type)
> +{
> +	return type == IPROTO_RAFT;
> +}
> +
>   /** This is an error. */
>   static inline bool
>   iproto_type_is_error(uint32_t type)
> diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
> index 5da84b35a..e356f2d4b 100644
> --- a/src/box/lua/misc.cc
> +++ b/src/box/lua/misc.cc
> @@ -40,6 +40,7 @@
>   #include "box/tuple.h"
>   #include "box/tuple_format.h"
>   #include "box/lua/tuple.h"
> +#include "box/xrow.h"
>   #include "mpstream/mpstream.h"
>   
>   static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
> diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
> index dfd6fce6e..7ebed7aa8 100644
> --- a/src/box/memtx_engine.c
> +++ b/src/box/memtx_engine.c
> @@ -48,6 +48,7 @@
>   #include "replication.h"
>   #include "schema.h"
>   #include "gc.h"
> +#include "raft.h"
>   
>   /* sync snapshot every 16MB */
>   #define SNAP_SYNC_INTERVAL	(1 << 24)
> @@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
>   	return 0;
>   }
>   
> +static int
> +memtx_engine_recover_raft(const struct xrow_header *row)
> +{
> +	assert(row->type == IPROTO_RAFT);
> +	struct raft_request req;
> +	if (xrow_decode_raft(row, &req) != 0)
> +		return -1;
> +	raft_process_recovery(&req);
> +	return 0;
> +}
> +
>   static int
>   memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
>   				  struct xrow_header *row)
>   {
>   	assert(row->bodycnt == 1); /* always 1 for read */
>   	if (row->type != IPROTO_INSERT) {
> +		if (row->type == IPROTO_RAFT)
> +			return memtx_engine_recover_raft(row);
>   		diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
>   			 (uint32_t) row->type);
>   		return -1;
> @@ -477,6 +491,7 @@ struct checkpoint {
>   	/** The vclock of the snapshot file. */
>   	struct vclock vclock;
>   	struct xdir dir;
> +	struct raft_request raft;
>   	/**
>   	 * Do nothing, just touch the snapshot file - the
>   	 * checkpoint already exists.
> @@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
>   	opts.free_cache = true;
>   	xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
>   	vclock_create(&ckpt->vclock);
> +	raft_serialize_for_disk(&ckpt->raft);
>   	ckpt->touch = false;
>   	return ckpt;
>   }
> @@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
>   	return 0;
>   };
>   
> +static int
> +checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
> +{
> +	struct xrow_header row;
> +	struct region *region = &fiber()->gc;
> +	uint32_t svp = region_used(region);
> +	int rc = -1;
> +	if (xrow_encode_raft(&row, region, req) != 0)
> +		goto finish;
> +	if (checkpoint_write_row(l, &row) != 0)
> +		goto finish;
> +	rc = 0;
> +finish:
> +	region_truncate(region, svp);
> +	return rc;
> +}
> +
>   static int
>   checkpoint_f(va_list ap)
>   {
> @@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
>   		if (rc != 0)
>   			goto fail;
>   	}
> +	if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
> +		goto fail;
>   	if (xlog_flush(&snap) < 0)
>   		goto fail;
>   
> diff --git a/src/box/raft.c b/src/box/raft.c
> new file mode 100644
> index 000000000..511fe42f5
> --- /dev/null
> +++ b/src/box/raft.c
> @@ -0,0 +1,65 @@
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include "raft.h"
> +
> +#include "error.h"
> +#include "journal.h"
> +#include "xrow.h"
> +#include "small/region.h"
> +
> +/** Raft state of this instance. */
> +struct raft raft = {
> +	.term = 1,
> +	.vote = 0,
> +};
> +
> +void
> +raft_process_recovery(const struct raft_request *req)
> +{
> +	if (req->term != 0)
> +		raft.term = req->term;
> +	if (req->vote != 0)
> +		raft.vote = req->vote;
> +}
> +
> +void
> +raft_serialize_for_network(struct raft_request *req)
> +{
> +	req->term = raft.term;
> +	req->vote = raft.vote;
> +}
> +
> +void
> +raft_serialize_for_disk(struct raft_request *req)
> +{
> +	req->term = raft.term;
> +	req->vote = raft.vote;
> +}
> diff --git a/src/box/raft.h b/src/box/raft.h
> new file mode 100644
> index 000000000..31f7becdb
> --- /dev/null
> +++ b/src/box/raft.h
> @@ -0,0 +1,67 @@
> +#pragma once
> +/*
> + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
> + *
> + * Redistribution and use in source and binary forms, with or
> + * without modification, are permitted provided that the following
> + * conditions are met:
> + *
> + * 1. Redistributions of source code must retain the above
> + *    copyright notice, this list of conditions and the
> + *    following disclaimer.
> + *
> + * 2. Redistributions in binary form must reproduce the above
> + *    copyright notice, this list of conditions and the following
> + *    disclaimer in the documentation and/or other materials
> + *    provided with the distribution.
> + *
> + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
> + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
> + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
> + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
> + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
> + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
> + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
> + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
> + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
> + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
> + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
> + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
> + * SUCH DAMAGE.
> + */
> +#include <stdint.h>
> +
> +#if defined(__cplusplus)
> +extern "C" {
> +#endif
> +
> +struct raft_request;
> +
> +struct raft {
> +	uint64_t term;
> +	uint32_t vote;
> +};
> +
> +extern struct raft raft;
> +
> +/** Process a raft entry stored in WAL/snapshot. */
> +void
> +raft_process_recovery(const struct raft_request *req);
> +
> +/**
> + * Save complete Raft state into a request to be sent to other instances of the
> + * cluster. It is allowed to save anything here, not only persistent state.
> + */
> +void
> +raft_serialize_for_network(struct raft_request *req);
> +
> +/**
> + * Save complete Raft state into a request to be persisted on disk. Only term
> + * and vote are being persisted.
> + */
> +void
> +raft_serialize_for_disk(struct raft_request *req);
> +
> +#if defined(__cplusplus)
> +}
> +#endif
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> index 95ddb1fe7..1923bacfc 100644
> --- a/src/box/xrow.c
> +++ b/src/box/xrow.c
> @@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
>   	return 0;
>   }
>   
> +int
> +xrow_encode_raft(struct xrow_header *row, struct region *region,
> +		 const struct raft_request *r)
> +{
> +	size_t size = mp_sizeof_map(2) +
> +		      mp_sizeof_uint(IPROTO_RAFT_TERM) +
> +		      mp_sizeof_uint(r->term) +
> +		      mp_sizeof_uint(IPROTO_RAFT_VOTE) +
> +		      mp_sizeof_uint(r->vote);
> +	char *buf = region_alloc(region, size);
> +	if (buf == NULL) {
> +		diag_set(OutOfMemory, size, "region_alloc", "buf");
> +		return -1;
> +	}
> +	memset(row, 0, sizeof(*row));
> +	row->type = IPROTO_RAFT;
> +	row->body[0].iov_base = buf;
> +	row->body[0].iov_len = size;
> +	row->group_id = GROUP_LOCAL;
> +	row->bodycnt = 1;
> +	buf = mp_encode_map(buf, 2);
> +	buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
> +	buf = mp_encode_uint(buf, r->term);
> +	buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
> +	buf = mp_encode_uint(buf, r->vote);
> +	return 0;
> +}
> +
> +int
> +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
> +{
> +	/* TODO: handle bad format. */
> +	assert(row->type == IPROTO_RAFT);
> +	assert(row->bodycnt == 1);
> +	assert(row->group_id == GROUP_LOCAL);
> +	memset(r, 0, sizeof(*r));
> +	const char *pos = row->body[0].iov_base;
> +	uint32_t map_size = mp_decode_map(&pos);
> +	for (uint32_t i = 0; i < map_size; ++i)
> +	{
> +		uint64_t key = mp_decode_uint(&pos);
> +		switch (key) {
> +		case IPROTO_RAFT_TERM:
> +			r->term = mp_decode_uint(&pos);
> +			break;
> +		case IPROTO_RAFT_VOTE:
> +			r->vote = mp_decode_uint(&pos);
> +			break;
> +		default:
> +			mp_next(&pos);
> +			break;
> +		}
> +	}
> +	return 0;
> +}
> +
>   int
>   xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
>   {
> diff --git a/src/box/xrow.h b/src/box/xrow.h
> index 58d47b12d..c234f6f88 100644
> --- a/src/box/xrow.h
> +++ b/src/box/xrow.h
> @@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
>   int
>   xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
>   
> +struct raft_request {
> +	uint64_t term;
> +	uint32_t vote;
> +};
> +
> +int
> +xrow_encode_raft(struct xrow_header *row, struct region *region,
> +		 const struct raft_request *r);
> +
> +int
> +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
> +
>   /**
>    * CALL/EVAL request.
>    */

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list