* [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers Serge Petrenko
` (8 subsequent siblings)
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
box.internal.raft_*() helper functions were introduced to test
the persistency. Any state change is saved into WAL and into
snapshot.
---
src/box/CMakeLists.txt | 1 +
src/box/box.cc | 8 +++
src/box/iproto_constants.h | 13 ++++
src/box/lua/misc.cc | 35 +++++++++++
src/box/memtx_engine.c | 35 +++++++++++
src/box/raft.c | 120 +++++++++++++++++++++++++++++++++++++
src/box/raft.h | 61 +++++++++++++++++++
src/box/xrow.c | 56 +++++++++++++++++
src/box/xrow.h | 12 ++++
9 files changed, 341 insertions(+)
create mode 100644 src/box/raft.c
create mode 100644 src/box/raft.h
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index b8b2689d2..29c3bfe79 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -170,6 +170,7 @@ add_library(box STATIC
port.c
txn.c
txn_limbo.c
+ raft.c
box.cc
gc.c
checkpoint_schedule.c
diff --git a/src/box/box.cc b/src/box/box.cc
index faffd5769..c0adccc6a 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -78,6 +78,7 @@
#include "sequence.h"
#include "sql_stmt_cache.h"
#include "msgpack.h"
+#include "raft.h"
#include "trivia/util.h"
static char status[64] = "unknown";
@@ -374,6 +375,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
diag_raise();
return;
}
+ if (iproto_type_is_raft_request(row->type)) {
+ struct raft_request raft_req;
+ if (xrow_decode_raft(row, &raft_req) != 0)
+ diag_raise();
+ raft_process(&raft_req);
+ return;
+ }
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 4f5a2b195..8a11626b3 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,8 @@ enum iproto_type {
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
+ IPROTO_RAFT = 30,
+
/** A confirmation message for synchronous transactions. */
IPROTO_CONFIRM = 40,
/** A rollback message for synchronous transactions. */
@@ -258,6 +260,11 @@ enum iproto_type {
/** IPROTO type name by code */
extern const char *iproto_type_strs[];
+enum iproto_raft_keys {
+ IPROTO_RAFT_TERM = 0,
+ IPROTO_RAFT_VOTE = 1,
+};
+
/**
* Returns IPROTO type name by @a type code.
* @param type IPROTO type.
@@ -332,6 +339,12 @@ iproto_type_is_synchro_request(uint32_t type)
return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
}
+static inline bool
+iproto_type_is_raft_request(uint32_t type)
+{
+ return type == IPROTO_RAFT;
+}
+
/** This is an error. */
static inline bool
iproto_type_is_error(uint32_t type)
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 5da84b35a..98e98abe2 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -40,6 +40,8 @@
#include "box/tuple.h"
#include "box/tuple_format.h"
#include "box/lua/tuple.h"
+#include "box/raft.h"
+#include "box/xrow.h"
#include "mpstream/mpstream.h"
static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
@@ -246,12 +248,45 @@ lbox_tuple_format_new(struct lua_State *L)
/* }}} */
+static int
+lbox_raft_new_term(struct lua_State *L)
+{
+ uint64_t min_term = luaL_checkuint64(L, 1);
+ raft_new_term(min_term);
+ return 0;
+}
+
+static int
+lbox_raft_vote(struct lua_State *L)
+{
+ uint64_t vote_for = luaL_checkuint64(L, 1);
+ if (vote_for > UINT32_MAX)
+ return luaL_error(L, "Invalid vote");
+ raft_vote(vote_for);
+ return 0;
+}
+
+static int
+lbox_raft_get(struct lua_State *L)
+{
+ lua_createtable(L, 0, 2);
+ luaL_pushuint64(L, raft.term);
+ lua_setfield(L, -2, "term");
+ luaL_pushuint64(L, raft.vote);
+ lua_setfield(L, -2, "vote");
+ return 1;
+}
+
void
box_lua_misc_init(struct lua_State *L)
{
static const struct luaL_Reg boxlib_internal[] = {
{"select", lbox_select},
{"new_tuple_format", lbox_tuple_format_new},
+ /* Temporary helpers to sanity test raft persistency. */
+ {"raft_new_term", lbox_raft_new_term},
+ {"raft_vote", lbox_raft_vote},
+ {"raft_get", lbox_raft_get},
{NULL, NULL}
};
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index dfd6fce6e..26274de80 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
+#include "raft.h"
/* sync snapshot every 16MB */
#define SNAP_SYNC_INTERVAL (1 << 24)
@@ -200,12 +201,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
return 0;
}
+static int
+memtx_engine_recover_raft(const struct xrow_header *row)
+{
+ assert(row->type == IPROTO_RAFT);
+ struct raft_request req;
+ if (xrow_decode_raft(row, &req) != 0)
+ return -1;
+ raft_process(&req);
+ return 0;
+}
+
static int
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct xrow_header *row)
{
assert(row->bodycnt == 1); /* always 1 for read */
if (row->type != IPROTO_INSERT) {
+ if (row->type == IPROTO_RAFT)
+ return memtx_engine_recover_raft(row);
diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row->type);
return -1;
@@ -477,6 +491,7 @@ struct checkpoint {
/** The vclock of the snapshot file. */
struct vclock vclock;
struct xdir dir;
+ struct raft_request raft;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
@@ -501,6 +516,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
+ raft_serialize(&ckpt->raft);
ckpt->touch = false;
return ckpt;
}
@@ -572,6 +588,23 @@ checkpoint_add_space(struct space *sp, void *data)
return 0;
};
+static int
+checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
+{
+ struct xrow_header row;
+ struct region *region = &fiber()->gc;
+ uint32_t svp = region_used(region);
+ int rc = -1;
+ if (xrow_encode_raft(&row, region, req) != 0)
+ goto finish;
+ if (checkpoint_write_row(l, &row) != 0)
+ goto finish;
+ rc = 0;
+finish:
+ region_truncate(region, svp);
+ return rc;
+}
+
static int
checkpoint_f(va_list ap)
{
@@ -607,6 +640,8 @@ checkpoint_f(va_list ap)
if (rc != 0)
goto fail;
}
+ if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
+ goto fail;
if (xlog_flush(&snap) < 0)
goto fail;
diff --git a/src/box/raft.c b/src/box/raft.c
new file mode 100644
index 000000000..5465f46b6
--- /dev/null
+++ b/src/box/raft.c
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "raft.h"
+
+#include "error.h"
+#include "journal.h"
+#include "xrow.h"
+#include "small/region.h"
+
+/** Raft state of this instance. */
+struct raft raft = {
+ .term = 0,
+ .vote = 0,
+};
+
+void
+raft_process(const struct raft_request *req)
+{
+ if (req->term != 0)
+ raft.term = req->term;
+ if (req->vote != 0)
+ raft.vote = req->vote;
+}
+
+void
+raft_serialize(struct raft_request *req)
+{
+ req->term = raft.term;
+ req->vote = raft.vote;
+}
+
+static void
+raft_write_cb(struct journal_entry *entry)
+{
+ fiber_wakeup(entry->complete_data);
+}
+
+static void
+raft_write_request(const struct raft_request *req)
+{
+ struct region *region = &fiber()->gc;
+ uint32_t svp = region_used(region);
+ struct xrow_header row;
+ char buf[sizeof(struct journal_entry) +
+ sizeof(struct xrow_header *)];
+ struct journal_entry *entry = (struct journal_entry *)buf;
+ entry->rows[0] = &row;
+
+ if (xrow_encode_raft(&row, region, req) != 0)
+ goto fail;
+ journal_entry_create(entry, 1, xrow_approx_len(&row), raft_write_cb,
+ fiber());
+
+ if (journal_write(entry) != 0 || entry->res < 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ goto fail;
+ }
+ region_truncate(region, svp);
+ return;
+fail:
+ /*
+ * XXX: the stub is supposed to be removed once it is defined what to do
+ * when a raft request WAL write fails.
+ */
+ panic("Could not write a raft request to WAL\n");
+}
+
+void
+raft_new_term(uint64_t min_new_term)
+{
+ if (raft.term < min_new_term)
+ raft.term = min_new_term + 1;
+ else
+ ++raft.term;
+
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.term = raft.term;
+ raft_write_request(&req);
+}
+
+void
+raft_vote(uint32_t vote_for)
+{
+ raft.vote = vote_for;
+
+ struct raft_request req;
+ memset(&req, 0, sizeof(req));
+ req.vote = vote_for;
+ raft_write_request(&req);
+}
diff --git a/src/box/raft.h b/src/box/raft.h
new file mode 100644
index 000000000..1f392033d
--- /dev/null
+++ b/src/box/raft.h
@@ -0,0 +1,61 @@
+#pragma once
+/*
+ * Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+struct raft_request;
+
+#if defined(__cplusplus)
+extern "C" {
+#endif
+
+struct raft {
+ uint64_t term;
+ uint32_t vote;
+};
+
+extern struct raft raft;
+
+void
+raft_new_term(uint64_t min_new_term);
+
+void
+raft_vote(uint32_t vote_for);
+
+void
+raft_process(const struct raft_request *req);
+
+void
+raft_serialize(struct raft_request *req);
+
+#if defined(__cplusplus)
+}
+#endif
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 95ddb1fe7..1923bacfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -954,6 +954,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
return 0;
}
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r)
+{
+ size_t size = mp_sizeof_map(2) +
+ mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term) +
+ mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ char *buf = region_alloc(region, size);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, size, "region_alloc", "buf");
+ return -1;
+ }
+ memset(row, 0, sizeof(*row));
+ row->type = IPROTO_RAFT;
+ row->body[0].iov_base = buf;
+ row->body[0].iov_len = size;
+ row->group_id = GROUP_LOCAL;
+ row->bodycnt = 1;
+ buf = mp_encode_map(buf, 2);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
+ buf = mp_encode_uint(buf, r->term);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ return 0;
+}
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+{
+ /* TODO: handle bad format. */
+ assert(row->type == IPROTO_RAFT);
+ assert(row->bodycnt == 1);
+ assert(row->group_id == GROUP_LOCAL);
+ memset(r, 0, sizeof(*r));
+ const char *pos = row->body[0].iov_base;
+ uint32_t map_size = mp_decode_map(&pos);
+ for (uint32_t i = 0; i < map_size; ++i)
+ {
+ uint64_t key = mp_decode_uint(&pos);
+ switch (key) {
+ case IPROTO_RAFT_TERM:
+ r->term = mp_decode_uint(&pos);
+ break;
+ case IPROTO_RAFT_VOTE:
+ r->vote = mp_decode_uint(&pos);
+ break;
+ default:
+ mp_next(&pos);
+ break;
+ }
+ }
+ return 0;
+}
+
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 58d47b12d..c234f6f88 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
+struct raft_request {
+ uint64_t term;
+ uint32_t vote;
+};
+
+int
+xrow_encode_raft(struct xrow_header *row, struct region *region,
+ const struct raft_request *r);
+
+int
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+
/**
* CALL/EVAL request.
*/
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-27 20:36 ` Vladislav Shpilevoy
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 03/10] [tosquash] raft: return raft_request to xrow Serge Petrenko
` (7 subsequent siblings)
9 siblings, 1 reply; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: sergepetrenko <sergepetrenko@tarantool.org>
The patch introduces a new type of system message used to notify the
followers of the instance's raft status updates.
It is the relay's responsibility to deliver the new system rows to its peers.
The notification system reuses and extends the same row type used to
persist raft state in WAL and snapshot.
Part of #1146
Part of #5204
---
src/box/applier.cc | 34 +++++++++++++++++++---
src/box/box.cc | 17 ++++++++++-
src/box/iproto_constants.h | 1 +
src/box/raft.c | 58 ++++++++++++++++++++++++++++++++++++++
src/box/raft.h | 39 +++++++++++++++++++++++--
src/box/relay.cc | 34 ++++++++++++++++++++--
src/box/relay.h | 14 +++++++++
src/box/xrow.c | 38 +++++++++++++++++++++++--
src/box/xrow.h | 5 +---
9 files changed, 225 insertions(+), 15 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index c1d07ca54..f27436b79 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -55,6 +55,7 @@
#include "scoped_guard.h"
#include "txn_limbo.h"
#include "journal.h"
+#include "raft.h"
STRS(applier_state, applier_STATE);
@@ -298,6 +299,8 @@ apply_final_join_row(struct xrow_header *row)
*/
if (iproto_type_is_synchro_request(row->type))
return 0;
+ if (iproto_type_is_raft_request(row->type))
+ return 0;
struct txn *txn = txn_begin();
if (txn == NULL)
return -1;
@@ -876,6 +879,23 @@ err:
return -1;
}
+static int
+apply_raft_row(struct xrow_header *row)
+{
+ assert(iproto_type_is_raft_request(row->type));
+
+ struct raft_request req;
+ struct vclock candidate_clock;
+ req.vclock = &candidate_clock;
+
+ if (xrow_decode_raft(row, &req) != 0)
+ return -1;
+
+ raft_process_msg(&req);
+
+ return 0;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -1219,11 +1239,17 @@ applier_subscribe(struct applier *applier)
* In case of an heartbeat message wake a writer up
* and check applier state.
*/
- if (stailq_first_entry(&rows, struct applier_tx_row,
- next)->row.lsn == 0)
- applier_signal_ack(applier);
- else if (applier_apply_tx(&rows) != 0)
+ struct xrow_header *first_row =
+ &stailq_first_entry(&rows, struct applier_tx_row,
+ next)->row;
+ if (first_row->lsn == 0) {
+ if (unlikely(iproto_type_is_raft_request(first_row->type)))
+ apply_raft_row(first_row);
+ else
+ applier_signal_ack(applier);
+ } else if (applier_apply_tx(&rows) != 0) {
diag_raise();
+ }
if (ibuf_used(ibuf) == 0)
ibuf_reset(ibuf);
diff --git a/src/box/box.cc b/src/box/box.cc
index c0adccc6a..8323de531 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2050,7 +2050,22 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
tt_uuid_str(&replica_uuid), sio_socketname(io->fd));
say_info("remote vclock %s local vclock %s",
vclock_to_string(&replica_clock), vclock_to_string(&vclock));
-
+ /*
+ * Send out the current raft state of the instance.
+ */
+ if (raft.state != RAFT_STATE_NONE) {
+ struct raft_request req;
+ req.term = raft.term;
+ req.vote = raft.vote;
+ req.state = raft.state;
+ /*
+ * Omit the candidate vclock, since we've just
+ * sent it in subscribe response.
+ */
+ req.vclock = NULL;
+ xrow_encode_raft(&row, &fiber()->gc, &req);
+ coio_write_xrow(io, &row);
+ }
/*
* Replica clock is used in gc state and recovery
* initialization, so we need to replace the remote 0-th
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8a11626b3..4217ce2e0 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -263,6 +263,7 @@ extern const char *iproto_type_strs[];
enum iproto_raft_keys {
IPROTO_RAFT_TERM = 0,
IPROTO_RAFT_VOTE = 1,
+ IPROTO_RAFT_STATE = 2,
};
/**
diff --git a/src/box/raft.c b/src/box/raft.c
index 5465f46b6..839a7dfeb 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -34,11 +34,15 @@
#include "journal.h"
#include "xrow.h"
#include "small/region.h"
+#include "replication.h"
+#include "relay.h"
/** Raft state of this instance. */
struct raft raft = {
.term = 0,
.vote = 0,
+ .curr_leader = 0,
+ .state = RAFT_STATE_NONE,
};
void
@@ -50,9 +54,36 @@ raft_process(const struct raft_request *req)
raft.vote = req->vote;
}
+void
+raft_process_msg(const struct raft_request *req)
+{
+ if (req->term > raft.term) {
+ // Update term.
+ // The logic will be similar, but the code
+ // below is for testing purposes.
+ raft.term = req->term;
+ }
+ if (req->vote > 0) {
+ // Check whether the vote's for us.
+ }
+ switch (req->state) {
+ case RAFT_STATE_FOLLOWER:
+ break;
+ case RAFT_STATE_CANDIDATE:
+ // Perform voting logic.
+ break;
+ case RAFT_STATE_LEADER:
+ // Switch to a new leader.
+ break;
+ default:
+ break;
+ }
+}
+
void
raft_serialize(struct raft_request *req)
{
+ memset(req, 0, sizeof(*req));
req->term = raft.term;
req->vote = raft.vote;
}
@@ -84,6 +115,9 @@ raft_write_request(const struct raft_request *req)
diag_log();
goto fail;
}
+
+ raft_broadcast(req);
+
region_truncate(region, svp);
return;
fail:
@@ -118,3 +152,27 @@ raft_vote(uint32_t vote_for)
req.vote = vote_for;
raft_write_request(&req);
}
+
+void
+raft_free_msg(struct cmsg *msg)
+{
+ free((void *)msg->route);
+ free(msg);
+}
+
+void
+raft_broadcast(const struct raft_request *req)
+{
+ replicaset_foreach(replica) {
+ if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
+ relay_get_state(replica->relay) == RELAY_FOLLOW) {
+ // TODO: think of a proper allocator.
+ struct raft_broadcast_msg *raft_msg =
+ calloc(1, sizeof(*raft_msg));
+ raft_msg->req = *req;
+ struct cmsg_hop *route = calloc(2, sizeof(*route));
+ relay_push_raft_msg(replica->relay, &raft_msg->base,
+ route);
+ }
+ }
+}
diff --git a/src/box/raft.h b/src/box/raft.h
index 1f392033d..9cb39dd24 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -30,32 +30,67 @@
* SUCH DAMAGE.
*/
#include <stdint.h>
-
-struct raft_request;
+#include "cbus.h"
#if defined(__cplusplus)
extern "C" {
#endif
+enum raft_state {
+ RAFT_STATE_NONE = 0,
+ RAFT_STATE_FOLLOWER = 1,
+ RAFT_STATE_CANDIDATE = 2,
+ RAFT_STATE_LEADER = 3
+};
+
struct raft {
uint64_t term;
uint32_t vote;
+ uint32_t curr_leader;
+ enum raft_state state;
};
extern struct raft raft;
+struct raft_request {
+ uint64_t term;
+ uint32_t vote;
+ enum raft_state state;
+ struct vclock *vclock;
+};
+
+struct raft_broadcast_msg {
+ struct cmsg base;
+ struct raft_request req;
+};
+
void
raft_new_term(uint64_t min_new_term);
void
raft_vote(uint32_t vote_for);
+/** Process a raft entry stored in WAL/snapshot. */
void
raft_process(const struct raft_request *req);
+/** Process a raft status message coming from the network. */
+void
+raft_process_msg(const struct raft_request *req);
+
void
raft_serialize(struct raft_request *req);
+void
+raft_free_msg(struct cmsg *msg);
+
+/**
+ * Broadcast the changes in this instance's raft status to all
+ * the followers.
+ */
+void
+raft_broadcast(const struct raft_request *req);
+
#if defined(__cplusplus)
}
#endif
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a7843a8c2..be252cad1 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -54,6 +54,7 @@
#include "xstream.h"
#include "wal.h"
#include "txn_limbo.h"
+#include "raft.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -773,13 +774,40 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
+static void
+relay_send_raft(struct relay *relay, struct raft_request *req)
+{
+ struct xrow_header packet;
+ xrow_encode_raft(&packet, &fiber()->gc, req);
+ relay_send(relay, &packet);
+}
+
+static void
+relay_send_raft_msg(struct cmsg *msg)
+{
+ struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
+ struct relay *relay = container_of(msg->route[0].pipe, struct relay,
+ tx_pipe);
+ relay_send_raft(relay, &raft_msg->req);
+}
+
+void
+relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
+ struct cmsg_hop *route)
+{
+ route[0].f = relay_send_raft_msg;
+ route[0].pipe = &relay->tx_pipe;
+ route[1].f = raft_free_msg;
+ route[1].pipe = NULL;
+ cmsg_init(msg, route);
+ cpipe_push(&relay->relay_pipe, msg);
+}
+
/** Send a single row to the client. */
static void
relay_send_row(struct xstream *stream, struct xrow_header *packet)
{
struct relay *relay = container_of(stream, struct relay, stream);
- assert(iproto_type_is_dml(packet->type) ||
- iproto_type_is_synchro_request(packet->type));
if (packet->group_id == GROUP_LOCAL) {
/*
* We do not relay replica-local rows to other
@@ -796,6 +824,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
packet->group_id = GROUP_DEFAULT;
packet->bodycnt = 0;
}
+ assert(iproto_type_is_dml(packet->type) ||
+ iproto_type_is_synchro_request(packet->type));
/* Check if the rows from the instance are filtered. */
if ((1 << packet->replica_id & relay->id_filter) != 0)
return;
diff --git a/src/box/relay.h b/src/box/relay.h
index 0632fa912..c2c30cd11 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -41,6 +41,8 @@ struct relay;
struct replica;
struct tt_uuid;
struct vclock;
+struct cmsg;
+struct cmsg_hop;
enum relay_state {
/**
@@ -93,6 +95,18 @@ relay_vclock(const struct relay *relay);
double
relay_last_row_time(const struct relay *relay);
+/**
+ * Initialize a raft status message with the route to relay and
+ * back and push the message to relay.
+ *
+ * @param relay relay.
+ * @param msg a preallocated status message.
+ * @param route a preallocated message route.
+ */
+void
+relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
+ struct cmsg_hop *route);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 1923bacfc..f60b12cfc 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -44,6 +44,7 @@
#include "scramble.h"
#include "iproto_constants.h"
#include "mpstream/mpstream.h"
+#include "raft.h"
static_assert(IPROTO_DATA < 0x7f && IPROTO_METADATA < 0x7f &&
IPROTO_SQL_INFO < 0x7f, "encoded IPROTO_BODY keys must fit into "\
@@ -958,11 +959,26 @@ int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r)
{
+ assert(mp_sizeof_map(2) == mp_sizeof_map(4));
+ /*
+ * Term and vote are encoded every time for the sake of
+ * snapshot, while state and vclock are optional.
+ */
size_t size = mp_sizeof_map(2) +
mp_sizeof_uint(IPROTO_RAFT_TERM) +
mp_sizeof_uint(r->term) +
mp_sizeof_uint(IPROTO_RAFT_VOTE) +
mp_sizeof_uint(r->vote);
+
+ size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) +
+ mp_sizeof_uint(r->state));
+ if (r->vclock != NULL) {
+ size += mp_sizeof_uint(IPROTO_VCLOCK) +
+ mp_sizeof_vclock_ignore0(r->vclock);
+ }
+
+ int map_size = 2 + (r->state != 0) + (r->vclock != NULL);
+
char *buf = region_alloc(region, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
@@ -974,11 +990,20 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
row->body[0].iov_len = size;
row->group_id = GROUP_LOCAL;
row->bodycnt = 1;
- buf = mp_encode_map(buf, 2);
+ buf = mp_encode_map(buf, map_size);
buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
buf = mp_encode_uint(buf, r->term);
buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
buf = mp_encode_uint(buf, r->vote);
+ if (r->state != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
+ buf = mp_encode_uint(buf, r->state);
+ }
+ if (r->vclock != NULL) {
+ buf = mp_encode_uint(buf, IPROTO_VCLOCK);
+ buf = mp_encode_vclock_ignore0(buf, r->vclock);
+ }
+
return 0;
}
@@ -989,7 +1014,7 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
assert(row->type == IPROTO_RAFT);
assert(row->bodycnt == 1);
assert(row->group_id == GROUP_LOCAL);
- memset(r, 0, sizeof(*r));
+ memset(r, 0, sizeof(*r) - sizeof(struct vclock *));
const char *pos = row->body[0].iov_base;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
@@ -1002,6 +1027,15 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
case IPROTO_RAFT_VOTE:
r->vote = mp_decode_uint(&pos);
break;
+ case IPROTO_RAFT_STATE:
+ r->state = mp_decode_uint(&pos);
+ break;
+ case IPROTO_VCLOCK:
+ if (r->vclock != NULL)
+ mp_decode_vclock_ignore0(&pos, r->vclock);
+ else
+ mp_next(&pos);
+ break;
default:
mp_next(&pos);
break;
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c234f6f88..3f37dc18f 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,10 +264,7 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
-struct raft_request {
- uint64_t term;
- uint32_t vote;
-};
+struct raft_request;
int
xrow_encode_raft(struct xrow_header *row, struct region *region,
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers Serge Petrenko
@ 2020-08-27 20:36 ` Vladislav Shpilevoy
2020-08-28 10:10 ` Sergey Petrenko
0 siblings, 1 reply; 13+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-27 20:36 UTC (permalink / raw)
To: Serge Petrenko, gorcunov, sergos; +Cc: tarantool-patches
Hi! Thanks for the patch!
> diff --git a/src/box/relay.cc b/src/box/relay.cc
> index a7843a8c2..be252cad1 100644
> --- a/src/box/relay.cc
> +++ b/src/box/relay.cc
> @@ -773,13 +774,40 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
> relay_send(relay, row);
> }
>
> +static void
> +relay_send_raft(struct relay *relay, struct raft_request *req)
> +{
> + struct xrow_header packet;
> + xrow_encode_raft(&packet, &fiber()->gc, req);
> + relay_send(relay, &packet);
> +}
> +
> +static void
> +relay_send_raft_msg(struct cmsg *msg)
> +{
> + struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
> + struct relay *relay = container_of(msg->route[0].pipe, struct relay,
> + tx_pipe);
> + relay_send_raft(relay, &raft_msg->req);
> +}
> +
> +void
> +relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
> + struct cmsg_hop *route)
> +{
> + route[0].f = relay_send_raft_msg;
> + route[0].pipe = &relay->tx_pipe;
> + route[1].f = raft_free_msg;
> + route[1].pipe = NULL;
> + cmsg_init(msg, route);
> + cpipe_push(&relay->relay_pipe, msg);
> +}
> +
> /** Send a single row to the client. */
> static void
> relay_send_row(struct xstream *stream, struct xrow_header *packet)
> {
> struct relay *relay = container_of(stream, struct relay, stream);
> - assert(iproto_type_is_dml(packet->type) ||
> - iproto_type_is_synchro_request(packet->type));
> if (packet->group_id == GROUP_LOCAL) {
> /*
> * We do not relay replica-local rows to other
> @@ -796,6 +824,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
> packet->group_id = GROUP_DEFAULT;
> packet->bodycnt = 0;
> }
> + assert(iproto_type_is_dml(packet->type) ||
> + iproto_type_is_synchro_request(packet->type));
Why did you move this check, if Raft uses relay_send() anyway? Not
relay_send_row().
> /* Check if the rows from the instance are filtered. */
> if ((1 << packet->replica_id & relay->id_filter) != 0)
> return;
^ permalink raw reply [flat|nested] 13+ messages in thread
* Re: [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers
2020-08-27 20:36 ` Vladislav Shpilevoy
@ 2020-08-28 10:10 ` Sergey Petrenko
0 siblings, 0 replies; 13+ messages in thread
From: Sergey Petrenko @ 2020-08-28 10:10 UTC (permalink / raw)
To: Vladislav Shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
27.08.2020 23:36, Vladislav Shpilevoy пишет:
> Hi! Thanks for the patch!
Hi! Thanks for the review!
>> diff --git a/src/box/relay.cc b/src/box/relay.cc
>> index a7843a8c2..be252cad1 100644
>> --- a/src/box/relay.cc
>> +++ b/src/box/relay.cc
>> @@ -773,13 +774,40 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
>> relay_send(relay, row);
>> }
>>
>> +static void
>> +relay_send_raft(struct relay *relay, struct raft_request *req)
>> +{
>> + struct xrow_header packet;
>> + xrow_encode_raft(&packet, &fiber()->gc, req);
>> + relay_send(relay, &packet);
>> +}
>> +
>> +static void
>> +relay_send_raft_msg(struct cmsg *msg)
>> +{
>> + struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
>> + struct relay *relay = container_of(msg->route[0].pipe, struct relay,
>> + tx_pipe);
>> + relay_send_raft(relay, &raft_msg->req);
>> +}
>> +
>> +void
>> +relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
>> + struct cmsg_hop *route)
>> +{
>> + route[0].f = relay_send_raft_msg;
>> + route[0].pipe = &relay->tx_pipe;
>> + route[1].f = raft_free_msg;
>> + route[1].pipe = NULL;
>> + cmsg_init(msg, route);
>> + cpipe_push(&relay->relay_pipe, msg);
>> +}
>> +
>> /** Send a single row to the client. */
>> static void
>> relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> {
>> struct relay *relay = container_of(stream, struct relay, stream);
>> - assert(iproto_type_is_dml(packet->type) ||
>> - iproto_type_is_synchro_request(packet->type));
>> if (packet->group_id == GROUP_LOCAL) {
>> /*
>> * We do not relay replica-local rows to other
>> @@ -796,6 +824,8 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
>> packet->group_id = GROUP_DEFAULT;
>> packet->bodycnt = 0;
>> }
>> + assert(iproto_type_is_dml(packet->type) ||
>> + iproto_type_is_synchro_request(packet->type));
> Why did you move this check, if Raft uses relay_send() anyway? Not
> relay_send_row().
When relay sends a WAL to the replica, local raft rows end up in
relay_send_row(). The rows are not sent, since they're local, but
they fail this assertion, so I moved it below the locality check.
>
>> /* Check if the rows from the instance are filtered. */
>> if ((1 << packet->replica_id & relay->id_filter) != 0)
>> return;
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 03/10] [tosquash] raft: return raft_request to xrow
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 01/10] raft: introduce persistent raft state Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 02/10] raft: relay status updates to followers Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 04/10] [tosquash] raft: introduce IPROTO_RAFT_VCLOCK Serge Petrenko
` (6 subsequent siblings)
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Xrow is the owner of all the IProto and xlog codecs. So the
raft request's definition belongs here, just like any other
request.
---
src/box/raft.c | 1 -
src/box/raft.h | 9 +--------
src/box/xrow.c | 1 -
src/box/xrow.h | 7 ++++++-
4 files changed, 7 insertions(+), 11 deletions(-)
diff --git a/src/box/raft.c b/src/box/raft.c
index 839a7dfeb..227846596 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -32,7 +32,6 @@
#include "error.h"
#include "journal.h"
-#include "xrow.h"
#include "small/region.h"
#include "replication.h"
#include "relay.h"
diff --git a/src/box/raft.h b/src/box/raft.h
index 9cb39dd24..b11ae7b1d 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -29,7 +29,7 @@
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
-#include <stdint.h>
+#include "xrow.h"
#include "cbus.h"
#if defined(__cplusplus)
@@ -52,13 +52,6 @@ struct raft {
extern struct raft raft;
-struct raft_request {
- uint64_t term;
- uint32_t vote;
- enum raft_state state;
- struct vclock *vclock;
-};
-
struct raft_broadcast_msg {
struct cmsg base;
struct raft_request req;
diff --git a/src/box/xrow.c b/src/box/xrow.c
index f60b12cfc..ed3f77a15 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -44,7 +44,6 @@
#include "scramble.h"
#include "iproto_constants.h"
#include "mpstream/mpstream.h"
-#include "raft.h"
static_assert(IPROTO_DATA < 0x7f && IPROTO_METADATA < 0x7f &&
IPROTO_SQL_INFO < 0x7f, "encoded IPROTO_BODY keys must fit into "\
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3f37dc18f..5d571a821 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -264,7 +264,12 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
-struct raft_request;
+struct raft_request {
+ uint64_t term;
+ uint32_t vote;
+ uint32_t state;
+ struct vclock *vclock;
+};
int
xrow_encode_raft(struct xrow_header *row, struct region *region,
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 04/10] [tosquash] raft: introduce IPROTO_RAFT_VCLOCK
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (2 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 03/10] [tosquash] raft: return raft_request to xrow Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec Serge Petrenko
` (5 subsequent siblings)
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
IPROTO_RAFT keys are stored in their own isolated dictionary, so
no need to reuse other keys. Better implement the raft's own
keyset with IPROTO_RAFT_* keys.
---
src/box/iproto_constants.h | 1 +
src/box/xrow.c | 6 +++---
2 files changed, 4 insertions(+), 3 deletions(-)
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 4217ce2e0..3ec397d3c 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -264,6 +264,7 @@ enum iproto_raft_keys {
IPROTO_RAFT_TERM = 0,
IPROTO_RAFT_VOTE = 1,
IPROTO_RAFT_STATE = 2,
+ IPROTO_RAFT_VCLOCK = 3,
};
/**
diff --git a/src/box/xrow.c b/src/box/xrow.c
index ed3f77a15..836de3575 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -972,7 +972,7 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) +
mp_sizeof_uint(r->state));
if (r->vclock != NULL) {
- size += mp_sizeof_uint(IPROTO_VCLOCK) +
+ size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
mp_sizeof_vclock_ignore0(r->vclock);
}
@@ -999,7 +999,7 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
buf = mp_encode_uint(buf, r->state);
}
if (r->vclock != NULL) {
- buf = mp_encode_uint(buf, IPROTO_VCLOCK);
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
buf = mp_encode_vclock_ignore0(buf, r->vclock);
}
@@ -1029,7 +1029,7 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
case IPROTO_RAFT_STATE:
r->state = mp_decode_uint(&pos);
break;
- case IPROTO_VCLOCK:
+ case IPROTO_RAFT_VCLOCK:
if (r->vclock != NULL)
mp_decode_vclock_ignore0(&pos, r->vclock);
else
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (3 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 04/10] [tosquash] raft: introduce IPROTO_RAFT_VCLOCK Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 06/10] [tosquash] raft: don't fill raft_request manually Serge Petrenko
` (4 subsequent siblings)
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
There were a few issues:
- In decode(), memset was applied to raft_request without its
tail, assuming that the vclock storage pointer is always at the
end. Better not to assume such things. The patch makes the
decode() caller pass the vclock storage explicitly if necessary.
- A small improvement to the readability of the encoder, making
it more 'patterned': it now consists of a sequence of similar
code blocks of this kind:
if (is_field_set) {
++map_size;
size += mp_sizeof_uint(field_key) +
mp_sizeof_...(field_value);
}
...
if (is_field_set) {
pos = mp_encode_uint(pos, field_key);
pos = mp_encode_...(pos, field_value);
}
This replaces the unique handling of certain fields. Also, the vote
is no longer encoded into every message, because there is no need for that.
- Added malformed packet handling. Note that we don't consider
the invalid MessagePack case, but we probably should do so
eventually. Other requests don't handle it, because they are
checked either by the iproto thread or by the relay thread. Need
to see whether raft messages are also already validated by some
mp_check() in relay.
---
src/box/applier.cc | 4 +--
src/box/box.cc | 3 +-
src/box/memtx_engine.c | 3 +-
src/box/xrow.c | 76 +++++++++++++++++++++++++++---------------
src/box/xrow.h | 3 +-
5 files changed, 57 insertions(+), 32 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index f27436b79..8e6d1b2a4 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -886,9 +886,7 @@ apply_raft_row(struct xrow_header *row)
struct raft_request req;
struct vclock candidate_clock;
- req.vclock = &candidate_clock;
-
- if (xrow_decode_raft(row, &req) != 0)
+ if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
return -1;
raft_process_msg(&req);
diff --git a/src/box/box.cc b/src/box/box.cc
index 8323de531..b871f45e2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -377,7 +377,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
}
if (iproto_type_is_raft_request(row->type)) {
struct raft_request raft_req;
- if (xrow_decode_raft(row, &raft_req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &raft_req, NULL) != 0)
diag_raise();
raft_process(&raft_req);
return;
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 26274de80..a034baa6c 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT);
struct raft_request req;
- if (xrow_decode_raft(row, &req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &req, NULL) != 0)
return -1;
raft_process(&req);
return 0;
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 836de3575..11fdacc0d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -958,25 +958,29 @@ int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r)
{
- assert(mp_sizeof_map(2) == mp_sizeof_map(4));
/*
- * Term and vote are encoded every time for the sake of
- * snapshot, while state and vclock are optional.
+ * Terms is encoded always. Sometimes the rest can be even ignored if
+ * the term is too old.
*/
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_RAFT_TERM) +
- mp_sizeof_uint(r->term) +
- mp_sizeof_uint(IPROTO_RAFT_VOTE) +
- mp_sizeof_uint(r->vote);
-
- size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) +
- mp_sizeof_uint(r->state));
+ int map_size = 1;
+ size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term);
+ if (r->vote != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ }
+ if (r->state != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+ mp_sizeof_uint(r->state);
+ }
if (r->vclock != NULL) {
+ ++map_size;
size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
- mp_sizeof_vclock_ignore0(r->vclock);
+ mp_sizeof_vclock_ignore0(r->vclock);
}
-
- int map_size = 2 + (r->state != 0) + (r->vclock != NULL);
+ size += mp_sizeof_map(map_size);
char *buf = region_alloc(region, size);
if (buf == NULL) {
@@ -992,8 +996,10 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
buf = mp_encode_map(buf, map_size);
buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
buf = mp_encode_uint(buf, r->term);
- buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
- buf = mp_encode_uint(buf, r->vote);
+ if (r->vote != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ }
if (r->state != 0) {
buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
buf = mp_encode_uint(buf, r->state);
@@ -1002,38 +1008,52 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
buf = mp_encode_vclock_ignore0(buf, r->vclock);
}
-
return 0;
}
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock)
{
- /* TODO: handle bad format. */
assert(row->type == IPROTO_RAFT);
- assert(row->bodycnt == 1);
- assert(row->group_id == GROUP_LOCAL);
- memset(r, 0, sizeof(*r) - sizeof(struct vclock *));
- const char *pos = row->body[0].iov_base;
+ if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "malformed raft request");
+ return -1;
+ }
+ memset(r, 0, sizeof(*r));
+ r->vclock = vclock;
+
+ const char *begin = row->body[0].iov_base;
+ const char *end = begin + row->body[0].iov_len;
+ const char *pos = begin;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
{
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
uint64_t key = mp_decode_uint(&pos);
switch (key) {
case IPROTO_RAFT_TERM:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->term = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VOTE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->vote = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_STATE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->state = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VCLOCK:
- if (r->vclock != NULL)
- mp_decode_vclock_ignore0(&pos, r->vclock);
- else
+ if (r->vclock == NULL)
mp_next(&pos);
+ else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+ goto bad_msgpack;
break;
default:
mp_next(&pos);
@@ -1041,6 +1061,10 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
}
}
return 0;
+
+bad_msgpack:
+ xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+ return -1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 5d571a821..c627102dd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -276,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r);
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock);
/**
* CALL/EVAL request.
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 06/10] [tosquash] raft: don't fill raft_request manually
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (4 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 07/10] [tosquash] raft: rename curr_leader to leader Serge Petrenko
` (3 subsequent siblings)
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
At 'subscribe' the raft complete state was sent to the peer. But
it was filled manually into struct raft_request. There is a
function raft_serialize() exactly to avoid such manual work.
The patch extends the serializer with a vclock argument, since
raft does not manage vclocks and needs them provided externally
(so far; perhaps that will change).
---
src/box/box.cc | 5 +----
src/box/memtx_engine.c | 6 +++++-
src/box/raft.c | 7 ++++++-
src/box/raft.h | 2 +-
4 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index b871f45e2..e7eb79e9f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2056,14 +2056,11 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
*/
if (raft.state != RAFT_STATE_NONE) {
struct raft_request req;
- req.term = raft.term;
- req.vote = raft.vote;
- req.state = raft.state;
/*
* Omit the candidate vclock, since we've just
* sent it in subscribe response.
*/
- req.vclock = NULL;
+ raft_serialize(&req, NULL);
xrow_encode_raft(&row, &fiber()->gc, &req);
coio_write_xrow(io, &row);
}
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index a034baa6c..7de12a569 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -517,7 +517,11 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
- raft_serialize(&ckpt->raft);
+ /*
+ * Don't encode vclock, because it is stored in the snapshot header
+ * anyway.
+ */
+ raft_serialize(&ckpt->raft, NULL);
ckpt->touch = false;
return ckpt;
}
diff --git a/src/box/raft.c b/src/box/raft.c
index 227846596..1d25459e9 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -80,11 +80,16 @@ raft_process_msg(const struct raft_request *req)
}
void
-raft_serialize(struct raft_request *req)
+raft_serialize(struct raft_request *req, struct vclock *vclock)
{
memset(req, 0, sizeof(*req));
req->term = raft.term;
req->vote = raft.vote;
+ req->state = raft.state;
+ /*
+ * Raft does not own vclock, so it always expects it passed externally.
+ */
+ req->vclock = vclock;
}
static void
diff --git a/src/box/raft.h b/src/box/raft.h
index b11ae7b1d..c95a51873 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -72,7 +72,7 @@ void
raft_process_msg(const struct raft_request *req);
void
-raft_serialize(struct raft_request *req);
+raft_serialize(struct raft_request *req, struct vclock *vclock);
void
raft_free_msg(struct cmsg *msg);
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 07/10] [tosquash] raft: rename curr_leader to leader
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (5 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 06/10] [tosquash] raft: don't fill raft_request manually Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 08/10] [tosquash] raft: rename raft_process to raft_process_recovery Serge Petrenko
` (2 subsequent siblings)
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Because it will be consistent with 'vote', which is not
'curr_vote'.
---
src/box/raft.c | 2 +-
src/box/raft.h | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/src/box/raft.c b/src/box/raft.c
index 1d25459e9..714a1518a 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -40,7 +40,7 @@
struct raft raft = {
.term = 0,
.vote = 0,
- .curr_leader = 0,
+ .leader = 0,
.state = RAFT_STATE_NONE,
};
diff --git a/src/box/raft.h b/src/box/raft.h
index c95a51873..927aa8f5f 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -46,7 +46,7 @@ enum raft_state {
struct raft {
uint64_t term;
uint32_t vote;
- uint32_t curr_leader;
+ uint32_t leader;
enum raft_state state;
};
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 08/10] [tosquash] raft: rename raft_process to raft_process_recovery
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (6 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 07/10] [tosquash] raft: rename curr_leader to leader Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 09/10] [tosquash] applier: handler error at raft row appliance Serge Petrenko
2020-08-26 7:53 ` [Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay Serge Petrenko
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
There is another 'process' for remote messages from other raft
nodes. In order to make these functions more clearly separated,
the old function used for local state recovery is renamed to
raft_process_recovery.
This should be squashed into the first commit about persistent
raft state.
---
src/box/box.cc | 2 +-
src/box/memtx_engine.c | 2 +-
src/box/raft.c | 14 +++++++++++++-
src/box/raft.h | 2 +-
4 files changed, 16 insertions(+), 4 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index e7eb79e9f..d01de2519 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -380,7 +380,7 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
/* Vclock is never persisted in WAL by Raft. */
if (xrow_decode_raft(row, &raft_req, NULL) != 0)
diag_raise();
- raft_process(&raft_req);
+ raft_process_recovery(&raft_req);
return;
}
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 7de12a569..b0b744db8 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -209,7 +209,7 @@ memtx_engine_recover_raft(const struct xrow_header *row)
/* Vclock is never persisted in WAL by Raft. */
if (xrow_decode_raft(row, &req, NULL) != 0)
return -1;
- raft_process(&req);
+ raft_process_recovery(&req);
return 0;
}
diff --git a/src/box/raft.c b/src/box/raft.c
index 714a1518a..34c1cf7aa 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -45,12 +45,24 @@ struct raft raft = {
};
void
-raft_process(const struct raft_request *req)
+raft_process_recovery(const struct raft_request *req)
{
if (req->term != 0)
raft.term = req->term;
if (req->vote != 0)
raft.vote = req->vote;
+ /*
+ * Role is never persisted. If recovery is happening, the
+ * node was restarted, and the former role can be false
+ * anyway.
+ */
+ assert(req->state == RAFT_STATE_NONE);
+ /*
+ * Vclock is always persisted by some other subsystem - WAL, snapshot.
+ * It is used only to decide to whom to give the vote during election,
+ * as a part of the volatile state.
+ */
+ assert(req->vclock == NULL);
}
void
diff --git a/src/box/raft.h b/src/box/raft.h
index 927aa8f5f..be071c215 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -65,7 +65,7 @@ raft_vote(uint32_t vote_for);
/** Process a raft entry stored in WAL/snapshot. */
void
-raft_process(const struct raft_request *req);
+raft_process_recovery(const struct raft_request *req);
/** Process a raft status message coming from the network. */
void
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 09/10] [tosquash] applier: handler error at raft row appliance
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (7 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 08/10] [tosquash] raft: rename raft_process to raft_process_recovery Serge Petrenko
@ 2020-08-26 7:52 ` Serge Petrenko
2020-08-26 7:53 ` [Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay Serge Petrenko
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:52 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/applier.cc | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 8e6d1b2a4..b17ac5363 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1241,10 +1241,13 @@ applier_subscribe(struct applier *applier)
&stailq_first_entry(&rows, struct applier_tx_row,
next)->row;
if (first_row->lsn == 0) {
- if (unlikely(iproto_type_is_raft_request(first_row->type)))
- apply_raft_row(first_row);
- else
+ if (unlikely(iproto_type_is_raft_request(
+ first_row->type))) {
+ if (apply_raft_row(first_row) != 0)
+ diag_raise();
+ } else {
applier_signal_ack(applier);
+ }
} else if (applier_apply_tx(&rows) != 0) {
diag_raise();
}
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread
* [Tarantool-patches] [RAFT 10/10] [tosquash] relay: move raft broadcast details into relay
2020-08-26 7:52 [Tarantool-patches] [RAFT 00/10] raft implementation Serge Petrenko
` (8 preceding siblings ...)
2020-08-26 7:52 ` [Tarantool-patches] [RAFT 09/10] [tosquash] applier: handler error at raft row appliance Serge Petrenko
@ 2020-08-26 7:53 ` Serge Petrenko
9 siblings, 0 replies; 13+ messages in thread
From: Serge Petrenko @ 2020-08-26 7:53 UTC (permalink / raw)
To: v.shpilevoy, gorcunov, sergos; +Cc: tarantool-patches
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Raft did some allocations, cbus and cmsg initializations, in
order to broadcast its state update. Also it didn't copy vclock
value so it could point at invalid memory.
The patch moves all the details about pushing a raft update into
relay.cc file, and fixes the vclock copying.
---
src/box/raft.c | 15 +----------
src/box/raft.h | 8 ------
src/box/relay.cc | 68 ++++++++++++++++++++++++++++++++++--------------
src/box/relay.h | 11 +++-----
4 files changed, 52 insertions(+), 50 deletions(-)
diff --git a/src/box/raft.c b/src/box/raft.c
index 34c1cf7aa..e40d778af 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -169,26 +169,13 @@ raft_vote(uint32_t vote_for)
raft_write_request(&req);
}
-void
-raft_free_msg(struct cmsg *msg)
-{
- free((void *)msg->route);
- free(msg);
-}
-
void
raft_broadcast(const struct raft_request *req)
{
replicaset_foreach(replica) {
if (replica->relay != NULL && replica->id != REPLICA_ID_NIL &&
relay_get_state(replica->relay) == RELAY_FOLLOW) {
- // TODO: think of a proper allocator.
- struct raft_broadcast_msg *raft_msg =
- calloc(1, sizeof(*raft_msg));
- raft_msg->req = *req;
- struct cmsg_hop *route = calloc(2, sizeof(*route));
- relay_push_raft_msg(replica->relay, &raft_msg->base,
- route);
+ relay_push_raft(replica->relay, req);
}
}
}
diff --git a/src/box/raft.h b/src/box/raft.h
index be071c215..e14173057 100644
--- a/src/box/raft.h
+++ b/src/box/raft.h
@@ -52,11 +52,6 @@ struct raft {
extern struct raft raft;
-struct raft_broadcast_msg {
- struct cmsg base;
- struct raft_request req;
-};
-
void
raft_new_term(uint64_t min_new_term);
@@ -74,9 +69,6 @@ raft_process_msg(const struct raft_request *req);
void
raft_serialize(struct raft_request *req, struct vclock *vclock);
-void
-raft_free_msg(struct cmsg *msg);
-
/**
* Broadcast the changes in this instance's raft status to all
* the followers.
diff --git a/src/box/relay.cc b/src/box/relay.cc
index be252cad1..53a90f826 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -774,33 +774,61 @@ relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row)
relay_send(relay, row);
}
-static void
-relay_send_raft(struct relay *relay, struct raft_request *req)
-{
- struct xrow_header packet;
- xrow_encode_raft(&packet, &fiber()->gc, req);
- relay_send(relay, &packet);
-}
+struct relay_raft_msg {
+ struct cmsg base;
+ struct cmsg_hop route;
+ struct raft_request req;
+ struct vclock vclock;
+ struct relay *relay;
+};
static void
-relay_send_raft_msg(struct cmsg *msg)
+relay_raft_msg_send(struct cmsg *base)
{
- struct raft_broadcast_msg *raft_msg = (struct raft_broadcast_msg *)msg;
- struct relay *relay = container_of(msg->route[0].pipe, struct relay,
- tx_pipe);
- relay_send_raft(relay, &raft_msg->req);
+ struct relay_raft_msg *msg = (struct relay_raft_msg *)base;
+ struct xrow_header row;
+ xrow_encode_raft(&row, &fiber()->gc, &msg->req);
+ try {
+ relay_send(msg->relay, &row);
+ } catch (Exception *e) {
+ relay_set_error(msg->relay, e);
+ fiber_cancel(fiber());
+ }
+ free(msg);
}
void
-relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
- struct cmsg_hop *route)
+relay_push_raft(struct relay *relay, const struct raft_request *req)
{
- route[0].f = relay_send_raft_msg;
- route[0].pipe = &relay->tx_pipe;
- route[1].f = raft_free_msg;
- route[1].pipe = NULL;
- cmsg_init(msg, route);
- cpipe_push(&relay->relay_pipe, msg);
+ /*
+ * XXX: the message should be preallocated. It should
+ * work like Kharon in IProto. Relay should have 2 raft
+ * messages rotating. When one is sent, the other can be
+ * updated and a flag is set. When the first message is
+ * sent, the control returns to TX thread, sees the set
+ * flag, rotates the buffers, and sends it again. And so
+ * on. This is how it can work in future, with 0 heap
+ * allocations. Current solution with alloc-per-update is
+ * good enough as a start. Another option - wait until all
+ * is moved to WAL thread, where this will all happen
+ * in one thread and will be much simpler.
+ */
+ struct relay_raft_msg *msg =
+ (struct relay_raft_msg *)malloc(sizeof(*msg));
+ if (msg == NULL) {
+ panic("Couldn't allocate raft message");
+ return;
+ }
+ msg->req = *req;
+ if (req->vclock != NULL) {
+ msg->req.vclock = &msg->vclock;
+ vclock_copy(&msg->vclock, req->vclock);
+ }
+ msg->route.f = relay_raft_msg_send;
+ msg->route.pipe = NULL;
+ cmsg_init(&msg->base, &msg->route);
+ msg->relay = relay;
+ cpipe_push(&relay->relay_pipe, &msg->base);
}
/** Send a single row to the client. */
diff --git a/src/box/relay.h b/src/box/relay.h
index c2c30cd11..4d291698d 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -96,16 +96,11 @@ double
relay_last_row_time(const struct relay *relay);
/**
- * Initialize a raft status message with the route to relay and
- * back and push the message to relay.
- *
- * @param relay relay.
- * @param msg a preallocated status message.
- * @param route a preallocated message route.
+ * Send a Raft update request to the relay channel. It is not
+ * guaranteed that it will be delivered. The connection may break.
*/
void
-relay_push_raft_msg(struct relay *relay, struct cmsg *msg,
- struct cmsg_hop *route);
+relay_push_raft(struct relay *relay, const struct raft_request *req);
#if defined(__cplusplus)
} /* extern "C" */
--
2.20.1 (Apple Git-117)
^ permalink raw reply [flat|nested] 13+ messages in thread