[Tarantool-patches] [PATCH 1/1] xrow: introduce struct synchro_request

Serge Petrenko sergepetrenko at tarantool.org
Mon Aug 17 18:51:00 MSK 2020


 
Hi! Thanks for the patch!

LGTM.
 
 
 
  
>Saturday, 15 August 2020, 16:24 +03:00 from Vladislav Shpilevoy <v.shpilevoy at tarantool.org>:
> 
>Hi! Thanks for the review!
>
>On 14.08.2020 00:17, Cyrill Gorcunov wrote:
>> On Thu, Aug 13, 2020 at 11:58:20PM +0200, Vladislav Shpilevoy wrote:
>> ...
>>> +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
>>> {
>>> assert(lsn > 0);
>>>
>>> + struct synchro_request req;
>>> + req.type = type;
>>> + req.replica_id = limbo->instance_id;
>>> + req.lsn = lsn;
>>> +
>>
>> Vlad, while you're at this code, could we please use designated
>> initialization from the very beginning, i.e.
>>
>> struct synchro_request req = {
>> .type = type,
>> .replica_id = limbo->instance_id,
>> .lsn = lsn,
>> };
>>
>> (the alignment is up to you though). Such initialization prevents
>> a bug when the structure is extended later: any fields that are
>> not explicitly set will be zero-initialized.
>Ok, done:
>
>====================
>@@ -277,10 +277,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
> {
>  assert(lsn > 0);
> 
>- struct synchro_request req;
>- req.type = type;
>- req.replica_id = limbo->instance_id;
>- req.lsn = lsn;
>+ struct synchro_request req = {
>+ .type = type,
>+ .replica_id = limbo->instance_id,
>+ .lsn = lsn,
>+ };
> 
>====================
>
>New complete patch:
>
>====================
>diff --git a/src/box/applier.cc b/src/box/applier.cc
>index a953d293e..98fb87375 100644
>--- a/src/box/applier.cc
>+++ b/src/box/applier.cc
>@@ -275,26 +275,14 @@ process_nop(struct request *request)
>  * or rollback some of its entries.
>  */
> static int
>-process_confirm_rollback(struct request *request, bool is_confirm)
>+process_synchro_row(struct request *request)
> {
>  assert(iproto_type_is_synchro_request(request->header->type));
>- uint32_t replica_id;
>  struct txn *txn = in_txn();
>- int64_t lsn = 0;
> 
>- int res = 0;
>- if (is_confirm)
>- res = xrow_decode_confirm(request->header, &replica_id, &lsn);
>- else
>- res = xrow_decode_rollback(request->header, &replica_id, &lsn);
>- if (res == -1)
>- return -1;
>-
>- if (replica_id != txn_limbo.instance_id) {
>- diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
>- txn_limbo.instance_id);
>+ struct synchro_request syn_req;
>+ if (xrow_decode_synchro(request->header, &syn_req) != 0)
>  return -1;
>- }
>  assert(txn->n_applier_rows == 0);
>  /*
>  * This is not really a transaction. It just uses txn API
>@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm)
> 
>  if (txn_begin_stmt(txn, NULL) != 0)
>  return -1;
>-
>- if (txn_commit_stmt(txn, request) == 0) {
>- if (is_confirm)
>- txn_limbo_read_confirm(&txn_limbo, lsn);
>- else
>- txn_limbo_read_rollback(&txn_limbo, lsn);
>- return 0;
>- } else {
>+ if (txn_commit_stmt(txn, request) != 0)
>  return -1;
>- }
>+ return txn_limbo_process(&txn_limbo, &syn_req);
> }
> 
> static int
>@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row)
>  struct request request;
>  if (iproto_type_is_synchro_request(row->type)) {
>  request.header = row;
>- return process_confirm_rollback(&request,
>- row->type == IPROTO_CONFIRM);
>+ return process_synchro_row(&request);
>  }
>  if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
>  return -1;
>diff --git a/src/box/box.cc b/src/box/box.cc
>index 83eef5d98..8e811e9c1 100644
>--- a/src/box/box.cc
>+++ b/src/box/box.cc
>@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
> {
>  struct request request;
>  if (iproto_type_is_synchro_request(row->type)) {
>- uint32_t replica_id;
>- int64_t lsn;
>- switch(row->type) {
>- case IPROTO_CONFIRM:
>- if (xrow_decode_confirm(row, &replica_id, &lsn) < 0)
>- diag_raise();
>- assert(txn_limbo.instance_id == replica_id);
>- txn_limbo_read_confirm(&txn_limbo, lsn);
>- break;
>- case IPROTO_ROLLBACK:
>- if (xrow_decode_rollback(row, &replica_id, &lsn) < 0)
>- diag_raise();
>- assert(txn_limbo.instance_id == replica_id);
>- txn_limbo_read_rollback(&txn_limbo, lsn);
>- break;
>- }
>+ struct synchro_request syn_req;
>+ if (xrow_decode_synchro(row, &syn_req) != 0)
>+ diag_raise();
>+ if (txn_limbo_process(&txn_limbo, &syn_req) != 0)
>+ diag_raise();
>  return;
>  }
>  xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
>diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>index a2043c17a..c6a4e5efc 100644
>--- a/src/box/txn_limbo.c
>+++ b/src/box/txn_limbo.c
>@@ -31,6 +31,7 @@
> #include "txn.h"
> #include "txn_limbo.h"
> #include "replication.h"
>+#include "iproto_constants.h"
> 
> struct txn_limbo txn_limbo;
> 
>@@ -272,11 +273,16 @@ complete:
> }
> 
> static void
>-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
>- bool is_confirm)
>+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
> {
>  assert(lsn > 0);
> 
>+ struct synchro_request req = {
>+ .type = type,
>+ .replica_id = limbo->instance_id,
>+ .lsn = lsn,
>+ };
>+
>  struct xrow_header row;
>  struct request request = {
>  .header = &row,
>@@ -286,19 +292,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
>  if (txn == NULL)
>  goto rollback;
> 
>- int res = 0;
>- if (is_confirm) {
>- res = xrow_encode_confirm(&row, &txn->region,
>- limbo->instance_id, lsn);
>- } else {
>- /*
>- * This LSN is the first to be rolled back, so
>- * the last "safe" lsn is lsn - 1.
>- */
>- res = xrow_encode_rollback(&row, &txn->region,
>- limbo->instance_id, lsn);
>- }
>- if (res == -1)
>+ if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
>  goto rollback;
>  /*
>  * This is not really a transaction. It just uses txn API
>@@ -325,7 +319,7 @@ rollback:
>  * problems are fixed. Or retry automatically with some period.
>  */
>  panic("Could not write a synchro request to WAL: lsn = %lld, type = "
>- "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK");
>+ "%s\n", lsn, iproto_type_name(type));
> }
> 
> /**
>@@ -338,10 +332,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
>  assert(lsn > limbo->confirmed_lsn);
>  assert(!limbo->is_in_rollback);
>  limbo->confirmed_lsn = lsn;
>- txn_limbo_write_confirm_rollback(limbo, lsn, true);
>+ txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);
> }
> 
>-void
>+/** Confirm all the entries <= @a lsn. */
>+static void
> txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
> {
>  assert(limbo->instance_id != REPLICA_ID_NIL);
>@@ -390,11 +385,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
>  assert(lsn > limbo->confirmed_lsn);
>  assert(!limbo->is_in_rollback);
>  limbo->is_in_rollback = true;
>- txn_limbo_write_confirm_rollback(limbo, lsn, false);
>+ txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);
>  limbo->is_in_rollback = false;
> }
> 
>-void
>+/** Rollback all the entries >= @a lsn. */
>+static void
> txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
> {
>  assert(limbo->instance_id != REPLICA_ID_NIL);
>@@ -577,6 +573,27 @@ complete:
>  return 0;
> }
> 
>+int
>+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
>+{
>+ if (req->replica_id != limbo->instance_id) {
>+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id,
>+ limbo->instance_id);
>+ return -1;
>+ }
>+ switch (req->type) {
>+ case IPROTO_CONFIRM:
>+ txn_limbo_read_confirm(limbo, req->lsn);
>+ break;
>+ case IPROTO_ROLLBACK:
>+ txn_limbo_read_rollback(limbo, req->lsn);
>+ break;
>+ default:
>+ unreachable();
>+ }
>+ return 0;
>+}
>+
> void
> txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
> {
>diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
>index 04ee7ea5c..eaf662987 100644
>--- a/src/box/txn_limbo.h
>+++ b/src/box/txn_limbo.h
>@@ -39,6 +39,7 @@ extern "C" {
> #endif /* defined(__cplusplus) */
> 
> struct txn;
>+struct synchro_request;
> 
> /**
>  * Transaction and its quorum metadata, to be stored in limbo.
>@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
> int
> txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
> 
>-/**
>- * Confirm all the entries up to the given master's LSN.
>- */
>-void
>-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
>-
>-/**
>- * Rollback all the entries starting with given master's LSN.
>- */
>-void
>-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
>+/** Execute a synchronous replication request. */
>+int
>+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
> 
> /**
>  * Waiting for confirmation of all "sync" transactions
>diff --git a/src/box/xrow.c b/src/box/xrow.c
>index 0c797a9d5..bf174c701 100644
>--- a/src/box/xrow.c
>+++ b/src/box/xrow.c
>@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region,
>  return iovcnt;
> }
> 
>-static int
>-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
>- uint32_t replica_id, int64_t lsn, int type)
>+int
>+xrow_encode_synchro(struct xrow_header *row, struct region *region,
>+ const struct synchro_request *req)
> {
>  size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
>- mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
>- mp_sizeof_uint(lsn);
>+ mp_sizeof_uint(req->replica_id) +
>+ mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
>  char *buf = (char *)region_alloc(region, len);
>  if (buf == NULL) {
>  diag_set(OutOfMemory, len, "region_alloc", "buf");
>@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
> 
>  pos = mp_encode_map(pos, 2);
>  pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
>- pos = mp_encode_uint(pos, replica_id);
>+ pos = mp_encode_uint(pos, req->replica_id);
>  pos = mp_encode_uint(pos, IPROTO_LSN);
>- pos = mp_encode_uint(pos, lsn);
>+ pos = mp_encode_uint(pos, req->lsn);
> 
>  memset(row, 0, sizeof(*row));
> 
>@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
>  row->body[0].iov_len = len;
>  row->bodycnt = 1;
> 
>- row->type = type;
>+ row->type = req->type;
> 
>  return 0;
> }
> 
> int
>-xrow_encode_confirm(struct xrow_header *row, struct region *region,
>- uint32_t replica_id, int64_t lsn)
>-{
>- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
>- IPROTO_CONFIRM);
>-}
>-
>-int
>-xrow_encode_rollback(struct xrow_header *row, struct region *region,
>- uint32_t replica_id, int64_t lsn)
>-{
>- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
>- IPROTO_ROLLBACK);
>-}
>-
>-static int
>-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
>- int64_t *lsn)
>+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
> {
>  if (row->bodycnt == 0) {
>  diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
>@@ -960,6 +943,7 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
>  return -1;
>  }
> 
>+ memset(req, 0, sizeof(*req));
>  d = data;
>  uint32_t map_size = mp_decode_map(&d);
>  for (uint32_t i = 0; i < map_size; i++) {
>@@ -977,30 +961,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
>  }
>  switch (key) {
>  case IPROTO_REPLICA_ID:
>- *replica_id = mp_decode_uint(&d);
>+ req->replica_id = mp_decode_uint(&d);
>  break;
>  case IPROTO_LSN:
>- *lsn = mp_decode_uint(&d);
>+ req->lsn = mp_decode_uint(&d);
>  break;
>  default:
>  mp_next(&d);
>  }
>  }
>+ req->type = row->type;
>  return 0;
> }
> 
>-int
>-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
>-{
>- return xrow_decode_confirm_rollback(row, replica_id, lsn);
>-}
>-
>-int
>-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
>-{
>- return xrow_decode_confirm_rollback(row, replica_id, lsn);
>-}
>-
> int
> xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
> {
>diff --git a/src/box/xrow.h b/src/box/xrow.h
>index e21ede5a3..02dca74e5 100644
>--- a/src/box/xrow.h
>+++ b/src/box/xrow.h
>@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region,
>  struct iovec *iov);
> 
> /**
>- * Encode the CONFIRM to row body and set row type to
>- * IPROTO_CONFIRM.
>- * @param row xrow header.
>- * @param region Region to use to encode the confirmation body.
>- * @param replica_id master's instance id.
>- * @param lsn last confirmed lsn.
>- * @retval -1 on error.
>- * @retval 0 success.
>+ * Synchronous replication request - confirmation or rollback of
>+ * pending synchronous transactions.
>  */
>-int
>-xrow_encode_confirm(struct xrow_header *row, struct region *region,
>- uint32_t replica_id, int64_t lsn);
>+struct synchro_request {
>+ /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */
>+ uint32_t type;
>+ /**
>+ * ID of the instance owning the pending transactions.
>+ * Note, it may differ from the instance that created this
>+ * request. An instance can make an operation on foreign
>+ * synchronous transactions in case a new master tries to
>+ * finish transactions of an old master.
>+ */
>+ uint32_t replica_id;
>+ /**
>+ * Operation LSN.
>+ * In case of CONFIRM it means 'confirm all
>+ * transactions with lsn <= this value'.
>+ * In case of ROLLBACK it means 'rollback all transactions
>+ * with lsn >= this value'.
>+ */
>+ int64_t lsn;
>+};
> 
> /**
>- * Decode the CONFIRM request body.
>+ * Encode synchronous replication request.
>  * @param row xrow header.
>- * @param[out] replica_id master's instance id.
>- * @param[out] lsn last confirmed lsn.
>+ * @param region Region to use to encode the request body.
>+ * @param req Request parameters.
>  * @retval -1 on error.
>  * @retval 0 success.
>  */
> int
>-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
>-
>-/**
>- * Encode the ROLLBACK row body and set row type to
>- * IPROTO_ROLLBACK.
>- * @param row xrow header.
>- * @param region Region to use to encode the rollback body.
>- * @param replica_id master's instance id.
>- * @param lsn lsn to rollback from, including it.
>- * @retval -1 on error.
>- * @retval 0 success.
>- */
>-int
>-xrow_encode_rollback(struct xrow_header *row, struct region *region,
>- uint32_t replica_id, int64_t lsn);
>+xrow_encode_synchro(struct xrow_header *row, struct region *region,
>+ const struct synchro_request *req);
> 
> /**
>- * Decode the ROLLBACK row body.
>+ * Decode synchronous replication request.
>  * @param row xrow header.
>- * @param[out] replica_id master's instance id.
>- * @param[out] lsn lsn to rollback from, including it.
>+ * @param[out] req Request parameters.
>  * @retval -1 on error.
>  * @retval 0 success.
>  */
> int
>-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
>+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
> 
> /**
>  * CALL/EVAL request.
> 
 
--
Serge Petrenko
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.tarantool.org/pipermail/tarantool-patches/attachments/20200817/dd1ebf9d/attachment.html>


More information about the Tarantool-patches mailing list