[Tarantool-patches] [PATCH v4 03/12] xrow: introduce a PROMOTE entry
Serge Petrenko
sergepetrenko at tarantool.org
Fri Apr 16 19:25:34 MSK 2021
A PROMOTE entry combines the effects of CONFIRM, ROLLBACK and RAFT_TERM
entries, with some additional semantics on top.
PROMOTE carries the following arguments:
1) former_leader_id - the id of the previous limbo owner whose entries we
want to confirm (encoded as IPROTO_REPLICA_ID).
2) confirm_lsn - the lsn of the last former leader's transaction to be
confirmed (encoded as IPROTO_LSN). In this sense PROMOTE(confirm_lsn)
replaces CONFIRM(confirm_lsn) + ROLLBACK(confirm_lsn + 1).
3) replica_id - the id of the instance issuing
`box.ctl.clear_synchro_queue()` (taken from the row header and decoded
into synchro_request.origin_id).
4) term - the new term the instance issuing
`box.ctl.clear_synchro_queue()` has just entered (encoded as the new
IPROTO_TERM key).
This entry will be written to WAL instead of the usual CONFIRM +
ROLLBACK pair on a successful `box.ctl.clear_synchro_queue()` call.
Note that the usual CONFIRM and ROLLBACK entries (written after a
synchronous transaction is confirmed or rolled back) are here to stay.
Part of #5445
---
src/box/iproto_constants.h | 26 ++++++++++++++++++++--
src/box/txn_limbo.c | 4 ++--
src/box/xrow.c | 45 ++++++++++++++++++++++++--------------
src/box/xrow.h | 31 ++++++++++++++------------
4 files changed, 71 insertions(+), 35 deletions(-)
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index e9d1ef5d6..99c8ca184 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -132,6 +132,18 @@ enum iproto_key {
IPROTO_REPLICA_ANON = 0x50,
IPROTO_ID_FILTER = 0x51,
IPROTO_ERROR = 0x52,
+ /**
+ * Term. Has the same meaning as IPROTO_RAFT_TERM, but is an iproto
+ * key, rather than a raft key. Used for the PROMOTE request, which
+ * needs both iproto (e.g. REPLICA_ID) and raft (RAFT_TERM) keys.
+ */
+ IPROTO_TERM = 0x53,
+ /*
+ * Be careful not to extend iproto_key values over 0x7f.
+ * iproto_keys are encoded in msgpack as positive fixint, which ends at
+ * 0x7f, and we rely on this in some places by allocating a uint8_t to
+ * hold a msgpack-encoded key value.
+ */
IPROTO_KEY_MAX
};
@@ -226,6 +238,8 @@ enum iproto_type {
IPROTO_TYPE_STAT_MAX,
IPROTO_RAFT = 30,
+ /** PROMOTE request. */
+ IPROTO_PROMOTE = 31,
/** A confirmation message for synchronous transactions. */
IPROTO_CONFIRM = 40,
@@ -340,11 +354,19 @@ dml_request_key_map(uint16_t type)
return iproto_body_key_map[type];
}
-/** CONFIRM/ROLLBACK entries for synchronous replication. */
+/** Synchronous replication entries: CONFIRM/ROLLBACK/PROMOTE. */
static inline bool
iproto_type_is_synchro_request(uint16_t type)
{
- return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
+ return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK ||
+ type == IPROTO_PROMOTE;
+}
+
+/** PROMOTE entry (synchronous replication and leader elections). */
+static inline bool
+iproto_type_is_promote_request(uint32_t type)
+{
+ return type == IPROTO_PROMOTE;
}
static inline bool
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index addcb0f97..c96e497c6 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -331,7 +331,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn)
* This is a synchronous commit so we can
* allocate everything on a stack.
*/
- struct synchro_body_bin body;
+ char body[XROW_SYNCHRO_BODY_LEN_MAX];
struct xrow_header row;
char buf[sizeof(struct journal_entry) +
sizeof(struct xrow_header *)];
@@ -339,7 +339,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn)
struct journal_entry *entry = (struct journal_entry *)buf;
entry->rows[0] = &row;
- xrow_encode_synchro(&row, &body, &req);
+ xrow_encode_synchro(&row, body, &req);
journal_entry_create(entry, 1, xrow_approx_len(&row),
txn_limbo_write_cb, fiber());
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 35e1d1c20..2e364cea5 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -885,28 +885,33 @@ xrow_encode_dml(const struct request *request, struct region *region,
}
void
-xrow_encode_synchro(struct xrow_header *row,
- struct synchro_body_bin *body,
+xrow_encode_synchro(struct xrow_header *row, char *body,
const struct synchro_request *req)
{
- /*
- * A map with two elements. We don't compress
- * numbers to have this structure constant in size,
- * which allows us to preallocate it on stack.
- */
- body->m_body = 0x80 | 2;
- body->k_replica_id = IPROTO_REPLICA_ID;
- body->m_replica_id = 0xce;
- body->v_replica_id = mp_bswap_u32(req->replica_id);
- body->k_lsn = IPROTO_LSN;
- body->m_lsn = 0xcf;
- body->v_lsn = mp_bswap_u64(req->lsn);
+ assert(iproto_type_is_synchro_request(req->type));
- memset(row, 0, sizeof(*row));
+ char *pos = body;
+ /* REPLICA_ID and LSN always; PROMOTE additionally carries TERM. */
+ pos = mp_encode_map(pos,
+ iproto_type_is_promote_request(req->type) ? 3 : 2);
+ pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
+ pos = mp_encode_uint(pos, req->replica_id);
+
+ pos = mp_encode_uint(pos, IPROTO_LSN);
+ pos = mp_encode_uint(pos, req->lsn);
+
+ if (iproto_type_is_promote_request(req->type)) {
+ pos = mp_encode_uint(pos, IPROTO_TERM);
+ pos = mp_encode_uint(pos, req->term);
+ }
+ /* Worst case: 1 + (1 + 5) + (1 + 9) + (1 + 9) = 27 bytes. */
+ assert(pos - body <= XROW_SYNCHRO_BODY_LEN_MAX);
+
+ memset(row, 0, sizeof(*row));
row->type = req->type;
- row->body[0].iov_base = (void *)body;
- row->body[0].iov_len = sizeof(*body);
+ row->body[0].iov_base = body;
+ row->body[0].iov_len = pos - body;
row->bodycnt = 1;
}
@@ -952,11 +957,17 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
case IPROTO_LSN:
req->lsn = mp_decode_uint(&d);
break;
+ case IPROTO_TERM:
+ req->term = mp_decode_uint(&d);
+ break;
default:
mp_next(&d);
}
}
+
req->type = row->type;
+ req->origin_id = row->replica_id;
+
return 0;
}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 5ea99e792..b3c664be2 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -49,6 +49,7 @@ enum {
XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
XROW_HEADER_LEN_MAX = 52,
XROW_BODY_LEN_MAX = 256,
+ XROW_SYNCHRO_BODY_LEN_MAX = 32,
IPROTO_HEADER_LEN = 28,
/** 7 = sizeof(iproto_body_bin). */
IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -226,7 +227,10 @@ xrow_encode_dml(const struct request *request, struct region *region,
* pending synchronous transactions.
*/
struct synchro_request {
- /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */
+ /**
+ * Operation type - IPROTO_CONFIRM, IPROTO_ROLLBACK or
+ * IPROTO_PROMOTE.
+ */
uint16_t type;
/**
* ID of the instance owning the pending transactions.
@@ -236,25 +240,25 @@ struct synchro_request {
* finish transactions of an old master.
*/
uint32_t replica_id;
+ /**
+ * ID of the instance which has issued this request. Only filled on
+ * decoding (from the row header) and ignored when encoding.
+ */
+ uint32_t origin_id;
/**
* Operation LSN.
* In case of CONFIRM it means 'confirm all
* transactions with lsn <= this value'.
* In case of ROLLBACK it means 'rollback all transactions
* with lsn >= this value'.
+ * In case of PROMOTE it means CONFIRM(lsn) + ROLLBACK(lsn + 1).
*/
int64_t lsn;
-};
-
-/** Synchro request xrow's body in MsgPack format. */
-struct PACKED synchro_body_bin {
- uint8_t m_body;
- uint8_t k_replica_id;
- uint8_t m_replica_id;
- uint32_t v_replica_id;
- uint8_t k_lsn;
- uint8_t m_lsn;
- uint64_t v_lsn;
+ /**
+ * The new term the instance issuing this request is in. Only
+ * encoded for the PROMOTE request.
+ */
+ uint64_t term;
};
/**
@@ -264,8 +268,7 @@ struct PACKED synchro_body_bin {
* @param req Request parameters.
*/
void
-xrow_encode_synchro(struct xrow_header *row,
- struct synchro_body_bin *body,
+xrow_encode_synchro(struct xrow_header *row, char *body,
const struct synchro_request *req);
/**
--
2.24.3 (Apple Git-128)
More information about the Tarantool-patches
mailing list