[Tarantool-patches] [PATCH v2 05/19] xrow: introduce CONFIRM and ROLLBACK entries
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Jun 30 02:15:24 MSK 2020
From: Serge Petrenko <sergepetrenko at tarantool.org>
Add methods to encode/decode CONFIRM and ROLLBACK entries.
A CONFIRM entry will be written to WAL by the synchronous replication master
as soon as it finds that the transaction has been applied on a quorum of
replicas.
CONFIRM rows share the same header with other rows in WAL, but their body
differs: it's just a map containing replica_id and lsn of the last
confirmed transaction.
A ROLLBACK request contains the same data as a CONFIRM request.
The only difference is the request semantics: while a CONFIRM request
releases all the limbo entries up to the given lsn, a ROLLBACK request
rolls back all the entries with an lsn greater than the given one.
Part-of #4847
Part-of #4848
@TarantoolBot document
Title: document synchronous replication auxiliary requests
Two new iproto request codes are added:
* IPROTO_CONFIRM = 0x28 (decimal 40)
* IPROTO_ROLLBACK = 0x29 (decimal 41)
Both entries share the same request body (it's a map of 2 items):
IPROTO_REPLICA_ID : leader_id - id of the synchronous replication leader,
IPROTO_LSN : leader_lsn - lsn of the last confirmed transaction.
The CONFIRM and ROLLBACK ops are written to WAL, so their header also has
IPROTO_REPLICA_ID and IPROTO_LSN fields, which are replica_id : lsn of the
instance that wrote these records. leader_id may be different from
replica_id, and leader_lsn refers to some past moment in time.
When an instance either reads from WAL or receives a CONFIRM entry via
replication, it knows that all the leader's synchronous transactions
up to the given leader_lsn may be safely committed.
When an instance receives or reads a ROLLBACK entry, it knows that all
the leader's transactions received up to the given point in time must be
rolled back, starting with the transaction that begins at leader_lsn + 1.
---
src/box/iproto_constants.h | 12 +++++
src/box/xrow.c | 106 +++++++++++++++++++++++++++++++++++++
src/box/xrow.h | 46 ++++++++++++++++
3 files changed, 164 insertions(+)
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index e38ee4529..6b850f101 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -219,6 +219,11 @@ enum iproto_type {
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
+ /** A confirmation message for synchronous transactions. */
+ IPROTO_CONFIRM = 40,
+ /** A rollback message for synchronous transactions. */
+ IPROTO_ROLLBACK = 41,
+
/** PING request */
IPROTO_PING = 64,
/** Replication JOIN command */
@@ -316,6 +321,13 @@ dml_request_key_map(uint32_t type)
return iproto_body_key_map[type];
}
+/**
+ * Tell whether a request type code denotes one of the synchronous
+ * replication service entries, i.e. IPROTO_CONFIRM or IPROTO_ROLLBACK.
+ */
+static inline bool
+iproto_type_is_synchro_request(uint32_t type)
+{
+	switch (type) {
+	case IPROTO_CONFIRM:
+	case IPROTO_ROLLBACK:
+		return true;
+	default:
+		return false;
+	}
+}
+
/** This is an error. */
static inline bool
iproto_type_is_error(uint32_t type)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index bb64864b2..39d1814c4 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -878,6 +878,112 @@ xrow_encode_dml(const struct request *request, struct region *region,
return iovcnt;
}
+/**
+ * Build the body shared by CONFIRM and ROLLBACK entries and attach it
+ * to @a row.
+ *
+ * The body is a MsgPack map of two pairs: IPROTO_REPLICA_ID -> replica_id
+ * and IPROTO_LSN -> lsn. It is allocated on the current fiber's region.
+ * The row is zeroed before the body and the type are stored in it, so
+ * any previous content of @a row is discarded.
+ *
+ * @param row        Row to fill in.
+ * @param replica_id Value stored under IPROTO_REPLICA_ID.
+ * @param lsn        Value stored under IPROTO_LSN.
+ * @param type       Value stored in row->type.
+ *
+ * @retval  0 Success.
+ * @retval -1 Region allocation failed, diag is set.
+ */
+static int
+xrow_encode_confirm_rollback(struct xrow_header *row, uint32_t replica_id,
+			     int64_t lsn, int type)
+{
+	size_t body_size = mp_sizeof_map(2);
+	body_size += mp_sizeof_uint(IPROTO_REPLICA_ID);
+	body_size += mp_sizeof_uint(replica_id);
+	body_size += mp_sizeof_uint(IPROTO_LSN);
+	body_size += mp_sizeof_uint(lsn);
+
+	char *body = (char *)region_alloc(&fiber()->gc, body_size);
+	if (body == NULL) {
+		diag_set(OutOfMemory, body_size, "region_alloc", "buf");
+		return -1;
+	}
+
+	/* Key/value pairs go in the same order the size was computed. */
+	char *cursor = mp_encode_map(body, 2);
+	cursor = mp_encode_uint(cursor, IPROTO_REPLICA_ID);
+	cursor = mp_encode_uint(cursor, replica_id);
+	cursor = mp_encode_uint(cursor, IPROTO_LSN);
+	cursor = mp_encode_uint(cursor, lsn);
+	(void)cursor;
+
+	memset(row, 0, sizeof(*row));
+	row->type = type;
+	row->bodycnt = 1;
+	row->body[0].iov_base = body;
+	row->body[0].iov_len = body_size;
+	return 0;
+}
+
+/**
+ * Fill @a row with a CONFIRM entry carrying @a replica_id and @a lsn.
+ * Returns whatever the shared CONFIRM/ROLLBACK encoder returns.
+ */
+int
+xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, int64_t lsn)
+{
+	const int type = IPROTO_CONFIRM;
+	return xrow_encode_confirm_rollback(row, replica_id, lsn, type);
+}
+
+/**
+ * Fill @a row with a ROLLBACK entry carrying @a replica_id and @a lsn.
+ * Returns whatever the shared CONFIRM/ROLLBACK encoder returns.
+ */
+int
+xrow_encode_rollback(struct xrow_header *row, uint32_t replica_id, int64_t lsn)
+{
+	const int type = IPROTO_ROLLBACK;
+	return xrow_encode_confirm_rollback(row, replica_id, lsn, type);
+}
+
+/**
+ * Decode the body shared by CONFIRM and ROLLBACK entries.
+ *
+ * The body must be a valid MsgPack map. Pairs whose key is not an
+ * unsigned integer are skipped. For an unsigned key the type of the
+ * value is validated against iproto_key_type[] before it is decoded;
+ * a key outside of the table or a value of an unexpected type is an
+ * error. Known keys other than IPROTO_REPLICA_ID and IPROTO_LSN are
+ * skipped.
+ *
+ * The outputs are stored only on success. A key absent from the map
+ * yields 0 in the corresponding output. The row type is not checked.
+ *
+ * @param row             Row with the body to decode.
+ * @param[out] replica_id Value found under IPROTO_REPLICA_ID.
+ * @param[out] lsn        Value found under IPROTO_LSN.
+ *
+ * @retval  0 Success.
+ * @retval -1 The row has no body or the body is malformed.
+ */
+static int
+xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
+			     int64_t *lsn)
+{
+	if (row->bodycnt == 0) {
+		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
+		return -1;
+	}
+
+	assert(row->bodycnt == 1);
+
+	const char * const data = (const char *)row->body[0].iov_base;
+	const char * const end = data + row->body[0].iov_len;
+	const char *d = data;
+	if (mp_check(&d, end) != 0 || mp_typeof(*data) != MP_MAP) {
+		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+				   "request body");
+		return -1;
+	}
+
+	/* Decode into locals so the outputs are stored only on success. */
+	uint32_t decoded_replica_id = 0;
+	int64_t decoded_lsn = 0;
+
+	d = data;
+	uint32_t map_size = mp_decode_map(&d);
+	for (uint32_t i = 0; i < map_size; i++) {
+		if (mp_typeof(*d) != MP_UINT) {
+			/* Not an iproto key: skip the key and its value. */
+			mp_next(&d);
+			mp_next(&d);
+			continue;
+		}
+		/*
+		 * Keep the full width of the key. Truncating it to 8
+		 * bits before the range check would make an
+		 * out-of-range key alias a valid one.
+		 */
+		uint64_t key = mp_decode_uint(&d);
+		/*
+		 * The type to validate is the one of the value that
+		 * follows the key, not the type of the key itself:
+		 * the value is decoded below according to the table.
+		 */
+		if (key >= IPROTO_KEY_MAX ||
+		    iproto_key_type[key] != mp_typeof(*d)) {
+			xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+					   "request body");
+			return -1;
+		}
+		switch (key) {
+		case IPROTO_REPLICA_ID:
+			decoded_replica_id = mp_decode_uint(&d);
+			break;
+		case IPROTO_LSN:
+			decoded_lsn = mp_decode_uint(&d);
+			break;
+		default:
+			/* A known key this request does not use. */
+			mp_next(&d);
+			break;
+		}
+	}
+
+	*replica_id = decoded_replica_id;
+	*lsn = decoded_lsn;
+	return 0;
+}
+
+/**
+ * Read the replica id and lsn out of the body of a CONFIRM row.
+ * The row type is not inspected here.
+ */
+int
+xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
+{
+	int rc = xrow_decode_confirm_rollback(row, replica_id, lsn);
+	return rc;
+}
+
+/**
+ * Read the replica id and lsn out of the body of a ROLLBACK row.
+ * The row type is not inspected here.
+ */
+int
+xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
+{
+	int rc = xrow_decode_confirm_rollback(row, replica_id, lsn);
+	return rc;
+}
+
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 2a0a9c852..1def394e7 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -207,6 +207,52 @@ int
xrow_encode_dml(const struct request *request, struct region *region,
struct iovec *iov);
+/**
+ * Encode a CONFIRM entry: build the row body and set the row type
+ * to IPROTO_CONFIRM. The body is a map holding the master's id and
+ * the confirmed lsn. Any previous content of the row is overwritten.
+ * @param row xrow header to fill in.
+ * @param replica_id master's instance id.
+ * @param lsn last confirmed lsn.
+ * @retval -1 on error (memory allocation failure).
+ * @retval 0 success.
+ */
+int
+xrow_encode_confirm(struct xrow_header *row, uint32_t replica_id, int64_t lsn);
+
+/**
+ * Decode the body of a CONFIRM request.
+ * @param row xrow header holding the request body.
+ * @param[out] replica_id master's instance id.
+ * @param[out] lsn last confirmed lsn.
+ * @retval -1 on error (the body is missing or malformed).
+ * @retval 0 success.
+ */
+int
+xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+
+/**
+ * Encode a ROLLBACK entry: build the row body and set the row type
+ * to IPROTO_ROLLBACK. The body is a map holding the master's id and
+ * the lsn to roll back to. Any previous content of the row is
+ * overwritten.
+ * @param row xrow header to fill in.
+ * @param replica_id master's instance id.
+ * @param lsn lsn to roll back to.
+ * @retval -1 on error (memory allocation failure).
+ * @retval 0 success.
+ */
+int
+xrow_encode_rollback(struct xrow_header *row, uint32_t replica_id, int64_t lsn);
+
+/**
+ * Decode the body of a ROLLBACK request.
+ * @param row xrow header holding the request body.
+ * @param[out] replica_id master's instance id.
+ * @param[out] lsn lsn to roll back to.
+ * @retval -1 on error (the body is missing or malformed).
+ * @retval 0 success.
+ */
+int
+xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+
/**
* CALL/EVAL request.
*/
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list