Hi! Thanks for the patch! LGTM.         >Суббота, 15 августа 2020, 16:24 +03:00 от Vladislav Shpilevoy : >  >Hi! Thanks for the review! > >On 14.08.2020 00:17, Cyrill Gorcunov wrote: >> On Thu, Aug 13, 2020 at 11:58:20PM +0200, Vladislav Shpilevoy wrote: >> ... >>> +txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) >>> { >>> assert(lsn > 0); >>> >>> + struct synchro_request req; >>> + req.type = type; >>> + req.replica_id = limbo->instance_id; >>> + req.lsn = lsn; >>> + >> >> Vlad, while you're at this code, could we please use designated >> initialization from the very beginning, i.e. >> >> struct synchro_request req = { >> .type = type, >> .replica_id = limbo->instance_id, >> .lsn = lsn, >> }; >> >> (the alignment is up to you though). Such initialization guards >> against bugs when the structure is extended: any fields that are >> not explicitly listed are zero-initialized. >Ok, done: > >==================== >@@ -277,10 +277,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) > { >  assert(lsn > 0); >  >- struct synchro_request req; >- req.type = type; >- req.replica_id = limbo->instance_id; >- req.lsn = lsn; >+ struct synchro_request req = { >+ .type = type, >+ .replica_id = limbo->instance_id, >+ .lsn = lsn, >+ }; >  >==================== > >New complete patch: > >==================== >diff --git a/src/box/applier.cc b/src/box/applier.cc >index a953d293e..98fb87375 100644 >--- a/src/box/applier.cc >+++ b/src/box/applier.cc >@@ -275,26 +275,14 @@ process_nop(struct request *request) >  * or rollback some of its entries. 
>  */ > static int >-process_confirm_rollback(struct request *request, bool is_confirm) >+process_synchro_row(struct request *request) > { >  assert(iproto_type_is_synchro_request(request->header->type)); >- uint32_t replica_id; >  struct txn *txn = in_txn(); >- int64_t lsn = 0; >  >- int res = 0; >- if (is_confirm) >- res = xrow_decode_confirm(request->header, &replica_id, &lsn); >- else >- res = xrow_decode_rollback(request->header, &replica_id, &lsn); >- if (res == -1) >- return -1; >- >- if (replica_id != txn_limbo.instance_id) { >- diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id, >- txn_limbo.instance_id); >+ struct synchro_request syn_req; >+ if (xrow_decode_synchro(request->header, &syn_req) != 0) >  return -1; >- } >  assert(txn->n_applier_rows == 0); >  /* >  * This is not really a transaction. It just uses txn API >@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm) >  >  if (txn_begin_stmt(txn, NULL) != 0) >  return -1; >- >- if (txn_commit_stmt(txn, request) == 0) { >- if (is_confirm) >- txn_limbo_read_confirm(&txn_limbo, lsn); >- else >- txn_limbo_read_rollback(&txn_limbo, lsn); >- return 0; >- } else { >+ if (txn_commit_stmt(txn, request) != 0) >  return -1; >- } >+ return txn_limbo_process(&txn_limbo, &syn_req); > } >  > static int >@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row) >  struct request request; >  if (iproto_type_is_synchro_request(row->type)) { >  request.header = row; >- return process_confirm_rollback(&request, >- row->type == IPROTO_CONFIRM); >+ return process_synchro_row(&request); >  } >  if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0) >  return -1; >diff --git a/src/box/box.cc b/src/box/box.cc >index 83eef5d98..8e811e9c1 100644 >--- a/src/box/box.cc >+++ b/src/box/box.cc >@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row) > { >  struct request request; >  if (iproto_type_is_synchro_request(row->type)) { >- uint32_t 
replica_id; >- int64_t lsn; >- switch(row->type) { >- case IPROTO_CONFIRM: >- if (xrow_decode_confirm(row, &replica_id, &lsn) < 0) >- diag_raise(); >- assert(txn_limbo.instance_id == replica_id); >- txn_limbo_read_confirm(&txn_limbo, lsn); >- break; >- case IPROTO_ROLLBACK: >- if (xrow_decode_rollback(row, &replica_id, &lsn) < 0) >- diag_raise(); >- assert(txn_limbo.instance_id == replica_id); >- txn_limbo_read_rollback(&txn_limbo, lsn); >- break; >- } >+ struct synchro_request syn_req; >+ if (xrow_decode_synchro(row, &syn_req) != 0) >+ diag_raise(); >+ if (txn_limbo_process(&txn_limbo, &syn_req) != 0) >+ diag_raise(); >  return; >  } >  xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); >diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c >index a2043c17a..c6a4e5efc 100644 >--- a/src/box/txn_limbo.c >+++ b/src/box/txn_limbo.c >@@ -31,6 +31,7 @@ > #include "txn.h" > #include "txn_limbo.h" > #include "replication.h" >+#include "iproto_constants.h" >  > struct txn_limbo txn_limbo; >  >@@ -272,11 +273,16 @@ complete: > } >  > static void >-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, >- bool is_confirm) >+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn) > { >  assert(lsn > 0); >  >+ struct synchro_request req = { >+ .type = type, >+ .replica_id = limbo->instance_id, >+ .lsn = lsn, >+ }; >+ >  struct xrow_header row; >  struct request request = { >  .header = &row, >@@ -286,19 +292,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn, >  if (txn == NULL) >  goto rollback; >  >- int res = 0; >- if (is_confirm) { >- res = xrow_encode_confirm(&row, &txn->region, >- limbo->instance_id, lsn); >- } else { >- /* >- * This LSN is the first to be rolled back, so >- * the last "safe" lsn is lsn - 1. 
>- */ >- res = xrow_encode_rollback(&row, &txn->region, >- limbo->instance_id, lsn); >- } >- if (res == -1) >+ if (xrow_encode_synchro(&row, &txn->region, &req) != 0) >  goto rollback; >  /* >  * This is not really a transaction. It just uses txn API >@@ -325,7 +319,7 @@ rollback: >  * problems are fixed. Or retry automatically with some period. >  */ >  panic("Could not write a synchro request to WAL: lsn = %lld, type = " >- "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK"); >+ "%s\n", lsn, iproto_type_name(type)); > } >  > /** >@@ -338,10 +332,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) >  assert(lsn > limbo->confirmed_lsn); >  assert(!limbo->is_in_rollback); >  limbo->confirmed_lsn = lsn; >- txn_limbo_write_confirm_rollback(limbo, lsn, true); >+ txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn); > } >  >-void >+/** Confirm all the entries <= @a lsn. */ >+static void > txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) > { >  assert(limbo->instance_id != REPLICA_ID_NIL); >@@ -390,11 +385,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) >  assert(lsn > limbo->confirmed_lsn); >  assert(!limbo->is_in_rollback); >  limbo->is_in_rollback = true; >- txn_limbo_write_confirm_rollback(limbo, lsn, false); >+ txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn); >  limbo->is_in_rollback = false; > } >  >-void >+/** Rollback all the entries >= @a lsn. 
*/ >+static void > txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn) > { >  assert(limbo->instance_id != REPLICA_ID_NIL); >@@ -577,6 +573,27 @@ complete: >  return 0; > } >  >+int >+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req) >+{ >+ if (req->replica_id != limbo->instance_id) { >+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id, >+ limbo->instance_id); >+ return -1; >+ } >+ switch (req->type) { >+ case IPROTO_CONFIRM: >+ txn_limbo_read_confirm(limbo, req->lsn); >+ break; >+ case IPROTO_ROLLBACK: >+ txn_limbo_read_rollback(limbo, req->lsn); >+ break; >+ default: >+ unreachable(); >+ } >+ return 0; >+} >+ > void > txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn) > { >diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h >index 04ee7ea5c..eaf662987 100644 >--- a/src/box/txn_limbo.h >+++ b/src/box/txn_limbo.h >@@ -39,6 +39,7 @@ extern "C" { > #endif /* defined(__cplusplus) */ >  > struct txn; >+struct synchro_request; >  > /** >  * Transaction and its quorum metadata, to be stored in limbo. >@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn); > int > txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry); >  >-/** >- * Confirm all the entries up to the given master's LSN. >- */ >-void >-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn); >- >-/** >- * Rollback all the entries starting with given master's LSN. >- */ >-void >-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn); >+/** Execute a synchronous replication request. 
*/ >+int >+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req); >  > /** >  * Waiting for confirmation of all "sync" transactions >diff --git a/src/box/xrow.c b/src/box/xrow.c >index 0c797a9d5..bf174c701 100644 >--- a/src/box/xrow.c >+++ b/src/box/xrow.c >@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region, >  return iovcnt; > } >  >-static int >-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, >- uint32_t replica_id, int64_t lsn, int type) >+int >+xrow_encode_synchro(struct xrow_header *row, struct region *region, >+ const struct synchro_request *req) > { >  size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) + >- mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) + >- mp_sizeof_uint(lsn); >+ mp_sizeof_uint(req->replica_id) + >+ mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn); >  char *buf = (char *)region_alloc(region, len); >  if (buf == NULL) { >  diag_set(OutOfMemory, len, "region_alloc", "buf"); >@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, >  >  pos = mp_encode_map(pos, 2); >  pos = mp_encode_uint(pos, IPROTO_REPLICA_ID); >- pos = mp_encode_uint(pos, replica_id); >+ pos = mp_encode_uint(pos, req->replica_id); >  pos = mp_encode_uint(pos, IPROTO_LSN); >- pos = mp_encode_uint(pos, lsn); >+ pos = mp_encode_uint(pos, req->lsn); >  >  memset(row, 0, sizeof(*row)); >  >@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region, >  row->body[0].iov_len = len; >  row->bodycnt = 1; >  >- row->type = type; >+ row->type = req->type; >  >  return 0; > } >  > int >-xrow_encode_confirm(struct xrow_header *row, struct region *region, >- uint32_t replica_id, int64_t lsn) >-{ >- return xrow_encode_confirm_rollback(row, region, replica_id, lsn, >- IPROTO_CONFIRM); >-} >- >-int >-xrow_encode_rollback(struct xrow_header *row, struct region *region, >- uint32_t replica_id, 
int64_t lsn) >-{ >- return xrow_encode_confirm_rollback(row, region, replica_id, lsn, >- IPROTO_ROLLBACK); >-} >- >-static int >-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, >- int64_t *lsn) >+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) > { >  if (row->bodycnt == 0) { >  diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); >@@ -960,6 +943,7 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, >  return -1; >  } >  >+ memset(req, 0, sizeof(*req)); >  d = data; >  uint32_t map_size = mp_decode_map(&d); >  for (uint32_t i = 0; i < map_size; i++) { >@@ -977,30 +961,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id, >  } >  switch (key) { >  case IPROTO_REPLICA_ID: >- *replica_id = mp_decode_uint(&d); >+ req->replica_id = mp_decode_uint(&d); >  break; >  case IPROTO_LSN: >- *lsn = mp_decode_uint(&d); >+ req->lsn = mp_decode_uint(&d); >  break; >  default: >  mp_next(&d); >  } >  } >+ req->type = row->type; >  return 0; > } >  >-int >-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn) >-{ >- return xrow_decode_confirm_rollback(row, replica_id, lsn); >-} >- >-int >-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn) >-{ >- return xrow_decode_confirm_rollback(row, replica_id, lsn); >-} >- > int > xrow_to_iovec(const struct xrow_header *row, struct iovec *out) > { >diff --git a/src/box/xrow.h b/src/box/xrow.h >index e21ede5a3..02dca74e5 100644 >--- a/src/box/xrow.h >+++ b/src/box/xrow.h >@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region, >  struct iovec *iov); >  > /** >- * Encode the CONFIRM to row body and set row type to >- * IPROTO_CONFIRM. >- * @param row xrow header. >- * @param region Region to use to encode the confirmation body. >- * @param replica_id master's instance id. >- * @param lsn last confirmed lsn. >- * @retval -1 on error. 
>- * @retval 0 success. >+ * Synchronous replication request - confirmation or rollback of >+ * pending synchronous transactions. >  */ >-int >-xrow_encode_confirm(struct xrow_header *row, struct region *region, >- uint32_t replica_id, int64_t lsn); >+struct synchro_request { >+ /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */ >+ uint32_t type; >+ /** >+ * ID of the instance owning the pending transactions. >+ * Note, it may be not the same instance, who created this >+ * request. An instance can make an operation on foreign >+ * synchronous transactions in case a new master tries to >+ * finish transactions of an old master. >+ */ >+ uint32_t replica_id; >+ /** >+ * Operation LSN. >+ * In case of CONFIRM it means 'confirm all >+ * transactions with lsn <= this value'. >+ * In case of ROLLBACK it means 'rollback all transactions >+ * with lsn >= this value'. >+ */ >+ int64_t lsn; >+}; >  > /** >- * Decode the CONFIRM request body. >+ * Encode synchronous replication request. >  * @param row xrow header. >- * @param[out] replica_id master's instance id. >- * @param[out] lsn last confirmed lsn. >+ * @param region Region to use to encode the confirmation body. >+ * @param req Request parameters. >  * @retval -1 on error. >  * @retval 0 success. >  */ > int >-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); >- >-/** >- * Encode the ROLLBACK row body and set row type to >- * IPROTO_ROLLBACK. >- * @param row xrow header. >- * @param region Region to use to encode the rollback body. >- * @param replica_id master's instance id. >- * @param lsn lsn to rollback from, including it. >- * @retval -1 on error. >- * @retval 0 success. >- */ >-int >-xrow_encode_rollback(struct xrow_header *row, struct region *region, >- uint32_t replica_id, int64_t lsn); >+xrow_encode_synchro(struct xrow_header *row, struct region *region, >+ const struct synchro_request *req); >  > /** >- * Decode the ROLLBACK row body. 
>+ * Decode synchronous replication request. >  * @param row xrow header. >- * @param[out] replica_id master's instance id. >- * @param[out] lsn lsn to rollback from, including it. >+ * @param[out] req Request parameters. >  * @retval -1 on error. >  * @retval 0 success. >  */ > int >-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn); >+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); >  > /** >  * CALL/EVAL request. >    -- Serge Petrenko