From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id DE44E43040D for ; Wed, 26 Aug 2020 10:52:53 +0300 (MSK) From: Serge Petrenko Date: Wed, 26 Aug 2020 10:52:33 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com, sergos@tarantool.org Cc: tarantool-patches@dev.tarantool.org From: Vladislav Shpilevoy box.internal.raft_*() helper functions were introduced to test the persistency. Any state change is saved into WAL and into snapshot. --- src/box/CMakeLists.txt | 1 + src/box/box.cc | 8 +++ src/box/iproto_constants.h | 13 ++++ src/box/lua/misc.cc | 35 +++++++++++ src/box/memtx_engine.c | 35 +++++++++++ src/box/raft.c | 120 +++++++++++++++++++++++++++++++++++++ src/box/raft.h | 61 +++++++++++++++++++ src/box/xrow.c | 56 +++++++++++++++++ src/box/xrow.h | 12 ++++ 9 files changed, 341 insertions(+) create mode 100644 src/box/raft.c create mode 100644 src/box/raft.h diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index b8b2689d2..29c3bfe79 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -170,6 +170,7 @@ add_library(box STATIC port.c txn.c txn_limbo.c + raft.c box.cc gc.c checkpoint_schedule.c diff --git a/src/box/box.cc b/src/box/box.cc index faffd5769..c0adccc6a 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -78,6 +78,7 @@ #include "sequence.h" #include "sql_stmt_cache.h" #include "msgpack.h" +#include "raft.h" #include "trivia/util.h" static char status[64] = "unknown"; @@ -374,6 +375,13 @@ apply_wal_row(struct xstream *stream, struct 
xrow_header *row) diag_raise(); return; } + if (iproto_type_is_raft_request(row->type)) { + struct raft_request raft_req; + if (xrow_decode_raft(row, &raft_req) != 0) + diag_raise(); + raft_process(&raft_req); + return; + } xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); if (request.type != IPROTO_NOP) { struct space *space = space_cache_find_xc(request.space_id); diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 4f5a2b195..8a11626b3 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -219,6 +219,8 @@ enum iproto_type { /** The maximum typecode used for box.stat() */ IPROTO_TYPE_STAT_MAX, + IPROTO_RAFT = 30, + /** A confirmation message for synchronous transactions. */ IPROTO_CONFIRM = 40, /** A rollback message for synchronous transactions. */ @@ -258,6 +260,11 @@ enum iproto_type { /** IPROTO type name by code */ extern const char *iproto_type_strs[]; +enum iproto_raft_keys { + IPROTO_RAFT_TERM = 0, + IPROTO_RAFT_VOTE = 1, +}; + /** * Returns IPROTO type name by @a type code. * @param type IPROTO type. @@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type) return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK; } +static inline bool +iproto_type_is_raft_request(uint32_t type) +{ + return type == IPROTO_RAFT; +} + /** This is an error. 
*/ static inline bool iproto_type_is_error(uint32_t type) diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc index 5da84b35a..98e98abe2 100644 --- a/src/box/lua/misc.cc +++ b/src/box/lua/misc.cc @@ -40,6 +40,8 @@ #include "box/tuple.h" #include "box/tuple_format.h" #include "box/lua/tuple.h" +#include "box/raft.h" +#include "box/xrow.h" #include "mpstream/mpstream.h" static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR; @@ -246,12 +248,45 @@ lbox_tuple_format_new(struct lua_State *L) /* }}} */ +static int +lbox_raft_new_term(struct lua_State *L) +{ + uint64_t min_term = luaL_checkuint64(L, 1); + raft_new_term(min_term); + return 0; +} + +static int +lbox_raft_vote(struct lua_State *L) +{ + uint64_t vote_for = luaL_checkuint64(L, 1); + if (vote_for > UINT32_MAX) + return luaL_error(L, "Invalid vote"); + raft_vote(vote_for); + return 0; +} + +static int +lbox_raft_get(struct lua_State *L) +{ + lua_createtable(L, 0, 2); + luaL_pushuint64(L, raft.term); + lua_setfield(L, -2, "term"); + luaL_pushuint64(L, raft.vote); + lua_setfield(L, -2, "vote"); + return 1; +} + void box_lua_misc_init(struct lua_State *L) { static const struct luaL_Reg boxlib_internal[] = { {"select", lbox_select}, {"new_tuple_format", lbox_tuple_format_new}, + /* Temporary helpers to sanity test raft persistency. 
*/ + {"raft_new_term", lbox_raft_new_term}, + {"raft_vote", lbox_raft_vote}, + {"raft_get", lbox_raft_get}, {NULL, NULL} }; diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index dfd6fce6e..26274de80 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -48,6 +48,7 @@ #include "replication.h" #include "schema.h" #include "gc.h" +#include "raft.h" /* sync snapshot every 16MB */ #define SNAP_SYNC_INTERVAL (1 << 24) @@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx, return 0; } +static int +memtx_engine_recover_raft(const struct xrow_header *row) +{ + assert(row->type == IPROTO_RAFT); + struct raft_request req; + if (xrow_decode_raft(row, &req) != 0) + return -1; + raft_process(&req); + return 0; +} + static int memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, struct xrow_header *row) { assert(row->bodycnt == 1); /* always 1 for read */ if (row->type != IPROTO_INSERT) { + if (row->type == IPROTO_RAFT) + return memtx_engine_recover_raft(row); diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row->type); return -1; @@ -477,6 +491,7 @@ struct checkpoint { /** The vclock of the snapshot file. */ struct vclock vclock; struct xdir dir; + struct raft_request raft; /** * Do nothing, just touch the snapshot file - the * checkpoint already exists. 
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) opts.free_cache = true; xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); vclock_create(&ckpt->vclock); + raft_serialize(&ckpt->raft); ckpt->touch = false; return ckpt; } @@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data) return 0; }; +static int +checkpoint_write_raft(struct xlog *l, const struct raft_request *req) +{ + struct xrow_header row; + struct region *region = &fiber()->gc; + uint32_t svp = region_used(region); + int rc = -1; + if (xrow_encode_raft(&row, region, req) != 0) + goto finish; + if (checkpoint_write_row(l, &row) != 0) + goto finish; + rc = 0; +finish: + region_truncate(region, svp); + return rc; +} + static int checkpoint_f(va_list ap) { @@ -607,6 +640,8 @@ checkpoint_f(va_list ap) if (rc != 0) goto fail; } + if (checkpoint_write_raft(&snap, &ckpt->raft) != 0) + goto fail; if (xlog_flush(&snap) < 0) goto fail; diff --git a/src/box/raft.c b/src/box/raft.c new file mode 100644 index 000000000..5465f46b6 --- /dev/null +++ b/src/box/raft.c @@ -0,0 +1,120 @@ +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "raft.h" + +#include "error.h" +#include "journal.h" +#include "xrow.h" +#include "small/region.h" + +/** Raft state of this instance. */ +struct raft raft = { + .term = 0, + .vote = 0, +}; + +void +raft_process(const struct raft_request *req) +{ + if (req->term != 0) + raft.term = req->term; + if (req->vote != 0) + raft.vote = req->vote; +} + +void +raft_serialize(struct raft_request *req) +{ + req->term = raft.term; + req->vote = raft.vote; +} + +static void +raft_write_cb(struct journal_entry *entry) +{ + fiber_wakeup(entry->complete_data); +} + +static void +raft_write_request(const struct raft_request *req) +{ + struct region *region = &fiber()->gc; + uint32_t svp = region_used(region); + struct xrow_header row; + char buf[sizeof(struct journal_entry) + + sizeof(struct xrow_header *)]; + struct journal_entry *entry = (struct journal_entry *)buf; + entry->rows[0] = &row; + + if (xrow_encode_raft(&row, region, req) != 0) + goto fail; + journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb, + fiber()); + + if (journal_write(entry) != 0 || entry->res < 0) { + diag_set(ClientError, ER_WAL_IO); + diag_log(); + goto fail; + } + region_truncate(region, svp); + return; +fail: + /* + * XXX: the stub is supposed to be removed once it is defined what to do + * when a raft request WAL write fails. 
+ */ + panic("Could not write a raft request to WAL\n"); +} + +void +raft_new_term(uint64_t min_new_term) +{ + if (raft.term < min_new_term) + raft.term = min_new_term + 1; + else + ++raft.term; + + struct raft_request req; + memset(&req, 0, sizeof(req)); + req.term = raft.term; + raft_write_request(&req); +} + +void +raft_vote(uint32_t vote_for) +{ + raft.vote = vote_for; + + struct raft_request req; + memset(&req, 0, sizeof(req)); + req.vote = vote_for; + raft_write_request(&req); +} diff --git a/src/box/raft.h b/src/box/raft.h new file mode 100644 index 000000000..1f392033d --- /dev/null +++ b/src/box/raft.h @@ -0,0 +1,61 @@ +#pragma once +/* + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include <stdint.h> + +struct raft_request; + +#if defined(__cplusplus) +extern "C" { +#endif + +struct raft { + uint64_t term; + uint32_t vote; +}; + +extern struct raft raft; + +void +raft_new_term(uint64_t min_new_term); + +void +raft_vote(uint32_t vote_for); + +void +raft_process(const struct raft_request *req); + +void +raft_serialize(struct raft_request *req); + +#if defined(__cplusplus) +} +#endif diff --git a/src/box/xrow.c b/src/box/xrow.c index 95ddb1fe7..1923bacfc 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) return 0; } +int +xrow_encode_raft(struct xrow_header *row, struct region *region, + const struct raft_request *r) +{ + size_t size = mp_sizeof_map(2) + + mp_sizeof_uint(IPROTO_RAFT_TERM) + + mp_sizeof_uint(r->term) + + mp_sizeof_uint(IPROTO_RAFT_VOTE) + + mp_sizeof_uint(r->vote); + char *buf = region_alloc(region, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "region_alloc", "buf"); + return -1; + } + memset(row, 0, sizeof(*row)); + row->type = IPROTO_RAFT; + row->body[0].iov_base = buf; + row->body[0].iov_len = size; + row->group_id = GROUP_LOCAL; + row->bodycnt = 1; + buf = mp_encode_map(buf, 2); + buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); + buf = mp_encode_uint(buf, r->term); + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); + buf = mp_encode_uint(buf, r->vote); + return 0; +} + +int +xrow_decode_raft(const struct 
xrow_header *row, struct raft_request *r) +{ + /* TODO: handle bad format. */ + assert(row->type == IPROTO_RAFT); + assert(row->bodycnt == 1); + assert(row->group_id == GROUP_LOCAL); + memset(r, 0, sizeof(*r)); + const char *pos = row->body[0].iov_base; + uint32_t map_size = mp_decode_map(&pos); + for (uint32_t i = 0; i < map_size; ++i) + { + uint64_t key = mp_decode_uint(&pos); + switch (key) { + case IPROTO_RAFT_TERM: + r->term = mp_decode_uint(&pos); + break; + case IPROTO_RAFT_VOTE: + r->vote = mp_decode_uint(&pos); + break; + default: + mp_next(&pos); + break; + } + } + return 0; +} + int xrow_to_iovec(const struct xrow_header *row, struct iovec *out) { diff --git a/src/box/xrow.h b/src/box/xrow.h index 58d47b12d..c234f6f88 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row, int xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); +struct raft_request { + uint64_t term; + uint32_t vote; +}; + +int +xrow_encode_raft(struct xrow_header *row, struct region *region, + const struct raft_request *r); + +int +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); + /** * CALL/EVAL request. */ -- 2.20.1 (Apple Git-117)