From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 5D01F430410 for ; Wed, 26 Aug 2020 10:53:02 +0300 (MSK) From: Serge Petrenko Date: Wed, 26 Aug 2020 10:52:37 +0300 Message-Id: In-Reply-To: References: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [RAFT 05/10] [tosquash] xrow: refactor raft request codec List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: v.shpilevoy@tarantool.org, gorcunov@gmail.com, sergos@tarantool.org Cc: tarantool-patches@dev.tarantool.org From: Vladislav Shpilevoy There were a few issues: - In decode(), memset() cleared the whole struct raft_request except its tail, assuming that the vclock storage pointer is always the last member. Better not to assume such things. The patch makes the decode() caller pass the vclock storage explicitly when needed. - A small readability improvement of the encoder, making it more 'patterned'. It now consists of a sequence of similar code blocks of the form: if (is_field_set) { ++map_size; size += mp_sizeof_uint(field_key) + mp_sizeof_...(field_value); } ... if (is_field_set) { pos = mp_encode_uint(pos, field_key); pos = mp_encode_...(pos, field_value); } instead of special handling of certain fields. Also, the vote is no longer encoded into every message, because there is no need for that. - Added malformed packet handling. Note that the invalid MessagePack case is not considered yet, but it probably should be eventually. Other requests don't handle it either, because they are checked by either the iproto thread or the relay thread. Need to check whether raft messages are also already validated by some mp_check() in relay. 
--- src/box/applier.cc | 4 +-- src/box/box.cc | 3 +- src/box/memtx_engine.c | 3 +- src/box/xrow.c | 76 +++++++++++++++++++++++++++--------------- src/box/xrow.h | 3 +- 5 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index f27436b79..8e6d1b2a4 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -886,9 +886,7 @@ apply_raft_row(struct xrow_header *row) struct raft_request req; struct vclock candidate_clock; - req.vclock = &candidate_clock; - - if (xrow_decode_raft(row, &req) != 0) + if (xrow_decode_raft(row, &req, &candidate_clock) != 0) return -1; raft_process_msg(&req); diff --git a/src/box/box.cc b/src/box/box.cc index 8323de531..b871f45e2 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -377,7 +377,8 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) } if (iproto_type_is_raft_request(row->type)) { struct raft_request raft_req; - if (xrow_decode_raft(row, &raft_req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &raft_req, NULL) != 0) diag_raise(); raft_process(&raft_req); return; diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 26274de80..a034baa6c 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -206,7 +206,8 @@ memtx_engine_recover_raft(const struct xrow_header *row) { assert(row->type == IPROTO_RAFT); struct raft_request req; - if (xrow_decode_raft(row, &req) != 0) + /* Vclock is never persisted in WAL by Raft. */ + if (xrow_decode_raft(row, &req, NULL) != 0) return -1; raft_process(&req); return 0; diff --git a/src/box/xrow.c b/src/box/xrow.c index 836de3575..11fdacc0d 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -958,25 +958,29 @@ int xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r) { - assert(mp_sizeof_map(2) == mp_sizeof_map(4)); /* - * Term and vote are encoded every time for the sake of - * snapshot, while state and vclock are optional. 
+ * Terms is encoded always. Sometimes the rest can be even ignored if + * the term is too old. */ - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_RAFT_TERM) + - mp_sizeof_uint(r->term) + - mp_sizeof_uint(IPROTO_RAFT_VOTE) + - mp_sizeof_uint(r->vote); - - size += (r->state != 0) * (mp_sizeof_uint(IPROTO_RAFT_STATE) + - mp_sizeof_uint(r->state)); + int map_size = 1; + size_t size = mp_sizeof_uint(IPROTO_RAFT_TERM) + + mp_sizeof_uint(r->term); + if (r->vote != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_VOTE) + + mp_sizeof_uint(r->vote); + } + if (r->state != 0) { + ++map_size; + size += mp_sizeof_uint(IPROTO_RAFT_STATE) + + mp_sizeof_uint(r->state); + } if (r->vclock != NULL) { + ++map_size; size += mp_sizeof_uint(IPROTO_RAFT_VCLOCK) + - mp_sizeof_vclock_ignore0(r->vclock); + mp_sizeof_vclock_ignore0(r->vclock); } - - int map_size = 2 + (r->state != 0) + (r->vclock != NULL); + size += mp_sizeof_map(map_size); char *buf = region_alloc(region, size); if (buf == NULL) { @@ -992,8 +996,10 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, buf = mp_encode_map(buf, map_size); buf = mp_encode_uint(buf, IPROTO_RAFT_TERM); buf = mp_encode_uint(buf, r->term); - buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); - buf = mp_encode_uint(buf, r->vote); + if (r->vote != 0) { + buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE); + buf = mp_encode_uint(buf, r->vote); + } if (r->state != 0) { buf = mp_encode_uint(buf, IPROTO_RAFT_STATE); buf = mp_encode_uint(buf, r->state); @@ -1002,38 +1008,52 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, buf = mp_encode_uint(buf, IPROTO_RAFT_VCLOCK); buf = mp_encode_vclock_ignore0(buf, r->vclock); } - return 0; } int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock) { - /* TODO: handle bad format. 
*/ assert(row->type == IPROTO_RAFT); - assert(row->bodycnt == 1); - assert(row->group_id == GROUP_LOCAL); - memset(r, 0, sizeof(*r) - sizeof(struct vclock *)); - const char *pos = row->body[0].iov_base; + if (row->bodycnt != 1 || row->group_id != GROUP_LOCAL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "malformed raft request"); + return -1; + } + memset(r, 0, sizeof(*r)); + r->vclock = vclock; + + const char *begin = row->body[0].iov_base; + const char *end = begin + row->body[0].iov_len; + const char *pos = begin; uint32_t map_size = mp_decode_map(&pos); for (uint32_t i = 0; i < map_size; ++i) { + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; uint64_t key = mp_decode_uint(&pos); switch (key) { case IPROTO_RAFT_TERM: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->term = mp_decode_uint(&pos); break; case IPROTO_RAFT_VOTE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->vote = mp_decode_uint(&pos); break; case IPROTO_RAFT_STATE: + if (mp_typeof(*pos) != MP_UINT) + goto bad_msgpack; r->state = mp_decode_uint(&pos); break; case IPROTO_RAFT_VCLOCK: - if (r->vclock != NULL) - mp_decode_vclock_ignore0(&pos, r->vclock); - else + if (r->vclock == NULL) mp_next(&pos); + else if (mp_decode_vclock_ignore0(&pos, r->vclock) != 0) + goto bad_msgpack; break; default: mp_next(&pos); @@ -1041,6 +1061,10 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r) } } return 0; + +bad_msgpack: + xrow_on_decode_err(begin, end, ER_INVALID_MSGPACK, "raft body"); + return -1; } int diff --git a/src/box/xrow.h b/src/box/xrow.h index 5d571a821..c627102dd 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -276,7 +276,8 @@ xrow_encode_raft(struct xrow_header *row, struct region *region, const struct raft_request *r); int -xrow_decode_raft(const struct xrow_header *row, struct raft_request *r); +xrow_decode_raft(const struct xrow_header *row, struct raft_request *r, + struct vclock *vclock); /** * CALL/EVAL request. -- 2.20.1 (Apple Git-117)