[Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec
Serge Petrenko
sergepetrenko at tarantool.org
Wed Aug 26 10:52:37 MSK 2020
From: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
There were a few issues:
- In decode() memset was applied to everything except the tail of
raft_request, assuming that the vclock storage pointer is always
the last member. Better not to assume such things. The patch makes
the decode() caller pass the vclock storage explicitly if necessary.
- A small readability improvement: the encoder is made more
'patterned', so that it consists of a sequence of similar code
blocks of the kind:
if (is_field_set) {
++map_size;
size += mp_sizeof_uint(field_key) +
mp_sizeof_...(field_value);
}
...
if (is_field_set) {
pos = mp_encode_uint(pos, field_key);
pos = mp_encode_...(pos, field_value);
}
instead of unique handling of certain fields. Also, the vote is
no longer encoded into each message, because there is no need for that.
- Added malformed packet handling. Note that we don't consider
the invalid MessagePack case, but we probably should do so
eventually. Other requests don't handle it, because they are
checked either by the iproto thread or by the relay thread. Need to
see if raft messages are also already validated by some mp_check()
in relay.
---
src/box/applier.cc | 4 +--
src/box/box.cc | 3 +-
src/box/memtx_engine.c | 3 +-
src/box/xrow.c | 76 +++++++++++++++++++++++++++---------------
src/box/xrow.h | 3 +-
5 files changed, 57 insertions(+), 32 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index f27436b79..8e6d1b2a4 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -886,9 +886,7 @@ apply_raft_row(struct xrow_header *row)
struct raft_request req;
struct vclock candidate_clock;
- req.vclock = &candidate_clock;
-
- if (xrow_decode_raft(row, &req) != 0)
+ if (xrow_decode_raft(row, &req, &candidate_clock) != 0)
return -1;
raft_process_msg(&req);
diff --git a/src/box/box.cc b/src/box/box.cc
index 8323de531..b871f45e2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -377,7 +377,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
}
if (iproto_type_is_raft_request(row->type)) {
struct raft_request raft_req;
- if (xrow_decode_raft(row, &raft_req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &raft_req, NULL) != 0)
diag_raise();
raft_process(&raft_req);
return;
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 26274de80..a034baa6c 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT);
struct raft_request req;
- if (xrow_decode_raft(row, &req) != 0)
+ /* Vclock is never persisted in WAL by Raft. */
+ if (xrow_decode_raft(row, &req, NULL) != 0)
return -1;
raft_process(&req);
return 0;
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 836de3575..11fdacc0d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -958,25 +958,29 @@ int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r)
{
- assert(mp_sizeof_map(2) == mp_sizeof_map(4));
/*
- * Term and vote are encoded every time for the sake of
- * snapshot, while state and vclock are optional.
+ * Terms is encoded always. Sometimes the rest can be even ignored if
+ * the term is too old.
*/
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_RAFT_TERM) +
- mp_sizeof_uint(r->term) +
- mp_sizeof_uint(IPROTO_RAFT_VOTE) +
- mp_sizeof_uint(r->vote);
-
- size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) +
- mp_sizeof_uint(r->state));
+ int map_size = 1;
+ size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) +
+ mp_sizeof_uint(r->term);
+ if (r->vote != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_VOTE) +
+ mp_sizeof_uint(r->vote);
+ }
+ if (r->state != 0) {
+ ++map_size;
+ size += mp_sizeof_uint(IPROTO_RAFT_STATE) +
+ mp_sizeof_uint(r->state);
+ }
if (r->vclock != NULL) {
+ ++map_size;
size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) +
- mp_sizeof_vclock_ignore0(r->vclock);
+ mp_sizeof_vclock_ignore0(r->vclock);
}
-
- int map_size = 2 + (r->state != 0) + (r->vclock != NULL);
+ size += mp_sizeof_map(map_size);
char *buf = region_alloc(region, size);
if (buf == NULL) {
@@ -992,8 +996,10 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
buf = mp_encode_map(buf, map_size);
buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
buf = mp_encode_uint(buf, r->term);
- buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
- buf = mp_encode_uint(buf, r->vote);
+ if (r->vote != 0) {
+ buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
+ buf = mp_encode_uint(buf, r->vote);
+ }
if (r->state != 0) {
buf = mp_encode_uint(buf, IPROTO_RAFT_STATE);
buf = mp_encode_uint(buf, r->state);
@@ -1002,38 +1008,52 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK);
buf = mp_encode_vclock_ignore0(buf, r->vclock);
}
-
return 0;
}
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock)
{
- /* TODO: handle bad format. */
assert(row->type == IPROTO_RAFT);
- assert(row->bodycnt == 1);
- assert(row->group_id == GROUP_LOCAL);
- memset(r, 0, sizeof(*r) - sizeof(struct vclock *));
- const char *pos = row->body[0].iov_base;
+ if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) {
+ diag_set(ClientError, ER_INVALID_MSGPACK,
+ "malformed raft request");
+ return -1;
+ }
+ memset(r, 0, sizeof(*r));
+ r->vclock = vclock;
+
+ const char *begin = row->body[0].iov_base;
+ const char *end = begin + row->body[0].iov_len;
+ const char *pos = begin;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
{
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
uint64_t key = mp_decode_uint(&pos);
switch (key) {
case IPROTO_RAFT_TERM:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->term = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VOTE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->vote = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_STATE:
+ if (mp_typeof(*pos) != MP_UINT)
+ goto bad_msgpack;
r->state = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VCLOCK:
- if (r->vclock != NULL)
- mp_decode_vclock_ignore0(&pos, r->vclock);
- else
+ if (r->vclock == NULL)
mp_next(&pos);
+ else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0)
+ goto bad_msgpack;
break;
default:
mp_next(&pos);
@@ -1041,6 +1061,10 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
}
}
return 0;
+
+bad_msgpack:
+ xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body");
+ return -1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 5d571a821..c627102dd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -276,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r);
int
-xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);
+xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
+ struct vclock *vclock);
/**
* CALL/EVAL request.
--
2.20.1 (Apple Git-117)
More information about the Tarantool-patches
mailing list