From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp57.i.mail.ru (smtp57.i.mail.ru [217.69.128.37]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 8944743040C for ; Fri, 4 Sep 2020 11:59:28 +0300 (MSK) References: <7abf6de9867b1c7dcfcff8e101fdb7dfed966530.1599173312.git.v.shpilevoy@tarantool.org> From: Serge Petrenko Message-ID: <3485dcec-c8b8-6f2e-96ef-95ed8a445225@tarantool.org> Date: Fri, 4 Sep 2020 11:59:25 +0300 MIME-Version: 1.0 In-Reply-To: <7abf6de9867b1c7dcfcff8e101fdb7dfed966530.1599173312.git.v.shpilevoy@tarantool.org> Content-Type: text/plain; charset="utf-8"; format="flowed" Content-Transfer-Encoding: 8bit Content-Language: ru Subject: Re: [Tarantool-patches] [PATCH v2 05/10] raft: introduce persistent raft state List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Vladislav Shpilevoy , tarantool-patches@dev.tarantool.org, gorcunov@gmail.com 04.09.2020 01:51, Vladislav Shpilevoy пишет: > box.internal.raft_*() helper functions were introduced to test > the persistency. Any state change is saved into WAL and into > snapshot. > > Part of #1146 > --- LGTM. 
> src/box/CMakeLists.txt | 1 + > src/box/box.cc | 8 +++++ > src/box/iproto_constants.h | 13 ++++++++ > src/box/lua/misc.cc | 1 + > src/box/memtx_engine.c | 35 ++++++++++++++++++++ > src/box/raft.c | 65 ++++++++++++++++++++++++++++++++++++ > src/box/raft.h | 67 ++++++++++++++++++++++++++++++++++++++ > src/box/xrow.c | 56 +++++++++++++++++++++++++++++++ > src/box/xrow.h | 12 +++++++ > 9 files changed, 258 insertions(+) > create mode 100644 src/box/raft.c > create mode 100644 src/box/raft.h > > diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt > index b8b2689d2..29c3bfe79 100644 > --- a/src/box/CMakeLists.txt > +++ b/src/box/CMakeLists.txt > @@ -170,6 +170,7 @@ add_library(box STATIC > port.c > txn.c > txn_limbo.c > + raft.c > box.cc > gc.c > checkpoint_schedule.c > diff --git a/src/box/box.cc b/src/box/box.cc > index eeb00d5e2..281917af2 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -78,6 +78,7 @@ > #include "sequence.h" > #include "sql_stmt_cache.h" > #include "msgpack.h" > +#include "raft.h" > #include "trivia/util.h" > > static char status[64] = "unknown"; > @@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) > diag_raise(); > return; > } > + if (iproto_type_is_raft_request(row->type)) { > + struct raft_request raft_req; > + if (xrow_decode_raft(row, &raft_req) != 0) > + diag_raise(); > + raft_process_recovery(&raft_req); > + return; > + } > xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); > if (request.type != IPROTO_NOP) { > struct space *space = space_cache_find_xc(request.space_id); > diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h > index 4f5a2b195..8a11626b3 100644 > --- a/src/box/iproto_constants.h > +++ b/src/box/iproto_constants.h > @@ -219,6 +219,8 @@ enum iproto_type { > /** The maximum typecode used for box.stat() */ > IPROTO_TYPE_STAT_MAX, > > + IPROTO_RAFT = 30, > + > /** A confirmation message for synchronous transactions. 
*/ > IPROTO_CONFIRM = 40, > /** A rollback message for synchronous transactions. */ > @@ -258,6 +260,11 @@ enum iproto_type { > /** IPROTO type name by code */ > extern const char *iproto_type_strs[]; > > +enum iproto_raft_keys { > + IPROTO_RAFT_TERM = 0, > + IPROTO_RAFT_VOTE = 1, > +}; > + > /** > * Returns IPROTO type name by @a type code. > * @param type IPROTO type. > @@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type) > return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK; > } > > +static inline bool > +iproto_type_is_raft_request(uint32_t type) > +{ > + return type == IPROTO_RAFT; > +} > + > /** This is an error. */ > static inline bool > iproto_type_is_error(uint32_t type) > diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc > index 5da84b35a..e356f2d4b 100644 > --- a/src/box/lua/misc.cc > +++ b/src/box/lua/misc.cc > @@ -40,6 +40,7 @@ > #include "box/tuple.h" > #include "box/tuple_format.h" > #include "box/lua/tuple.h" > +#include "box/xrow.h" > #include "mpstream/mpstream.h" > > static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR; > diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c > index dfd6fce6e..7ebed7aa8 100644 > --- a/src/box/memtx_engine.c > +++ b/src/box/memtx_engine.c > @@ -48,6 +48,7 @@ > #include "replication.h" > #include "schema.h" > #include "gc.h" > +#include "raft.h" > > /* sync snapshot every 16MB */ > #define SNAP_SYNC_INTERVAL (1 << 24) > @@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx, > return 0; > } > > +static int > +memtx_engine_recover_raft(const struct xrow_header *row) > +{ > + assert(row->type == IPROTO_RAFT); > + struct raft_request req; > + if (xrow_decode_raft(row, &req) != 0) > + return -1; > + raft_process_recovery(&req); > + return 0; > +} > + > static int > memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, > struct xrow_header *row) > { > assert(row->bodycnt == 1); /* always 1 for read */ > if (row->type != IPROTO_INSERT) { > + if (row->type == 
IPROTO_RAFT) > + return memtx_engine_recover_raft(row); > diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE, > (uint32_t) row->type); > return -1; > @@ -477,6 +491,7 @@ struct checkpoint { > /** The vclock of the snapshot file. */ > struct vclock vclock; > struct xdir dir; > + struct raft_request raft; > /** > * Do nothing, just touch the snapshot file - the > * checkpoint already exists. > @@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) > opts.free_cache = true; > xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); > vclock_create(&ckpt->vclock); > + raft_serialize_for_disk(&ckpt->raft); > ckpt->touch = false; > return ckpt; > } > @@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data) > return 0; > }; > > +static int > +checkpoint_write_raft(struct xlog *l, const struct raft_request *req) > +{ > + struct xrow_header row; > + struct region *region = &fiber()->gc; > + uint32_t svp = region_used(region); > + int rc = -1; > + if (xrow_encode_raft(&row, region, req) != 0) > + goto finish; > + if (checkpoint_write_row(l, &row) != 0) > + goto finish; > + rc = 0; > +finish: > + region_truncate(region, svp); > + return rc; > +} > + > static int > checkpoint_f(va_list ap) > { > @@ -607,6 +640,8 @@ checkpoint_f(va_list ap) > if (rc != 0) > goto fail; > } > + if (checkpoint_write_raft(&snap, &ckpt->raft) != 0) > + goto fail; > if (xlog_flush(&snap) < 0) > goto fail; > > diff --git a/src/box/raft.c b/src/box/raft.c > new file mode 100644 > index 000000000..511fe42f5 > --- /dev/null > +++ b/src/box/raft.c > @@ -0,0 +1,65 @@ > +/* > + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. > + * > + * Redistribution and use in source and binary forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. 
Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the > + * following disclaimer. > + * > + * 2. Redistributions in binary form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL > + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + */ > +#include "raft.h" > + > +#include "error.h" > +#include "journal.h" > +#include "xrow.h" > +#include "small/region.h" > + > +/** Raft state of this instance.
*/ > +struct raft raft = { > + .term = 1, > + .vote = 0, > +}; > + > +void > +raft_process_recovery(const struct raft_request *req) > +{ > + if (req->term != 0) > + raft.term = req->term; > + if (req->vote != 0) > + raft.vote = req->vote; > +} > + > +void > +raft_serialize_for_network(struct raft_request *req) > +{ > + req->term = raft.term; > + req->vote = raft.vote; > +} > + > +void > +raft_serialize_for_disk(struct raft_request *req) > +{ > + req->term = raft.term; > + req->vote = raft.vote; > +} > diff --git a/src/box/raft.h b/src/box/raft.h > new file mode 100644 > index 000000000..31f7becdb > --- /dev/null > +++ b/src/box/raft.h > @@ -0,0 +1,67 @@ > +#pragma once > +/* > + * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file. > + * > + * Redistribution and use in source and binary forms, with or > + * without modification, are permitted provided that the following > + * conditions are met: > + * > + * 1. Redistributions of source code must retain the above > + * copyright notice, this list of conditions and the > + * following disclaimer. > + * > + * 2. Redistributions in binary form must reproduce the above > + * copyright notice, this list of conditions and the following > + * disclaimer in the documentation and/or other materials > + * provided with the distribution. > + * > + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND > + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED > + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR > + * A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL > + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, > + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL > + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF > + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR > + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF > + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT > + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF > + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF > + * SUCH DAMAGE. > + */ > +#include <stdint.h> > + > +#if defined(__cplusplus) > +extern "C" { > +#endif > + > +struct raft_request; > + > +struct raft { > + uint64_t term; > + uint32_t vote; > +}; > + > +extern struct raft raft; > + > +/** Process a raft entry stored in WAL/snapshot. */ > +void > +raft_process_recovery(const struct raft_request *req); > + > +/** > + * Save complete Raft state into a request to be sent to other instances of the > + * cluster. It is allowed to save anything here, not only persistent state. > + */ > +void > +raft_serialize_for_network(struct raft_request *req); > + > +/** > + * Save complete Raft state into a request to be persisted on disk. Only term > + * and vote are being persisted.
> + */ > +void > +raft_serialize_for_disk(struct raft_request *req); > + > +#if defined(__cplusplus) > +} > +#endif > diff --git a/src/box/xrow.c b/src/box/xrow.c > index 95ddb1fe7..1923bacfc 100644 > --- a/src/box/xrow.c > +++ b/src/box/xrow.c > @@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) > return 0; > } > > +int > +xrow_encode_raft(struct xrow_header *row, struct region *region, > + const struct raft_request *r) > +{ > + size_t size = mp_sizeof_map(2) + > + mp_sizeof_uint(IPROTO_RAFT_TERM) + > + mp_sizeof_uint(r->term) + > + mp_sizeof_uint(IPROTO_RAFT_VOTE) + > + mp_sizeof_uint(r->vote); > + char *buf = region_alloc(region, size); > + if (buf == NULL) { > + diag_set(OutOfMemory, size, "region_alloc", "buf"); > + return -1; > + } > + memset(row, 0, sizeof(*row)); > + row->type = IPROTO_RAFT; > + row->body[0].iov_base = buf; > + row->body[0].iov_len = size; > + row->group_id = GROUP_LOCAL; > + row->bodycnt = 1; > + buf = mp_encode_map(buf, 2); > + buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); > + buf = mp_encode_uint(buf, r->term); > + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); > + buf = mp_encode_uint(buf, r->vote); > + return 0; > +} > + > +int > +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) > +{ > + /* TODO: handle bad format. 
*/ > + assert(row->type == IPROTO_RAFT); > + assert(row->bodycnt == 1); > + assert(row->group_id == GROUP_LOCAL); > + memset(r, 0, sizeof(*r)); > + const char *pos = row->body[0].iov_base; > + uint32_t map_size = mp_decode_map(&pos); > + for (uint32_t i = 0; i < map_size; ++i) > + { > + uint64_t key = mp_decode_uint(&pos); > + switch (key) { > + case IPROTO_RAFT_TERM: > + r->term = mp_decode_uint(&pos); > + break; > + case IPROTO_RAFT_VOTE: > + r->vote = mp_decode_uint(&pos); > + break; > + default: > + mp_next(&pos); > + break; > + } > + } > + return 0; > +} > + > int > xrow_to_iovec(const struct xrow_header *row, struct iovec *out) > { > diff --git a/src/box/xrow.h b/src/box/xrow.h > index 58d47b12d..c234f6f88 100644 > --- a/src/box/xrow.h > +++ b/src/box/xrow.h > @@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row, > int > xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); > > +struct raft_request { > + uint64_t term; > + uint32_t vote; > +}; > + > +int > +xrow_encode_raft(struct xrow_header *row, struct region *region, > + const struct raft_request *r); > + > +int > +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); > + > /** > * CALL/EVAL request. > */ -- Serge Petrenko