From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id AA27444643A for ; Thu, 10 Sep 2020 02:17:10 +0300 (MSK) From: Vladislav Shpilevoy Date: Thu, 10 Sep 2020 01:16:57 +0200 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v2 06/11] raft: introduce persistent raft state List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org, gorcunov@gmail.com The patch introduces a skeleton of Raft module and a method to persist a Raft state in snapshot, not bound to any space. Part of #1146 --- src/box/CMakeLists.txt | 1 + src/box/box.cc | 8 +++++ src/box/iproto_constants.h | 13 ++++++++ src/box/lua/misc.cc | 1 + src/box/memtx_engine.c | 35 ++++++++++++++++++++ src/box/raft.c | 65 ++++++++++++++++++++++++++++++++++++ src/box/raft.h | 67 ++++++++++++++++++++++++++++++++++++++ src/box/xrow.c | 56 +++++++++++++++++++++++++++++++ src/box/xrow.h | 12 +++++++ 9 files changed, 258 insertions(+) create mode 100644 src/box/raft.c create mode 100644 src/box/raft.h diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index b8b2689d2..29c3bfe79 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -170,6 +170,7 @@ add_library(box STATIC port.c txn.c txn_limbo.c + raft.c box.cc gc.c checkpoint_schedule.c diff --git a/src/box/box.cc b/src/box/box.cc index 3214ec340..1a19058b3 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -78,6 +78,7 @@ #include "sequence.h" #include "sql_stmt_cache.h" #include "msgpack.h" +#include "raft.h" #include "trivia/util.h" static char status[64] = "unknown"; @@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, 
struct xrow_header *row) diag_raise(); return; } + if (iproto_type_is_raft_request(row->type)) { + struct raft_request raft_req; + if (xrow_decode_raft(row, &raft_req) != 0) + diag_raise(); + raft_process_recovery(&raft_req); + return; + } xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type != IPROTO_NOP) { struct space *space = space_cache_find_xc(request.space_id); diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 4f5a2b195..8a11626b3 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -219,6 +219,8 @@ enum iproto_type { /** The maximum typecode used for box.stat() */ IPROTO_TYPE_STAT_MAX, + IPROTO_RAFT = 30, + /** A confirmation message for synchronous transactions. */ IPROTO_CONFIRM = 40, /** A rollback message for synchronous transactions. */ @@ -258,6 +260,11 @@ enum iproto_type { /** IPROTO type name by code */ extern const char *iproto_type_strs[]; +enum iproto_raft_keys { + IPROTO_RAFT_TERM = 0, + IPROTO_RAFT_VOTE = 1, +}; + /** * Returns IPROTO type name by @a type code. * @param type IPROTO type. @@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type) return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK; } +static inline bool +iproto_type_is_raft_request(uint32_t type) +{ + return type == IPROTO_RAFT; +} + /** This is an error. 
*/ static inline bool iproto_type_is_error(uint32_t type) diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc index 5da84b35a..e356f2d4b 100644 --- a/src/box/lua/misc.cc +++ b/src/box/lua/misc.cc @@ -40,6 +40,7 @@ #include "box/tuple.h" #include "box/tuple_format.h" #include "box/lua/tuple.h" +#include "box/xrow.h" #include "mpstream/mpstream.h" static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR; diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 9f079a6b5..5ab2cf266 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -48,6 +48,7 @@ #include "replication.h" #include "schema.h" #include "gc.h" +#include "raft.h" /* sync snapshot every 16MB */ #define SNAP_SYNC_INTERVAL (1 << 24) @@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx, return 0; } +static int +memtx_engine_recover_raft(const struct xrow_header *row) +{ + assert(row->type == IPROTO_RAFT); + struct raft_request req; + if (xrow_decode_raft(row, &req) != 0) + return -1; + raft_process_recovery(&req); + return 0; +} + static int memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, struct xrow_header *row) { assert(row->bodycnt == 1); /* always 1 for read */ if (row->type != IPROTO_INSERT) { + if (row->type == IPROTO_RAFT) + return memtx_engine_recover_raft(row); diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row->type); return -1; @@ -477,6 +491,7 @@ struct checkpoint { /** The vclock of the snapshot file. */ struct vclock vclock; struct xdir dir; + struct raft_request raft; /** * Do nothing, just touch the snapshot file - the * checkpoint already exists. 
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) opts.free_cache = true; xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); vclock_create(&ckpt->vclock); + raft_serialize_for_disk(&ckpt->raft); ckpt->touch = false; return ckpt; } @@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data) return 0; }; +static int +checkpoint_write_raft(struct xlog *l, const struct raft_request *req) +{ + struct xrow_header row; + struct region *region = &fiber()->gc; + uint32_t svp = region_used(region); + int rc = -1; + if (xrow_encode_raft(&row, region, req) != 0) + goto finish; + if (checkpoint_write_row(l, &row) != 0) + goto finish; + rc = 0; +finish: + region_truncate(region, svp); + return rc; +} + static int checkpoint_f(va_list ap) { @@ -607,6 +640,8 @@ checkpoint_f(va_list ap) if (rc != 0) goto fail; } + if (checkpoint_write_raft(&snap, &ckpt->raft) != 0) + goto fail; if (xlog_flush(&snap) < 0) goto fail; diff --git a/src/box/raft.c b/src/box/raft.c new file mode 100644 index 000000000..511fe42f5 --- /dev/null +++ b/src/box/raft.c @@ -0,0 +1,65 @@ +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "raft.h" + +#include "error.h" +#include "journal.h" +#include "xrow.h" +#include "small/region.h" + +/** Raft state of this instance. */ +struct raft raft = { + .term = 1, + .vote = 0, +}; + +void +raft_process_recovery(const struct raft_request *req) +{ + if (req->term != 0) + raft.term = req->term; + if (req->vote != 0) + raft.vote = req->vote; +} + +void +raft_serialize_for_network(struct raft_request *req) +{ + req->term = raft.term; + req->vote = raft.vote; +} + +void +raft_serialize_for_disk(struct raft_request *req) +{ + req->term = raft.term; + req->vote = raft.vote; +} diff --git a/src/box/raft.h b/src/box/raft.h new file mode 100644 index 000000000..31f7becdb --- /dev/null +++ b/src/box/raft.h @@ -0,0 +1,67 @@ +#pragma once +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include + +#if defined(__cplusplus) +extern "C" { +#endif + +struct raft_request; + +struct raft { + uint64_t term; + uint32_t vote; +}; + +extern struct raft raft; + +/** Process a raft entry stored in WAL/snapshot. */ +void +raft_process_recovery(const struct raft_request *req); + +/** + * Save complete Raft state into a request to be sent to other instances of the + * cluster. It is allowed to save anything here, not only persistent state. + */ +void +raft_serialize_for_network(struct raft_request *req); + +/** + * Save complete Raft state into a request to be persisted on disk. Only term + * and vote are being persisted. 
+ */ +void +raft_serialize_for_disk(struct raft_request *req); + +#if defined(__cplusplus) +} +#endif diff --git a/src/box/xrow.c b/src/box/xrow.c index 95ddb1fe7..1923bacfc 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) return 0; } +int +xrow_encode_raft(struct xrow_header *row, struct region *region, + const struct raft_request *r) +{ + size_t size = mp_sizeof_map(2) + + mp_sizeof_uint(IPROTO_RAFT_TERM) + + mp_sizeof_uint(r->term) + + mp_sizeof_uint(IPROTO_RAFT_VOTE) + + mp_sizeof_uint(r->vote); + char *buf = region_alloc(region, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "region_alloc", "buf"); + return -1; + } + memset(row, 0, sizeof(*row)); + row->type = IPROTO_RAFT; + row->body[0].iov_base = buf; + row->body[0].iov_len = size; + row->group_id = GROUP_LOCAL; + row->bodycnt = 1; + buf = mp_encode_map(buf, 2); + buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); + buf = mp_encode_uint(buf, r->term); + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); + buf = mp_encode_uint(buf, r->vote); + return 0; +} + +int +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) +{ + /* TODO: handle bad format. 
*/ + assert(row->type == IPROTO_RAFT); + assert(row->bodycnt == 1); + assert(row->group_id == GROUP_LOCAL); + memset(r, 0, sizeof(*r)); + const char *pos = row->body[0].iov_base; + uint32_t map_size = mp_decode_map(&pos); + for (uint32_t i = 0; i < map_size; ++i) + { + uint64_t key = mp_decode_uint(&pos); + switch (key) { + case IPROTO_RAFT_TERM: + r->term = mp_decode_uint(&pos); + break; + case IPROTO_RAFT_VOTE: + r->vote = mp_decode_uint(&pos); + break; + default: + mp_next(&pos); + break; + } + } + return 0; +} + int xrow_to_iovec(const struct xrow_header *row, struct iovec *out) { diff --git a/src/box/xrow.h b/src/box/xrow.h index 58d47b12d..c234f6f88 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row, int xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); +struct raft_request { + uint64_t term; + uint32_t vote; +}; + +int +xrow_encode_raft(struct xrow_header *row, struct region *region, + const struct raft_request *r); + +int +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); + /** * CALL/EVAL request. */ -- 2.21.1 (Apple Git-122.3)