* [Tarantool-patches] [PATCH v8 1/9] xrow: introduce struct synchro_request
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 2/9] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
` (8 subsequent siblings)
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
All requests saved to WAL and transmitted through network have
their own request structure with parameters:
- struct request for DML;
- struct call_request for CALL/EVAL;
- struct auth_request for AUTH;
- struct ballot for VOTE;
- struct sql_request for SQL;
- struct greeting for greeting.
It is done for a reason - not to pass all the request parameters
into each function one by one, but to manage them all at once
instead.
For synchronous requests IPROTO_CONFIRM and IPROTO_ROLLBACK it was
not done, because so far it was not too hard to carry just 2
parameters from their body: lsn and replica_id.
But it will be changed in #5129, because in fact these requests
have more parameters. They were filled by the txn module, since
synchro requests were saved to WAL via transactions (due to the
lack of an alternative API to access WAL).
After #5129 it will be necessary to save LSN and replica_id of the
request author. This patch introduces struct synchro_request to
simplify extension of the synchro parameters.
Closes #5151
Needed for #5129
---
src/box/applier.cc | 32 +++++------------------
src/box/box.cc | 21 ++++-----------
src/box/txn_limbo.c | 56 ++++++++++++++++++++++++++--------------
src/box/txn_limbo.h | 15 +++--------
src/box/xrow.c | 52 +++++++++----------------------------
src/box/xrow.h | 63 +++++++++++++++++++++------------------------
6 files changed, 93 insertions(+), 146 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a953d293e..98fb87375 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -275,26 +275,14 @@ process_nop(struct request *request)
* or rollback some of its entries.
*/
static int
-process_confirm_rollback(struct request *request, bool is_confirm)
+process_synchro_row(struct request *request)
{
assert(iproto_type_is_synchro_request(request->header->type));
- uint32_t replica_id;
struct txn *txn = in_txn();
- int64_t lsn = 0;
- int res = 0;
- if (is_confirm)
- res = xrow_decode_confirm(request->header, &replica_id, &lsn);
- else
- res = xrow_decode_rollback(request->header, &replica_id, &lsn);
- if (res == -1)
- return -1;
-
- if (replica_id != txn_limbo.instance_id) {
- diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, replica_id,
- txn_limbo.instance_id);
+ struct synchro_request syn_req;
+ if (xrow_decode_synchro(request->header, &syn_req) != 0)
return -1;
- }
assert(txn->n_applier_rows == 0);
/*
* This is not really a transaction. It just uses txn API
@@ -306,16 +294,9 @@ process_confirm_rollback(struct request *request, bool is_confirm)
if (txn_begin_stmt(txn, NULL) != 0)
return -1;
-
- if (txn_commit_stmt(txn, request) == 0) {
- if (is_confirm)
- txn_limbo_read_confirm(&txn_limbo, lsn);
- else
- txn_limbo_read_rollback(&txn_limbo, lsn);
- return 0;
- } else {
+ if (txn_commit_stmt(txn, request) != 0)
return -1;
- }
+ return txn_limbo_process(&txn_limbo, &syn_req);
}
static int
@@ -324,8 +305,7 @@ apply_row(struct xrow_header *row)
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
request.header = row;
- return process_confirm_rollback(&request,
- row->type == IPROTO_CONFIRM);
+ return process_synchro_row(&request);
}
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
diff --git a/src/box/box.cc b/src/box/box.cc
index 83eef5d98..8e811e9c1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -367,22 +367,11 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
{
struct request request;
if (iproto_type_is_synchro_request(row->type)) {
- uint32_t replica_id;
- int64_t lsn;
- switch(row->type) {
- case IPROTO_CONFIRM:
- if (xrow_decode_confirm(row, &replica_id, &lsn) < 0)
- diag_raise();
- assert(txn_limbo.instance_id == replica_id);
- txn_limbo_read_confirm(&txn_limbo, lsn);
- break;
- case IPROTO_ROLLBACK:
- if (xrow_decode_rollback(row, &replica_id, &lsn) < 0)
- diag_raise();
- assert(txn_limbo.instance_id == replica_id);
- txn_limbo_read_rollback(&txn_limbo, lsn);
- break;
- }
+ struct synchro_request syn_req;
+ if (xrow_decode_synchro(row, &syn_req) != 0)
+ diag_raise();
+ if (txn_limbo_process(&txn_limbo, &syn_req) != 0)
+ diag_raise();
return;
}
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a2043c17a..944161c30 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -31,6 +31,7 @@
#include "txn.h"
#include "txn_limbo.h"
#include "replication.h"
+#include "iproto_constants.h"
struct txn_limbo txn_limbo;
@@ -272,11 +273,15 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
}
static void
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
- bool is_confirm)
+txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
assert(lsn > 0);
+ struct synchro_request req;
+ req.type = type;
+ req.replica_id = limbo->instance_id;
+ req.lsn = lsn;
+
struct xrow_header row;
struct request request = {
.header = &row,
@@ -286,19 +291,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
if (txn == NULL)
goto rollback;
- int res = 0;
- if (is_confirm) {
- res = xrow_encode_confirm(&row, &txn->region,
- limbo->instance_id, lsn);
- } else {
- /*
- * This LSN is the first to be rolled back, so
- * the last "safe" lsn is lsn - 1.
- */
- res = xrow_encode_rollback(&row, &txn->region,
- limbo->instance_id, lsn);
- }
- if (res == -1)
+ if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
goto rollback;
/*
* This is not really a transaction. It just uses txn API
@@ -325,7 +318,7 @@ txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
* problems are fixed. Or retry automatically with some period.
*/
panic("Could not write a synchro request to WAL: lsn = %lld, type = "
- "%s\n", lsn, is_confirm ? "CONFIRM" : "ROLLBACK");
+ "%s\n", lsn, iproto_type_name(type));
}
/**
@@ -338,10 +331,11 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->confirmed_lsn = lsn;
- txn_limbo_write_confirm_rollback(limbo, lsn, true);
+ txn_limbo_write_synchro(limbo, IPROTO_CONFIRM, lsn);
}
-void
+/** Confirm all the entries <= @a lsn. */
+static void
txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
{
assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -390,11 +384,12 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
assert(lsn > limbo->confirmed_lsn);
assert(!limbo->is_in_rollback);
limbo->is_in_rollback = true;
- txn_limbo_write_confirm_rollback(limbo, lsn, false);
+ txn_limbo_write_synchro(limbo, IPROTO_ROLLBACK, lsn);
limbo->is_in_rollback = false;
}
-void
+/** Rollback all the entries >= @a lsn. */
+static void
txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
{
assert(limbo->instance_id != REPLICA_ID_NIL);
@@ -577,6 +572,27 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
return 0;
}
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req)
+{
+ if (req->replica_id != limbo->instance_id) {
+ diag_set(ClientError, ER_SYNC_MASTER_MISMATCH, req->replica_id,
+ limbo->instance_id);
+ return -1;
+ }
+ switch (req->type) {
+ case IPROTO_CONFIRM:
+ txn_limbo_read_confirm(limbo, req->lsn);
+ break;
+ case IPROTO_ROLLBACK:
+ txn_limbo_read_rollback(limbo, req->lsn);
+ break;
+ default:
+ unreachable();
+ }
+ return 0;
+}
+
void
txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 04ee7ea5c..eaf662987 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -39,6 +39,7 @@ extern "C" {
#endif /* defined(__cplusplus) */
struct txn;
+struct synchro_request;
/**
* Transaction and its quorum metadata, to be stored in limbo.
@@ -245,17 +246,9 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn);
int
txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry);
-/**
- * Confirm all the entries up to the given master's LSN.
- */
-void
-txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn);
-
-/**
- * Rollback all the entries starting with given master's LSN.
- */
-void
-txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn);
+/** Execute a synchronous replication request. */
+int
+txn_limbo_process(struct txn_limbo *limbo, const struct synchro_request *req);
/**
* Waiting for confirmation of all "sync" transactions
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 0c797a9d5..4b5d4356f 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,13 +893,13 @@ xrow_encode_dml(const struct request *request, struct region *region,
return iovcnt;
}
-static int
-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn, int type)
+int
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+ const struct synchro_request *req)
{
size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
- mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
- mp_sizeof_uint(lsn);
+ mp_sizeof_uint(req->replica_id) +
+ mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
char *buf = (char *)region_alloc(region, len);
if (buf == NULL) {
diag_set(OutOfMemory, len, "region_alloc", "buf");
@@ -909,9 +909,9 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
pos = mp_encode_map(pos, 2);
pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
- pos = mp_encode_uint(pos, replica_id);
+ pos = mp_encode_uint(pos, req->replica_id);
pos = mp_encode_uint(pos, IPROTO_LSN);
- pos = mp_encode_uint(pos, lsn);
+ pos = mp_encode_uint(pos, req->lsn);
memset(row, 0, sizeof(*row));
@@ -919,30 +919,13 @@ xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
row->body[0].iov_len = len;
row->bodycnt = 1;
- row->type = type;
+ row->type = req->type;
return 0;
}
int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn)
-{
- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
- IPROTO_CONFIRM);
-}
-
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn)
-{
- return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
- IPROTO_ROLLBACK);
-}
-
-static int
-xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
- int64_t *lsn)
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
{
if (row->bodycnt == 0) {
diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -977,30 +960,19 @@ xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
}
switch (key) {
case IPROTO_REPLICA_ID:
- *replica_id = mp_decode_uint(&d);
+ req->replica_id = mp_decode_uint(&d);
break;
case IPROTO_LSN:
- *lsn = mp_decode_uint(&d);
+ req->lsn = mp_decode_uint(&d);
break;
default:
mp_next(&d);
}
}
+ req->type = row->type;
return 0;
}
-int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
- return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
-int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn)
-{
- return xrow_decode_confirm_rollback(row, replica_id, lsn);
-}
-
int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
diff --git a/src/box/xrow.h b/src/box/xrow.h
index e21ede5a3..02dca74e5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -216,54 +216,51 @@ xrow_encode_dml(const struct request *request, struct region *region,
struct iovec *iov);
/**
- * Encode the CONFIRM to row body and set row type to
- * IPROTO_CONFIRM.
- * @param row xrow header.
- * @param region Region to use to encode the confirmation body.
- * @param replica_id master's instance id.
- * @param lsn last confirmed lsn.
- * @retval -1 on error.
- * @retval 0 success.
+ * Synchronous replication request - confirmation or rollback of
+ * pending synchronous transactions.
*/
-int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn);
+struct synchro_request {
+ /** Operation type - IPROTO_ROLLBACK or IPROTO_CONFIRM. */
+ uint32_t type;
+ /**
+ * ID of the instance owning the pending transactions.
+ * Note, it may be not the same instance, who created this
+ * request. An instance can make an operation on foreign
+ * synchronous transactions in case a new master tries to
+ * finish transactions of an old master.
+ */
+ uint32_t replica_id;
+ /**
+ * Operation LSN.
+ * In case of CONFIRM it means 'confirm all
+ * transactions with lsn <= this value'.
+ * In case of ROLLBACK it means 'rollback all transactions
+ * with lsn >= this value'.
+ */
+ int64_t lsn;
+};
/**
- * Decode the CONFIRM request body.
+ * Encode synchronous replication request.
* @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn last confirmed lsn.
+ * @param region Region to use to encode the confirmation body.
+ * @param req Request parameters.
* @retval -1 on error.
* @retval 0 success.
*/
int
-xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
-
-/**
- * Encode the ROLLBACK row body and set row type to
- * IPROTO_ROLLBACK.
- * @param row xrow header.
- * @param region Region to use to encode the rollback body.
- * @param replica_id master's instance id.
- * @param lsn lsn to rollback from, including it.
- * @retval -1 on error.
- * @retval 0 success.
- */
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
- uint32_t replica_id, int64_t lsn);
+xrow_encode_synchro(struct xrow_header *row, struct region *region,
+ const struct synchro_request *req);
/**
- * Decode the ROLLBACK row body.
+ * Decode synchronous replication request.
* @param row xrow header.
- * @param[out] replica_id master's instance id.
- * @param[out] lsn lsn to rollback from, including it.
+ * @param[out] req Request parameters.
* @retval -1 on error.
* @retval 0 success.
*/
int
-xrow_decode_rollback(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
+xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);
/**
* CALL/EVAL request.
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 2/9] journal: bind asynchronous write completion to an entry
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 1/9] xrow: introduce struct synchro_request Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 3/9] journal: add journal_entry_create helper Cyrill Gorcunov
` (7 subsequent siblings)
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
In commit 77ba0e3504464131fe81c672d508d0275be2173a we've redesigned
wal journal operations such that asynchronous write completion
is a single instance per journal.
It turned out that such a simplification is too tight and doesn't
allow us to pass entries into the journal with custom completions.
Thus let's bring this ability back. We will need it to be able
to write "confirm" records into wal directly without touching
the transaction code at all.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/box.cc | 15 ++++++++-------
src/box/journal.c | 2 ++
src/box/journal.h | 20 +++++++++++---------
src/box/txn.c | 2 +-
src/box/vy_log.c | 2 +-
src/box/wal.c | 19 ++++++++-----------
src/box/wal.h | 4 ++--
7 files changed, 33 insertions(+), 31 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 8e811e9c1..faffd5769 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
* Since there're no actual writes, fire a
* journal_async_complete callback right away.
*/
- journal_async_complete(base, entry);
+ journal_async_complete(entry);
return 0;
}
@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
{
static struct recovery_journal journal;
journal_create(&journal.base, recovery_journal_write,
- txn_complete_async, recovery_journal_write);
+ recovery_journal_write);
journal.vclock = v;
journal_set(&journal.base);
}
@@ -2182,8 +2182,10 @@ engine_init()
static int
bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
{
+ (void)base;
+
entry->res = 0;
- journal_async_complete(base, entry);
+ journal_async_complete(entry);
return 0;
}
@@ -2569,8 +2571,8 @@ box_cfg_xc(void)
int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
+ &INSTANCE_UUID, on_wal_garbage_collection,
on_wal_checkpoint_threshold) != 0) {
diag_raise();
}
@@ -2617,8 +2619,7 @@ box_cfg_xc(void)
}
struct journal bootstrap_journal;
- journal_create(&bootstrap_journal, NULL, txn_complete_async,
- bootstrap_journal_write);
+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
journal_set(&bootstrap_journal);
auto bootstrap_journal_guard = make_scoped_guard([] {
journal_set(NULL);
diff --git a/src/box/journal.c b/src/box/journal.c
index f1e89aaa2..48af9157b 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
+ journal_write_async_f write_async_cb,
void *complete_data)
{
struct journal_entry *entry;
@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
return NULL;
}
+ entry->write_async_cb = write_async_cb;
entry->complete_data = complete_data;
entry->approx_len = 0;
entry->n_rows = n_rows;
diff --git a/src/box/journal.h b/src/box/journal.h
index 1a10e66c3..4b019fecf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -42,6 +42,8 @@ extern "C" {
struct xrow_header;
struct journal_entry;
+typedef void (*journal_write_async_f)(struct journal_entry *entry);
+
/**
* An entry for an abstract journal.
* Simply put, a write ahead log request.
@@ -61,6 +63,10 @@ struct journal_entry {
* A journal entry completion callback argument.
*/
void *complete_data;
+ /**
+ * Asynchronous write completion function.
+ */
+ journal_write_async_f write_async_cb;
/**
* Approximate size of this request when encoded.
*/
@@ -84,6 +90,7 @@ struct region;
*/
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
+ journal_write_async_f write_async_cb,
void *complete_data);
/**
@@ -96,22 +103,19 @@ struct journal {
int (*write_async)(struct journal *journal,
struct journal_entry *entry);
- /** Asynchronous write completion */
- void (*write_async_cb)(struct journal_entry *entry);
-
/** Synchronous write */
int (*write)(struct journal *journal,
struct journal_entry *entry);
};
/**
- * Finalize a single entry.
+ * Complete asynchronous write.
*/
static inline void
-journal_async_complete(struct journal *journal, struct journal_entry *entry)
+journal_async_complete(struct journal_entry *entry)
{
- assert(journal->write_async_cb != NULL);
- journal->write_async_cb(entry);
+ assert(entry->write_async_cb != NULL);
+ entry->write_async_cb(entry);
}
/**
@@ -173,12 +177,10 @@ static inline void
journal_create(struct journal *journal,
int (*write_async)(struct journal *journal,
struct journal_entry *entry),
- void (*write_async_cb)(struct journal_entry *entry),
int (*write)(struct journal *journal,
struct journal_entry *entry))
{
journal->write_async = write_async;
- journal->write_async_cb = write_async_cb;
journal->write = write;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 9c21258c5..cc1f496c5 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
/* Save space for an additional NOP row just in case. */
req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
- &txn->region, txn);
+ &txn->region, txn_complete_async, txn);
if (req == NULL)
return NULL;
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 311985c72..de4c5205c 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
size_t used = region_used(&fiber()->gc);
struct journal_entry *entry;
- entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
if (entry == NULL)
goto err;
diff --git a/src/box/wal.c b/src/box/wal.c
index d8c92aa36..045006b60 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
static void
tx_schedule_queue(struct stailq *queue)
{
- struct wal_writer *writer = &wal_writer_singleton;
struct journal_entry *req, *tmp;
stailq_foreach_entry_safe(req, tmp, queue, fifo)
- journal_async_complete(&writer->base, req);
+ journal_async_complete(req);
}
/**
@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
*/
static void
wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
- void (*wall_async_cb)(struct journal_entry *entry),
- const char *wal_dirname,
- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+ const char *wal_dirname, int64_t wal_max_size,
+ const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
journal_create(&writer->base,
wal_mode == WAL_NONE ?
wal_write_none_async : wal_write_async,
- wall_async_cb,
wal_mode == WAL_NONE ?
wal_write_none : wal_write);
@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
}
int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
/* Initialize the state. */
struct wal_writer *writer = &wal_writer_singleton;
- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
- wal_max_size, instance_uuid, on_garbage_collection,
+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
+ instance_uuid, on_garbage_collection,
on_checkpoint_threshold);
/* Start WAL thread. */
@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
- journal_async_complete(journal, entry);
+ journal_async_complete(entry);
return 0;
}
diff --git a/src/box/wal.h b/src/box/wal.h
index f348dc636..9d0cada46 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
* Start WAL thread and initialize WAL writer.
*/
int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold);
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 3/9] journal: add journal_entry_create helper
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 1/9] xrow: introduce struct synchro_request Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 2/9] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 4/9] qsync: provide a binary form of syncro entries Cyrill Gorcunov
` (6 subsequent siblings)
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
Add a helper to create raw journal entries. We will use it
to write confirm/rollback entries.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/journal.c | 8 ++------
src/box/journal.h | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index 48af9157b..cb320b557 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -51,11 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
return NULL;
}
- entry->write_async_cb = write_async_cb;
- entry->complete_data = complete_data;
- entry->approx_len = 0;
- entry->n_rows = n_rows;
- entry->res = -1;
-
+ journal_entry_create(entry, n_rows, 0, write_async_cb,
+ complete_data);
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 4b019fecf..5d8d5a726 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -83,6 +83,22 @@ struct journal_entry {
struct region;
+/**
+ * Initialize a new journal entry.
+ */
+static inline void
+journal_entry_create(struct journal_entry *entry, size_t n_rows,
+ size_t approx_len,
+ journal_write_async_f write_async_cb,
+ void *complete_data)
+{
+ entry->write_async_cb = write_async_cb;
+ entry->complete_data = complete_data;
+ entry->approx_len = approx_len;
+ entry->n_rows = n_rows;
+ entry->res = -1;
+}
+
/**
* Create a new journal entry.
*
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 4/9] qsync: provide a binary form of syncro entries
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (2 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 3/9] journal: add journal_entry_create helper Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
` (5 subsequent siblings)
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
These msgpack entries will be needed to write synchro requests
down to a journal without involving the txn engine. At the same
time we would like to be able to allocate them on the stack;
for that reason the binary form is predefined and constant in size.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/txn_limbo.c | 9 +++++++--
src/box/xrow.c | 41 ++++++++++++++++++-----------------------
src/box/xrow.h | 20 +++++++++++++++-----
3 files changed, 40 insertions(+), 30 deletions(-)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 944161c30..ed8c10419 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -282,6 +282,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
req.replica_id = limbo->instance_id;
req.lsn = lsn;
+ /*
+ * This is a synchronous commit so we can
+ * use body and row allocated on a stack.
+ */
+ struct synchro_body_bin body;
struct xrow_header row;
struct request request = {
.header = &row,
@@ -291,8 +296,8 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
if (txn == NULL)
goto rollback;
- if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
- goto rollback;
+ xrow_encode_synchro(&row, &body, &req);
+
/*
* This is not really a transaction. It just uses txn API
* to put the data into WAL. And obviously it should not
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 4b5d4356f..03a4abdda 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,35 +893,30 @@ xrow_encode_dml(const struct request *request, struct region *region,
return iovcnt;
}
-int
-xrow_encode_synchro(struct xrow_header *row, struct region *region,
+void
+xrow_encode_synchro(struct xrow_header *row,
+ struct synchro_body_bin *body,
const struct synchro_request *req)
{
- size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
- mp_sizeof_uint(req->replica_id) +
- mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
- char *buf = (char *)region_alloc(region, len);
- if (buf == NULL) {
- diag_set(OutOfMemory, len, "region_alloc", "buf");
- return -1;
- }
- char *pos = buf;
-
- pos = mp_encode_map(pos, 2);
- pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
- pos = mp_encode_uint(pos, req->replica_id);
- pos = mp_encode_uint(pos, IPROTO_LSN);
- pos = mp_encode_uint(pos, req->lsn);
+ /*
+ * A map with two elements. We don't compress
+ * numbers to have this structure constant in size,
+ * which allows us to preallocate it on stack.
+ */
+ body->m_body = 0x80 | 2;
+ body->k_replica_id = IPROTO_REPLICA_ID;
+ body->m_replica_id = 0xce;
+ body->v_replica_id = mp_bswap_u32(req->replica_id);
+ body->k_lsn = IPROTO_LSN;
+ body->m_lsn = 0xcf;
+ body->v_lsn = mp_bswap_u64(req->lsn);
memset(row, 0, sizeof(*row));
- row->body[0].iov_base = buf;
- row->body[0].iov_len = len;
- row->bodycnt = 1;
-
row->type = req->type;
-
- return 0;
+ row->body[0].iov_base = (void *)body;
+ row->body[0].iov_len = sizeof(*body);
+ row->bodycnt = 1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 02dca74e5..20e82034d 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -240,16 +240,26 @@ struct synchro_request {
int64_t lsn;
};
+/** Synchro request xrow's body in MsgPack format. */
+struct PACKED synchro_body_bin {
+ uint8_t m_body;
+ uint8_t k_replica_id;
+ uint8_t m_replica_id;
+ uint32_t v_replica_id;
+ uint8_t k_lsn;
+ uint8_t m_lsn;
+ uint64_t v_lsn;
+};
+
/**
* Encode synchronous replication request.
* @param row xrow header.
- * @param region Region to use to encode the confirmation body.
+ * @param body Desination to use to encode the confirmation body.
* @param req Request parameters.
- * @retval -1 on error.
- * @retval 0 success.
*/
-int
-xrow_encode_synchro(struct xrow_header *row, struct region *region,
+void
+xrow_encode_synchro(struct xrow_header *row,
+ struct synchro_body_bin *body,
const struct synchro_request *req);
/**
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (3 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 4/9] qsync: provide a binary form of syncro entries Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 20:49 ` Vladislav Shpilevoy
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access Cyrill Gorcunov
` (4 subsequent siblings)
9 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
When we need to write a CONFIRM or ROLLBACK message (which is
a binary record in msgpack format) into a journal, we use txn code
to allocate a new transaction, encode the message there and make it
walk the long txn path before it hits the journal. This is not
only a waste of resources but also somewhat strange from an
architectural point of view.
Instead let's encode a record on the stack and write it to the
journal directly.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/txn_limbo.c | 64 ++++++++++++++++++++++-----------------------
1 file changed, 32 insertions(+), 32 deletions(-)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index ed8c10419..447630d23 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -32,6 +32,7 @@
#include "txn_limbo.h"
#include "replication.h"
#include "iproto_constants.h"
+#include "journal.h"
struct txn_limbo txn_limbo;
@@ -272,6 +273,17 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
return 0;
}
+/**
+ * A callback for synchronous write: txn_limbo_write_synchro fiber
+ * waiting to proceed once a record is written to WAL.
+ */
+static void
+txn_limbo_write_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ fiber_wakeup(entry->complete_data);
+}
+
static void
txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
@@ -284,46 +296,34 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
/*
* This is a synchronous commit so we can
- * use body and row allocated on a stack.
+ * allocate everything on a stack.
*/
struct synchro_body_bin body;
struct xrow_header row;
- struct request request = {
- .header = &row,
- };
+ char buf[sizeof(struct journal_entry) +
+ sizeof(struct xrow_header *)];
- struct txn *txn = txn_begin();
- if (txn == NULL)
- goto rollback;
+ struct journal_entry *entry = (struct journal_entry *)buf;
+ entry->rows[0] = &row;
xrow_encode_synchro(&row, &body, &req);
- /*
- * This is not really a transaction. It just uses txn API
- * to put the data into WAL. And obviously it should not
- * go to the limbo and block on the very same sync
- * transaction which it tries to confirm now.
- */
- txn_set_flag(txn, TXN_FORCE_ASYNC);
-
- if (txn_begin_stmt(txn, NULL) != 0)
- goto rollback;
- if (txn_commit_stmt(txn, &request) != 0)
- goto rollback;
- if (txn_commit(txn) != 0)
- goto rollback;
- return;
+ journal_entry_create(entry, 1, xrow_approx_len(&row),
+ txn_limbo_write_cb, fiber());
-rollback:
- /*
- * XXX: the stub is supposed to be removed once it is defined what to do
- * when a synchro request WAL write fails. One of the possible
- * solutions: log the error, keep the limbo queue as is and probably put
- * in rollback mode. Then provide a hook to call manually when WAL
- * problems are fixed. Or retry automatically with some period.
- */
- panic("Could not write a synchro request to WAL: lsn = %lld, type = "
- "%s\n", lsn, iproto_type_name(type));
+ if (journal_write(entry) != 0 || entry->res < 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ /*
+ * XXX: the stub is supposed to be removed once it is defined what to do
+ * when a synchro request WAL write fails. One of the possible
+ * solutions: log the error, keep the limbo queue as is and probably put
+ * in rollback mode. Then provide a hook to call manually when WAL
+ * problems are fixed. Or retry automatically with some period.
+ */
+ panic("Could not write a synchro request to WAL: lsn = %lld, type = "
+ "%s\n", lsn, type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
+ }
}
/**
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-08-17 20:49 ` Vladislav Shpilevoy
2020-08-17 22:16 ` Cyrill Gorcunov
2020-08-17 22:23 ` Cyrill Gorcunov
0 siblings, 2 replies; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-17 20:49 UTC (permalink / raw)
To: Cyrill Gorcunov, tml
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index ed8c10419..447630d23 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -272,6 +273,17 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> @@ -284,46 +296,34 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
> -rollback:
> - /*
> - * XXX: the stub is supposed to be removed once it is defined what to do
> - * when a synchro request WAL write fails. One of the possible
> - * solutions: log the error, keep the limbo queue as is and probably put
> - * in rollback mode. Then provide a hook to call manually when WAL
> - * problems are fixed. Or retry automatically with some period.
> - */
> - panic("Could not write a synchro request to WAL: lsn = %lld, type = "
> - "%s\n", lsn, iproto_type_name(type));
> + if (journal_write(entry) != 0 || entry->res < 0) {
> + diag_set(ClientError, ER_WAL_IO);
> + diag_log();
> + /*
> + * XXX: the stub is supposed to be removed once it is defined what to do
> + * when a synchro request WAL write fails. One of the possible
> + * solutions: log the error, keep the limbo queue as is and probably put
> + * in rollback mode. Then provide a hook to call manually when WAL
> + * problems are fixed. Or retry automatically with some period.
Still out of 80 symbols.
> + */
> + panic("Could not write a synchro request to WAL: lsn = %lld, type = "
> + "%s\n", lsn, type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
> + }
> }
>
> /**
>
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-17 20:49 ` Vladislav Shpilevoy
@ 2020-08-17 22:16 ` Cyrill Gorcunov
2020-08-17 22:23 ` Cyrill Gorcunov
1 sibling, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 22:16 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
On Mon, Aug 17, 2020 at 10:49:19PM +0200, Vladislav Shpilevoy wrote:
> > + if (journal_write(entry) != 0 || entry->res < 0) {
> > + diag_set(ClientError, ER_WAL_IO);
> > + diag_log();
> > + /*
> > + * XXX: the stub is supposed to be removed once it is defined what to do
> > + * when a synchro request WAL write fails. One of the possible
> > + * solutions: log the error, keep the limbo queue as is and probably put
> > + * in rollback mode. Then provide a hook to call manually when WAL
> > + * problems are fixed. Or retry automatically with some period.
>
> Still out of 80 symbols.
Shame on me :( You already pointed this out and I remembered the problem,
but then I was rebasing and it slipped my mind. I can force-push an update
if this is the only problem.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-17 20:49 ` Vladislav Shpilevoy
2020-08-17 22:16 ` Cyrill Gorcunov
@ 2020-08-17 22:23 ` Cyrill Gorcunov
1 sibling, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 22:23 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
When we need to write a CONFIRM or ROLLBACK message (which is
a binary record in msgpack format) into a journal, we use txn code
to allocate a new transaction, encode the message there and make it
walk the long txn path before it hits the journal. This is not
only a waste of resources but also somewhat strange from an
architectural point of view.
Instead, let's encode the record on the stack and write it to the
journal directly.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
Force-pushed into the same branch
src/box/txn_limbo.c | 66 +++++++++++++++++++++++----------------------
1 file changed, 34 insertions(+), 32 deletions(-)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index ed8c10419..53fcdf137 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -32,6 +32,7 @@
#include "txn_limbo.h"
#include "replication.h"
#include "iproto_constants.h"
+#include "journal.h"
struct txn_limbo txn_limbo;
@@ -272,6 +273,17 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
return 0;
}
+/**
+ * A callback for synchronous write: txn_limbo_write_synchro fiber
+ * waiting to proceed once a record is written to WAL.
+ */
+static void
+txn_limbo_write_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ fiber_wakeup(entry->complete_data);
+}
+
static void
txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
@@ -284,46 +296,36 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
/*
* This is a synchronous commit so we can
- * use body and row allocated on a stack.
+ * allocate everything on a stack.
*/
struct synchro_body_bin body;
struct xrow_header row;
- struct request request = {
- .header = &row,
- };
+ char buf[sizeof(struct journal_entry) +
+ sizeof(struct xrow_header *)];
- struct txn *txn = txn_begin();
- if (txn == NULL)
- goto rollback;
+ struct journal_entry *entry = (struct journal_entry *)buf;
+ entry->rows[0] = &row;
xrow_encode_synchro(&row, &body, &req);
- /*
- * This is not really a transaction. It just uses txn API
- * to put the data into WAL. And obviously it should not
- * go to the limbo and block on the very same sync
- * transaction which it tries to confirm now.
- */
- txn_set_flag(txn, TXN_FORCE_ASYNC);
-
- if (txn_begin_stmt(txn, NULL) != 0)
- goto rollback;
- if (txn_commit_stmt(txn, &request) != 0)
- goto rollback;
- if (txn_commit(txn) != 0)
- goto rollback;
- return;
+ journal_entry_create(entry, 1, xrow_approx_len(&row),
+ txn_limbo_write_cb, fiber());
-rollback:
- /*
- * XXX: the stub is supposed to be removed once it is defined what to do
- * when a synchro request WAL write fails. One of the possible
- * solutions: log the error, keep the limbo queue as is and probably put
- * in rollback mode. Then provide a hook to call manually when WAL
- * problems are fixed. Or retry automatically with some period.
- */
- panic("Could not write a synchro request to WAL: lsn = %lld, type = "
- "%s\n", lsn, iproto_type_name(type));
+ if (journal_write(entry) != 0 || entry->res < 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ /*
+ * XXX: the stub is supposed to be removed once it is defined
+ * what to do when a synchro request WAL write fails. One of
+ * the possible solutions: log the error, keep the limbo
+ * queue as is and probably put in rollback mode. Then
+ * provide a hook to call manually when WAL problems are fixed.
+ * Or retry automatically with some period.
+ */
+ panic("Could not write a synchro request to WAL: "
+ "lsn = %lld, type = %s\n", lsn,
+ type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
+ }
}
/**
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (4 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 5/9] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 20:49 ` Vladislav Shpilevoy
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 7/9] applier: process synchro requests without txn engine Cyrill Gorcunov
` (3 subsequent siblings)
9 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
We need to access first and last xrow in a queue
frenquently and opencoded variants are too ugly.
Lets provide shorthands.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/applier.cc | 36 +++++++++++++++++++++++++-----------
1 file changed, 25 insertions(+), 11 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 98fb87375..860a18681 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -648,6 +648,26 @@ struct applier_tx_row {
struct xrow_header row;
};
+/**
+ * Get first xrow from a list.
+ */
+static inline struct xrow_header *
+applier_first_row(struct stailq *rows)
+{
+ return &stailq_first_entry(rows,
+ struct applier_tx_row, next)->row;
+}
+
+/**
+ * Get last xrow from a list.
+ */
+static inline struct xrow_header *
+applier_last_row(struct stailq *rows)
+{
+ return &stailq_last_entry(rows,
+ struct applier_tx_row, next)->row;
+}
+
static struct applier_tx_row *
applier_read_tx_row(struct applier *applier)
{
@@ -749,8 +769,7 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
}
stailq_add_tail(rows, &tx_row->next);
- } while (!stailq_last_entry(rows, struct applier_tx_row,
- next)->row.is_commit);
+ } while (!applier_last_row(rows)->is_commit);
}
static int
@@ -807,10 +826,8 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
static int
applier_apply_tx(struct stailq *rows)
{
- struct xrow_header *first_row = &stailq_first_entry(rows,
- struct applier_tx_row, next)->row;
- struct xrow_header *last_row;
- last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
+ struct xrow_header *first_row = applier_first_row(rows);
+ struct xrow_header *last_row = applier_last_row(rows);
struct replica *replica = replica_by_id(first_row->replica_id);
/*
* In a full mesh topology, the same set of changes
@@ -834,9 +851,7 @@ applier_apply_tx(struct stailq *rows)
*/
struct xrow_header *tmp;
while (true) {
- tmp = &stailq_first_entry(rows,
- struct applier_tx_row,
- next)->row;
+ tmp = applier_first_row(rows);
if (tmp->lsn <= vclock_get(&replicaset.applier.vclock,
tmp->replica_id)) {
stailq_shift(rows);
@@ -1122,8 +1137,7 @@ applier_subscribe(struct applier *applier)
* In case of an heartbeat message wake a writer up
* and check applier state.
*/
- if (stailq_first_entry(&rows, struct applier_tx_row,
- next)->row.lsn == 0)
+ if (applier_first_row(&rows)->lsn == 0)
applier_signal_ack(applier);
else if (applier_apply_tx(&rows) != 0)
diag_raise();
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access Cyrill Gorcunov
@ 2020-08-17 20:49 ` Vladislav Shpilevoy
2020-08-17 22:14 ` Cyrill Gorcunov
0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-17 20:49 UTC (permalink / raw)
To: Cyrill Gorcunov, tml
It seems this commit is not needed - I dropped it and nothing changed.
Even no merge/rebase conflicts.
On 17.08.2020 15:39, Cyrill Gorcunov wrote:
> We need to access first and last xrow in a queue
> frenquently and opencoded variants are too ugly.
frenquently -> frequently.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access
2020-08-17 20:49 ` Vladislav Shpilevoy
@ 2020-08-17 22:14 ` Cyrill Gorcunov
2020-08-18 19:18 ` Vladislav Shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 22:14 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
On Mon, Aug 17, 2020 at 10:49:23PM +0200, Vladislav Shpilevoy wrote:
> It seems this commit is not needed - I dropped it and nothing changed.
> Even no merge/rebase conflicts.
>
> On 17.08.2020 15:39, Cyrill Gorcunov wrote:
> > We need to access first and last xrow in a queue
> > frenquently and opencoded variants are too ugly.
>
> frenquently -> frequently.
The code is ugly as hell without it :/ We use them not once
in the code that's why I made helpers. But I won't insist,
drop it if you prefer.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access
2020-08-17 22:14 ` Cyrill Gorcunov
@ 2020-08-18 19:18 ` Vladislav Shpilevoy
2020-08-19 20:37 ` Vladislav Shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-18 19:18 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml
On 18.08.2020 00:14, Cyrill Gorcunov wrote:
> On Mon, Aug 17, 2020 at 10:49:23PM +0200, Vladislav Shpilevoy wrote:
>> It seems this commit is not needed - I dropped it and nothing changed.
>> Even no merge/rebase conflicts.
>>
>> On 17.08.2020 15:39, Cyrill Gorcunov wrote:
>>> We need to access first and last xrow in a queue
>>> frenquently and opencoded variants are too ugly.
>>
>> frenquently -> frequently.
>
> The code is ugly as hell without it :/
There are many ugly things, but it does not mean we need to rush
changing them. It was discussed already many times. Please, drop
this commit. It is not necessary. It does not interfere with the
other patches in a single line.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access
2020-08-18 19:18 ` Vladislav Shpilevoy
@ 2020-08-19 20:37 ` Vladislav Shpilevoy
2020-08-19 20:49 ` Cyrill Gorcunov
0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-19 20:37 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml
Hi! Today you said you did all the review fixes. I see that
the branch didn't change, and this comment is still not
addressed.
On 18.08.2020 21:18, Vladislav Shpilevoy wrote:
> On 18.08.2020 00:14, Cyrill Gorcunov wrote:
>> On Mon, Aug 17, 2020 at 10:49:23PM +0200, Vladislav Shpilevoy wrote:
>>> It seems this commit is not needed - I dropped it and nothing changed.
>>> Even no merge/rebase conflicts.
>>>
>>> On 17.08.2020 15:39, Cyrill Gorcunov wrote:
>>>> We need to access first and last xrow in a queue
>>>> frenquently and opencoded variants are too ugly.
>>>
>>> frenquently -> frequently.
>>
>> The code is ugly as hell without it :/
>
> There are many ugly things, but it does not mean we need to rush
> changing them. It was discussed already many times. Please, drop
> this commit. It is not necessary. It does not interfere with the
> other patches in a single line.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access
2020-08-19 20:37 ` Vladislav Shpilevoy
@ 2020-08-19 20:49 ` Cyrill Gorcunov
0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 20:49 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
On Wed, Aug 19, 2020 at 10:37:30PM +0200, Vladislav Shpilevoy wrote:
> Hi! Today you said you did all the review fixes. I see that
> the branch didn't change, and this comment is still not
> addressed.
> >>
> >> The code is ugly as hell without it :/
> >
> > There are many ugly things, but it does not mean we need to rush
> > changing them. It was discussed already many times. Please, drop
> > this commit. It is not necessary. It does not interfere with the
> > other patches in a single line.
Hmm, it seems your reply somehow drowned in my inbox :( I'm pretty
damn sure that when we're changing the code we should improve it
if we can.
I'll update the branch and remove this commit.
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 7/9] applier: process synchro requests without txn engine
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (5 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 6/9] applier: add shorthands to queue access Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 8/9] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
` (2 subsequent siblings)
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
Transaction processing code is very heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.
In turn, when we receive a confirm or rollback packet from
another node in a cluster, we just need to inspect the limbo
queue and write this packet into a WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.
Thus let's rather handle them in a special lightweight way:
- allocate a synchro_entry structure which carries
the journal entry itself and the encoded message
- process the limbo queue to mark confirmed/rolled back
messages
- finally write this synchro_entry into a journal
Which is way simpler.
Part-of #5129
Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/applier.cc | 169 +++++++++++++++++++++++++++++++++++++++------
1 file changed, 149 insertions(+), 20 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 860a18681..83f6da461 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "xrow.h"
#include "scoped_guard.h"
#include "txn_limbo.h"
+#include "journal.h"
STRS(applier_state, applier_STATE);
@@ -772,19 +774,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
} while (!applier_last_row(rows)->is_commit);
}
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
{
- (void) trigger;
- struct txn *txn = (struct txn *) event;
- /*
- * Synchronous transaction rollback due to receiving a
- * ROLLBACK entry is a normal event and requires no
- * special handling.
- */
- if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
- return 0;
-
/*
* Setup shared applier diagnostic area.
*
@@ -793,19 +785,32 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
* diag use per-applier diag instead all the time
* (which actually already present in the structure).
*
- * But remember that transactions are asynchronous
- * and rollback may happen a way latter after it
- * passed to the journal engine.
+ * But remember that WAL writes are asynchronous and
+ * rollback may happen a way later after it was passed to
+ * the journal engine.
*/
diag_set(ClientError, ER_WAL_IO);
diag_set_error(&replicaset.applier.diag,
diag_last_error(diag_get()));
- /* Broadcast the rollback event across all appliers. */
- trigger_run(&replicaset.applier.on_rollback, event);
-
+ /* Broadcast the rollback across all appliers. */
+ trigger_run(&replicaset.applier.on_rollback, NULL);
/* Rollback applier vclock to the committed one. */
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ struct txn *txn = (struct txn *) event;
+ /*
+ * Synchronous transaction rollback due to receiving a
+ * ROLLBACK entry is a normal event and requires no
+ * special handling.
+ */
+ if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+ applier_rollback_by_wal_io();
return 0;
}
@@ -818,6 +823,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
return 0;
}
+struct synchro_entry {
+ /** Encoded form of a synchro record. */
+ struct synchro_body_bin body_bin;
+
+ /** xrow to write, used by the journal engine. */
+ struct xrow_header row;
+
+ /**
+ * The journal entry itself. Note since
+ * it has unsized array it must be the
+ * last entry in the structure.
+ */
+ struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+ free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ struct synchro_entry *synchro_entry =
+ (struct synchro_entry *)entry->complete_data;
+ if (entry->res < 0)
+ applier_rollback_by_wal_io();
+ else
+ trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+ synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+ struct synchro_request *req)
+{
+ struct synchro_entry *entry;
+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+ /*
+ * For simplicity we use malloc here but
+ * probably should provide some cache similar
+ * to txn cache.
+ */
+ entry = (struct synchro_entry *)malloc(size);
+ if (entry == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+ return NULL;
+ }
+
+ struct journal_entry *journal_entry = &entry->journal_entry;
+ struct synchro_body_bin *body_bin = &entry->body_bin;
+ struct xrow_header *row = &entry->row;
+
+ journal_entry->rows[0] = row;
+
+ xrow_encode_synchro(row, body_bin, req);
+
+ row->lsn = applier_row->lsn;
+ row->replica_id = applier_row->replica_id;
+
+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+ apply_synchro_row_cb, entry);
+ return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+ assert(iproto_type_is_synchro_request(row->type));
+
+ struct synchro_request req;
+ if (xrow_decode_synchro(row, &req) != 0)
+ goto err;
+
+ if (txn_limbo_process(&txn_limbo, &req))
+ goto err;
+
+ struct synchro_entry *entry;
+ entry = synchro_entry_new(row, &req);
+ if (entry == NULL)
+ goto err;
+
+ if (journal_write_async(&entry->journal_entry) != 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ goto err;
+ }
+ return 0;
+err:
+ diag_log();
+ return -1;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -861,13 +970,26 @@ applier_apply_tx(struct stailq *rows)
}
}
+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+ /*
+ * Synchro messages are not transactions, in terms
+ * of DML. Always sent and written isolated from
+ * each other.
+ */
+ assert(first_row == last_row);
+ if (apply_synchro_row(first_row) != 0)
+ diag_raise();
+ goto success;
+ }
+
/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin();
+ struct txn *txn;
+ txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL) {
latch_unlock(latch);
@@ -936,6 +1058,7 @@ applier_apply_tx(struct stailq *rows)
if (txn_commit_async(txn) < 0)
goto fail;
+success:
/*
* The transaction was sent to journal so promote vclock.
*
@@ -1103,7 +1226,13 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
- /* Register triggers to handle WAL writes and rollbacks. */
+ /*
+ * Register triggers to handle WAL writes and rollbacks.
+ *
+ * Note we use them for syncronous packets handling as well
+ * thus when changing make sure that synchro handling won't
+ * be broken.
+ */
struct trigger on_wal_write;
trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 8/9] txn: txn_add_redo -- drop synchro processing
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (6 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 7/9] applier: process synchro requests without txn engine Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 9/9] xrow: drop xrow_header_dup_body Cyrill Gorcunov
2020-08-17 21:24 ` [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
Since we no longer use the txn engine for processing synchro
packets, this code is never executed.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/txn.c | 9 +--------
1 file changed, 1 insertion(+), 8 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index cc1f496c5..b2d342355 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -82,14 +82,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
*/
struct space *space = stmt->space;
row->group_id = space != NULL ? space_group_id(space) : 0;
- /*
- * Sychronous replication entries are supplementary and
- * aren't valid dml requests. They're encoded manually.
- */
- if (likely(!iproto_type_is_synchro_request(row->type)))
- row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
- else
- row->bodycnt = xrow_header_dup_body(row, &txn->region);
+ row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
if (row->bodycnt < 0)
return -1;
stmt->row = row;
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* [Tarantool-patches] [PATCH v8 9/9] xrow: drop xrow_header_dup_body
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (7 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 8/9] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
@ 2020-08-17 13:39 ` Cyrill Gorcunov
2020-08-17 21:24 ` [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
9 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 13:39 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
We no longer use it.
Closes #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/xrow.c | 15 ---------------
src/box/xrow.h | 8 --------
2 files changed, 23 deletions(-)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 03a4abdda..9aa2fae00 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -220,21 +220,6 @@ xrow_header_decode(struct xrow_header *header, const char **pos,
return 0;
}
-int
-xrow_header_dup_body(struct xrow_header *row, struct region *region)
-{
- assert(row->bodycnt == 1);
- size_t size = row->body[0].iov_len;
- char *copy = (char *)region_alloc(region, size);
- if (copy == NULL) {
- diag_set(OutOfMemory, size, "region_alloc", "copy");
- return -1;
- }
- memcpy(copy, row->body[0].iov_base, size);
- row->body[0].iov_base = copy;
- return 1;
-}
-
/**
* @pre pos points at a valid msgpack
*/
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 20e82034d..58d47b12d 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -141,14 +141,6 @@ int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end, bool end_is_exact);
-/**
- * Duplicate the xrow's body onto the given region.
- * @retval -1 Error.
- * @retval >= 0 Iov count in the body.
- */
-int
-xrow_header_dup_body(struct xrow_header *header, struct region *region);
-
/**
* DML request.
*/
--
2.26.2
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine
2020-08-17 13:39 [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (8 preceding siblings ...)
2020-08-17 13:39 ` [Tarantool-patches] [PATCH v8 9/9] xrow: drop xrow_header_dup_body Cyrill Gorcunov
@ 2020-08-17 21:24 ` Vladislav Shpilevoy
2020-08-17 21:54 ` Cyrill Gorcunov
9 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-17 21:24 UTC (permalink / raw)
To: Cyrill Gorcunov, tml
process_synchro_row() is still present on the branch.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine
2020-08-17 21:24 ` [Tarantool-patches] [PATCH v8 0/9] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
@ 2020-08-17 21:54 ` Cyrill Gorcunov
0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 21:54 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
On Mon, Aug 17, 2020 at 11:24:40PM +0200, Vladislav Shpilevoy wrote:
> process_synchro_row() is still present on the branch.
Sigh. Overlooked, thanks! I've force-pushed the branch; just in case,
here is a commit on top. We can simply drop this helper, it is harmless.
---
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 83f6da461..5d7c35ca6 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -270,45 +270,11 @@ process_nop(struct request *request)
return txn_commit_stmt(txn, request);
}
-/*
- * CONFIRM/ROLLBACK rows aren't dml requests and require special
- * handling: instead of performing some operations on spaces,
- * processing these requests requires txn_limbo to either confirm
- * or rollback some of its entries.
- */
-static int
-process_synchro_row(struct request *request)
-{
- assert(iproto_type_is_synchro_request(request->header->type));
- struct txn *txn = in_txn();
-
- struct synchro_request syn_req;
- if (xrow_decode_synchro(request->header, &syn_req) != 0)
- return -1;
- assert(txn->n_applier_rows == 0);
- /*
- * This is not really a transaction. It just uses txn API
- * to put the data into WAL. And obviously it should not
- * go to the limbo and block on the very same sync
- * transaction which it tries to confirm now.
- */
- txn_set_flag(txn, TXN_FORCE_ASYNC);
-
- if (txn_begin_stmt(txn, NULL) != 0)
- return -1;
- if (txn_commit_stmt(txn, request) != 0)
- return -1;
- return txn_limbo_process(&txn_limbo, &syn_req);
-}
-
static int
apply_row(struct xrow_header *row)
{
struct request request;
- if (iproto_type_is_synchro_request(row->type)) {
- request.header = row;
- return process_synchro_row(&request);
- }
+ assert(!iproto_type_is_synchro_request(row->type));
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
if (request.type == IPROTO_NOP)
^ permalink raw reply [flat|nested] 20+ messages in thread