* [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine
@ 2020-08-19 21:34 Cyrill Gorcunov
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
` (7 more replies)
0 siblings, 8 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
In this series we write CONFIRM/ROLLBACK messages into the WAL directly
without involving the txn engine.
Vlad, please take a look once time permits. Since the series has reached
v9, I decided to resend it as a new thread instead of replying to the
old one, just for a final review.
You have already read the first 4 patches, and hopefully I have
addressed all your comments.
issue https://github.com/tarantool/tarantool/issues/5129
branch gorcunov/gh-5129-journal-9
v3:
- bootstrap journal left NULL for async write
- journal_write_async_cb_t type for async callback
- struct synchro_body_bin type for encoded message
- xrow_encode_synchro helper to operate with synchro_body_bin
v7:
- rebase on master
- rework applier code
v8:
- move synchro requests processing into
applier_apply_tx (by Vlad)
- drop synchro processing from txn_add_redo
v9:
- rebase on master branch
- drop "applier: add shorthands to queue access"
from the series
Cyrill Gorcunov (7):
journal: bind asynchronous write completion to an entry
journal: add journal_entry_create helper
qsync: provide a binary form of syncro entries
qsync: direct write of CONFIRM/ROLLBACK into a journal
applier: process synchro requests without txn engine
txn: txn_add_redo -- drop synchro processing
xrow: drop xrow_header_dup_body
src/box/applier.cc | 200 ++++++++++++++++++++++++++++++++------------
src/box/box.cc | 15 ++--
src/box/journal.c | 8 +-
src/box/journal.h | 36 ++++++--
src/box/txn.c | 11 +--
src/box/txn_limbo.c | 71 +++++++++-------
src/box/vy_log.c | 2 +-
src/box/wal.c | 19 ++---
src/box/wal.h | 4 +-
src/box/xrow.c | 56 ++++---------
src/box/xrow.h | 28 ++++---
11 files changed, 271 insertions(+), 179 deletions(-)
base-commit: ee07eab4da1d00da6ed848f1833cacd32b71c6eb
--
2.26.2
* [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-21 7:48 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 2/7] journal: add journal_entry_create helper Cyrill Gorcunov
` (6 subsequent siblings)
7 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
In commit 77ba0e3504464131fe81c672d508d0275be2173a we redesigned
WAL journal operations so that the asynchronous write completion
callback is a single instance per journal.
It turned out that this simplification is too restrictive: it does
not allow us to pass entries into the journal with custom completions.
So let's bring this ability back. We will need it to write
"confirm" records into the WAL directly, without touching the
transaction code at all.
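For illustration only, here is a minimal standalone sketch of the idea
(it is not code from this patch). The struct is a reduced stand-in for
the real struct journal_entry, and txn_like_cb, confirm_like_cb and
demo_two_completions are hypothetical names. It shows that once the
callback lives in the entry, two entries passed through the same
journal can complete in different ways.

```c
#include <assert.h>
#include <stddef.h>

struct journal_entry;
typedef void (*journal_write_async_f)(struct journal_entry *entry);

/* Reduced stand-in for the real struct journal_entry. */
struct journal_entry {
    void *complete_data;
    journal_write_async_f write_async_cb;
    long res;
};

/* Completion is dispatched through the entry, not the journal. */
static inline void
journal_async_complete(struct journal_entry *entry)
{
    assert(entry->write_async_cb != NULL);
    entry->write_async_cb(entry);
}

/* Hypothetical callback: what a transaction completion might do. */
static void
txn_like_cb(struct journal_entry *entry)
{
    *(int *)entry->complete_data += 1;
}

/* Hypothetical callback: what a CONFIRM completion might do. */
static void
confirm_like_cb(struct journal_entry *entry)
{
    *(int *)entry->complete_data += 10;
}

/* Complete two entries with different callbacks, return the sum. */
int
demo_two_completions(void)
{
    int txn_hits = 0, confirm_hits = 0;
    struct journal_entry a = { &txn_hits, txn_like_cb, 0 };
    struct journal_entry b = { &confirm_hits, confirm_like_cb, 0 };
    journal_async_complete(&a);
    journal_async_complete(&b);
    return txn_hits + confirm_hits;
}
```

Calling demo_two_completions() runs each callback exactly once on its
own counter, which is the property the per-entry binding is meant to
provide.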
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/box.cc | 15 ++++++++-------
src/box/journal.c | 2 ++
src/box/journal.h | 20 +++++++++++---------
src/box/txn.c | 2 +-
src/box/vy_log.c | 2 +-
src/box/wal.c | 19 ++++++++-----------
src/box/wal.h | 4 ++--
7 files changed, 33 insertions(+), 31 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 8e811e9c1..faffd5769 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
* Since there're no actual writes, fire a
* journal_async_complete callback right away.
*/
- journal_async_complete(base, entry);
+ journal_async_complete(entry);
return 0;
}
@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
{
static struct recovery_journal journal;
journal_create(&journal.base, recovery_journal_write,
- txn_complete_async, recovery_journal_write);
+ recovery_journal_write);
journal.vclock = v;
journal_set(&journal.base);
}
@@ -2182,8 +2182,10 @@ engine_init()
static int
bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
{
+ (void)base;
+
entry->res = 0;
- journal_async_complete(base, entry);
+ journal_async_complete(entry);
return 0;
}
@@ -2569,8 +2571,8 @@ box_cfg_xc(void)
int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
+ &INSTANCE_UUID, on_wal_garbage_collection,
on_wal_checkpoint_threshold) != 0) {
diag_raise();
}
@@ -2617,8 +2619,7 @@ box_cfg_xc(void)
}
struct journal bootstrap_journal;
- journal_create(&bootstrap_journal, NULL, txn_complete_async,
- bootstrap_journal_write);
+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
journal_set(&bootstrap_journal);
auto bootstrap_journal_guard = make_scoped_guard([] {
journal_set(NULL);
diff --git a/src/box/journal.c b/src/box/journal.c
index f1e89aaa2..48af9157b 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
+ journal_write_async_f write_async_cb,
void *complete_data)
{
struct journal_entry *entry;
@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
return NULL;
}
+ entry->write_async_cb = write_async_cb;
entry->complete_data = complete_data;
entry->approx_len = 0;
entry->n_rows = n_rows;
diff --git a/src/box/journal.h b/src/box/journal.h
index 1a10e66c3..4b019fecf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -42,6 +42,8 @@ extern "C" {
struct xrow_header;
struct journal_entry;
+typedef void (*journal_write_async_f)(struct journal_entry *entry);
+
/**
* An entry for an abstract journal.
* Simply put, a write ahead log request.
@@ -61,6 +63,10 @@ struct journal_entry {
* A journal entry completion callback argument.
*/
void *complete_data;
+ /**
+ * Asynchronous write completion function.
+ */
+ journal_write_async_f write_async_cb;
/**
* Approximate size of this request when encoded.
*/
@@ -84,6 +90,7 @@ struct region;
*/
struct journal_entry *
journal_entry_new(size_t n_rows, struct region *region,
+ journal_write_async_f write_async_cb,
void *complete_data);
/**
@@ -96,22 +103,19 @@ struct journal {
int (*write_async)(struct journal *journal,
struct journal_entry *entry);
- /** Asynchronous write completion */
- void (*write_async_cb)(struct journal_entry *entry);
-
/** Synchronous write */
int (*write)(struct journal *journal,
struct journal_entry *entry);
};
/**
- * Finalize a single entry.
+ * Complete asynchronous write.
*/
static inline void
-journal_async_complete(struct journal *journal, struct journal_entry *entry)
+journal_async_complete(struct journal_entry *entry)
{
- assert(journal->write_async_cb != NULL);
- journal->write_async_cb(entry);
+ assert(entry->write_async_cb != NULL);
+ entry->write_async_cb(entry);
}
/**
@@ -173,12 +177,10 @@ static inline void
journal_create(struct journal *journal,
int (*write_async)(struct journal *journal,
struct journal_entry *entry),
- void (*write_async_cb)(struct journal_entry *entry),
int (*write)(struct journal *journal,
struct journal_entry *entry))
{
journal->write_async = write_async;
- journal->write_async_cb = write_async_cb;
journal->write = write;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index 9c21258c5..cc1f496c5 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
/* Save space for an additional NOP row just in case. */
req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
- &txn->region, txn);
+ &txn->region, txn_complete_async, txn);
if (req == NULL)
return NULL;
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 311985c72..de4c5205c 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
size_t used = region_used(&fiber()->gc);
struct journal_entry *entry;
- entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
if (entry == NULL)
goto err;
diff --git a/src/box/wal.c b/src/box/wal.c
index d8c92aa36..045006b60 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
static void
tx_schedule_queue(struct stailq *queue)
{
- struct wal_writer *writer = &wal_writer_singleton;
struct journal_entry *req, *tmp;
stailq_foreach_entry_safe(req, tmp, queue, fifo)
- journal_async_complete(&writer->base, req);
+ journal_async_complete(req);
}
/**
@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
*/
static void
wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
- void (*wall_async_cb)(struct journal_entry *entry),
- const char *wal_dirname,
- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+ const char *wal_dirname, int64_t wal_max_size,
+ const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
journal_create(&writer->base,
wal_mode == WAL_NONE ?
wal_write_none_async : wal_write_async,
- wall_async_cb,
wal_mode == WAL_NONE ?
wal_write_none : wal_write);
@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
}
int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
/* Initialize the state. */
struct wal_writer *writer = &wal_writer_singleton;
- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
- wal_max_size, instance_uuid, on_garbage_collection,
+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
+ instance_uuid, on_garbage_collection,
on_checkpoint_threshold);
/* Start WAL thread. */
@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,
vclock_merge(&writer->vclock, &vclock_diff);
vclock_copy(&replicaset.vclock, &writer->vclock);
entry->res = vclock_sum(&writer->vclock);
- journal_async_complete(journal, entry);
+ journal_async_complete(entry);
return 0;
}
diff --git a/src/box/wal.h b/src/box/wal.h
index f348dc636..9d0cada46 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
* Start WAL thread and initialize WAL writer.
*/
int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold);
--
2.26.2
* [Tarantool-patches] [PATCH v9 2/7] journal: add journal_entry_create helper
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-21 7:51 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 3/7] qsync: provide a binary form of syncro entries Cyrill Gorcunov
` (5 subsequent siblings)
7 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
Add a helper that initializes a raw journal entry in place. We will
use it to write confirm/rollback entries.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/journal.c | 8 ++------
src/box/journal.h | 16 ++++++++++++++++
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/src/box/journal.c b/src/box/journal.c
index 48af9157b..cb320b557 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -51,11 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
return NULL;
}
- entry->write_async_cb = write_async_cb;
- entry->complete_data = complete_data;
- entry->approx_len = 0;
- entry->n_rows = n_rows;
- entry->res = -1;
-
+ journal_entry_create(entry, n_rows, 0, write_async_cb,
+ complete_data);
return entry;
}
diff --git a/src/box/journal.h b/src/box/journal.h
index 4b019fecf..5d8d5a726 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -83,6 +83,22 @@ struct journal_entry {
struct region;
+/**
+ * Initialize a new journal entry.
+ */
+static inline void
+journal_entry_create(struct journal_entry *entry, size_t n_rows,
+ size_t approx_len,
+ journal_write_async_f write_async_cb,
+ void *complete_data)
+{
+ entry->write_async_cb = write_async_cb;
+ entry->complete_data = complete_data;
+ entry->approx_len = approx_len;
+ entry->n_rows = n_rows;
+ entry->res = -1;
+}
+
/**
* Create a new journal entry.
*
--
2.26.2
* [Tarantool-patches] [PATCH v9 3/7] qsync: provide a binary form of syncro entries
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 2/7] journal: add journal_entry_create helper Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-21 8:15 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
` (4 subsequent siblings)
7 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
These msgpack entries are needed to write synchro records to
a journal without involving the txn engine. At the same time
we would like to be able to allocate them on the stack, so
the binary form has a predefined, constant-size layout.
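As a rough standalone illustration of the fixed layout (a sketch, not
the patch itself): the struct mirrors synchro_body_bin, but the demo_*
names, the key codes 0x02/0x03 and the byte-order helpers are
assumptions made for the example. The patch uses the PACKED macro and
mp_bswap_u32/mp_bswap_u64 instead. Every number is encoded with its
widest msgpack marker (0xce for uint32, 0xcf for uint64), so the body
always occupies 17 bytes.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Assumed key codes; the patch takes them from iproto_constants.h. */
enum { DEMO_KEY_REPLICA_ID = 0x02, DEMO_KEY_LSN = 0x03 };

/* GCC/Clang attribute; the patch uses Tarantool's PACKED macro. */
struct __attribute__((packed)) demo_synchro_body {
    uint8_t m_body;        /* 0x82: fixmap with two pairs */
    uint8_t k_replica_id;
    uint8_t m_replica_id;  /* 0xce: uint32 marker */
    uint32_t v_replica_id; /* stored big-endian */
    uint8_t k_lsn;
    uint8_t m_lsn;         /* 0xcf: uint64 marker */
    uint64_t v_lsn;        /* stored big-endian */
};

/* Return v laid out big-endian in memory, on any host byte order. */
static uint32_t
demo_be32(uint32_t v)
{
    uint8_t b[4] = {
        (uint8_t)(v >> 24), (uint8_t)(v >> 16),
        (uint8_t)(v >> 8), (uint8_t)v,
    };
    uint32_t out;
    memcpy(&out, b, sizeof(out));
    return out;
}

/* Same as demo_be32, for 64-bit values. */
static uint64_t
demo_be64(uint64_t v)
{
    uint8_t b[8];
    uint64_t out;
    for (int i = 0; i < 8; i++)
        b[i] = (uint8_t)(v >> (56 - 8 * i));
    memcpy(&out, b, sizeof(out));
    return out;
}

/* Fill a caller-provided body; no allocation, cannot fail. */
void
demo_encode_synchro(struct demo_synchro_body *body,
                    uint32_t replica_id, int64_t lsn)
{
    assert(sizeof(*body) == 17);
    body->m_body = 0x80 | 2;
    body->k_replica_id = DEMO_KEY_REPLICA_ID;
    body->m_replica_id = 0xce;
    body->v_replica_id = demo_be32(replica_id);
    body->k_lsn = DEMO_KEY_LSN;
    body->m_lsn = 0xcf;
    body->v_lsn = demo_be64((uint64_t)lsn);
}
```

Because the size does not depend on the values, the caller can declare
the body as a local variable and the encoder has no error path, which
matches the switch from int to void in xrow_encode_synchro.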
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/txn_limbo.c | 9 +++++++--
src/box/xrow.c | 41 ++++++++++++++++++-----------------------
src/box/xrow.h | 20 +++++++++++++++-----
3 files changed, 40 insertions(+), 30 deletions(-)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index c6a4e5efc..e458dad75 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -283,6 +283,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
.lsn = lsn,
};
+ /*
+ * This is a synchronous commit so we can
+ * use body and row allocated on a stack.
+ */
+ struct synchro_body_bin body;
struct xrow_header row;
struct request request = {
.header = &row,
@@ -292,8 +297,8 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
if (txn == NULL)
goto rollback;
- if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
- goto rollback;
+ xrow_encode_synchro(&row, &body, &req);
+
/*
* This is not really a transaction. It just uses txn API
* to put the data into WAL. And obviously it should not
diff --git a/src/box/xrow.c b/src/box/xrow.c
index bf174c701..9c6fb4fc1 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,35 +893,30 @@ xrow_encode_dml(const struct request *request, struct region *region,
return iovcnt;
}
-int
-xrow_encode_synchro(struct xrow_header *row, struct region *region,
+void
+xrow_encode_synchro(struct xrow_header *row,
+ struct synchro_body_bin *body,
const struct synchro_request *req)
{
- size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
- mp_sizeof_uint(req->replica_id) +
- mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
- char *buf = (char *)region_alloc(region, len);
- if (buf == NULL) {
- diag_set(OutOfMemory, len, "region_alloc", "buf");
- return -1;
- }
- char *pos = buf;
-
- pos = mp_encode_map(pos, 2);
- pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
- pos = mp_encode_uint(pos, req->replica_id);
- pos = mp_encode_uint(pos, IPROTO_LSN);
- pos = mp_encode_uint(pos, req->lsn);
+ /*
+ * A map with two elements. We don't compress
+ * numbers to have this structure constant in size,
+ * which allows us to preallocate it on stack.
+ */
+ body->m_body = 0x80 | 2;
+ body->k_replica_id = IPROTO_REPLICA_ID;
+ body->m_replica_id = 0xce;
+ body->v_replica_id = mp_bswap_u32(req->replica_id);
+ body->k_lsn = IPROTO_LSN;
+ body->m_lsn = 0xcf;
+ body->v_lsn = mp_bswap_u64(req->lsn);
memset(row, 0, sizeof(*row));
- row->body[0].iov_base = buf;
- row->body[0].iov_len = len;
- row->bodycnt = 1;
-
row->type = req->type;
-
- return 0;
+ row->body[0].iov_base = (void *)body;
+ row->body[0].iov_len = sizeof(*body);
+ row->bodycnt = 1;
}
int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 02dca74e5..20e82034d 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -240,16 +240,26 @@ struct synchro_request {
int64_t lsn;
};
+/** Synchro request xrow's body in MsgPack format. */
+struct PACKED synchro_body_bin {
+ uint8_t m_body;
+ uint8_t k_replica_id;
+ uint8_t m_replica_id;
+ uint32_t v_replica_id;
+ uint8_t k_lsn;
+ uint8_t m_lsn;
+ uint64_t v_lsn;
+};
+
/**
* Encode synchronous replication request.
* @param row xrow header.
- * @param region Region to use to encode the confirmation body.
+ * @param body Destination to use to encode the confirmation body.
* @param req Request parameters.
- * @retval -1 on error.
- * @retval 0 success.
*/
-int
-xrow_encode_synchro(struct xrow_header *row, struct region *region,
+void
+xrow_encode_synchro(struct xrow_header *row,
+ struct synchro_body_bin *body,
const struct synchro_request *req);
/**
--
2.26.2
* [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (2 preceding siblings ...)
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 3/7] qsync: provide a binary form of syncro entries Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-19 22:43 ` Vladislav Shpilevoy
2020-08-21 8:36 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine Cyrill Gorcunov
` (3 subsequent siblings)
7 siblings, 2 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
When we need to write a CONFIRM or ROLLBACK message (which is
a binary record in msgpack format) into a journal, we use txn code
to allocate a new transaction, encode the message there and send it
down the long txn path before it hits the journal. This not only
wastes resources but is also questionable from an architectural
point of view.
Instead, let's encode the record on the stack and write it to the
journal directly.
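A simplified standalone sketch of the stack-allocation idea follows.
It is an illustration under assumptions, not the patch: demo_row,
demo_entry, demo_journal_write and demo_write_one are hypothetical
stand-ins, and the sketch reserves the storage with a union so that
the buffer is suitably aligned for the entry, whereas the patch itself
uses a plain char array.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for struct xrow_header. */
struct demo_row {
    int type;
};

/* Hypothetical stand-in for struct journal_entry. */
struct demo_entry {
    long res;
    int n_rows;
    struct demo_row *rows[]; /* flexible array member */
};

/* Hypothetical synchronous journal: report the row type as result. */
static int
demo_journal_write(struct demo_entry *entry)
{
    assert(entry->n_rows == 1);
    entry->res = entry->rows[0]->type;
    return 0;
}

/* Write a single row using stack storage only. */
int
demo_write_one(int type)
{
    struct demo_row row = { type };
    /* Room for the entry header plus exactly one row pointer. */
    union {
        struct demo_entry entry;
        char raw[sizeof(struct demo_entry) +
                 sizeof(struct demo_row *)];
    } buf;
    struct demo_entry *entry = &buf.entry;

    entry->n_rows = 1;
    entry->res = -1;
    entry->rows[0] = &row;

    /* The write is synchronous, so row and buf outlive it. */
    if (demo_journal_write(entry) != 0 || entry->res < 0)
        return -1;
    return (int)entry->res;
}
```

The stack storage is safe here only because the caller does not return
until the write has finished. An asynchronous write would need storage
that outlives the calling function.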
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/txn_limbo.c | 66 +++++++++++++++++++++++----------------------
1 file changed, 34 insertions(+), 32 deletions(-)
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index e458dad75..4b90d7fa5 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -32,6 +32,7 @@
#include "txn_limbo.h"
#include "replication.h"
#include "iproto_constants.h"
+#include "journal.h"
struct txn_limbo txn_limbo;
@@ -272,6 +273,17 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
return 0;
}
+/**
+ * A callback for synchronous write: txn_limbo_write_synchro fiber
+ * waiting to proceed once a record is written to WAL.
+ */
+static void
+txn_limbo_write_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ fiber_wakeup(entry->complete_data);
+}
+
static void
txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
{
@@ -285,46 +297,36 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
/*
* This is a synchronous commit so we can
- * use body and row allocated on a stack.
+ * allocate everything on a stack.
*/
struct synchro_body_bin body;
struct xrow_header row;
- struct request request = {
- .header = &row,
- };
+ char buf[sizeof(struct journal_entry) +
+ sizeof(struct xrow_header *)];
- struct txn *txn = txn_begin();
- if (txn == NULL)
- goto rollback;
+ struct journal_entry *entry = (struct journal_entry *)buf;
+ entry->rows[0] = &row;
xrow_encode_synchro(&row, &body, &req);
- /*
- * This is not really a transaction. It just uses txn API
- * to put the data into WAL. And obviously it should not
- * go to the limbo and block on the very same sync
- * transaction which it tries to confirm now.
- */
- txn_set_flag(txn, TXN_FORCE_ASYNC);
-
- if (txn_begin_stmt(txn, NULL) != 0)
- goto rollback;
- if (txn_commit_stmt(txn, &request) != 0)
- goto rollback;
- if (txn_commit(txn) != 0)
- goto rollback;
- return;
+ journal_entry_create(entry, 1, xrow_approx_len(&row),
+ txn_limbo_write_cb, fiber());
-rollback:
- /*
- * XXX: the stub is supposed to be removed once it is defined what to do
- * when a synchro request WAL write fails. One of the possible
- * solutions: log the error, keep the limbo queue as is and probably put
- * in rollback mode. Then provide a hook to call manually when WAL
- * problems are fixed. Or retry automatically with some period.
- */
- panic("Could not write a synchro request to WAL: lsn = %lld, type = "
- "%s\n", lsn, iproto_type_name(type));
+ if (journal_write(entry) != 0 || entry->res < 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ diag_log();
+ /*
+ * XXX: the stub is supposed to be removed once it is defined
+ * what to do when a synchro request WAL write fails. One of
+ * the possible solutions: log the error, keep the limbo
+ * queue as is and probably put in rollback mode. Then
+ * provide a hook to call manually when WAL problems are fixed.
+ * Or retry automatically with some period.
+ */
+ panic("Could not write a synchro request to WAL: "
+ "lsn = %lld, type = %s\n", lsn,
+ type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
+ }
}
/**
--
2.26.2
* [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (3 preceding siblings ...)
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-21 8:51 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 6/7] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
` (2 subsequent siblings)
7 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
Transaction processing code is heavy simply because
transactions carry various data and involve a number
of other mechanisms to proceed.
In turn, when we receive a confirm or rollback packet from
another node in a cluster, we just need to inspect the limbo
queue and write this packet into the WAL journal. So calling
a bunch of txn engine helpers is simply a waste of cycles.
Thus let's handle them in a special lightweight way:
- allocate a synchro_entry structure which carries
the journal entry itself and the encoded message
- process the limbo queue to mark confirmed/rolled-back
messages
- finally write this synchro_entry into the journal
This is much simpler.
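The ownership flow of such a heap-allocated entry can be sketched as
follows. This is a toy model and not the applier code: all demo_*
names are hypothetical, the limbo step is left out, and the "journal"
is a one-slot queue that is flushed by hand. Freeing the entry when
the submit fails is a choice made for the sketch.

```c
#include <assert.h>
#include <stdlib.h>

struct demo_jentry;
typedef void (*demo_cb_f)(struct demo_jentry *entry);

/* Hypothetical stand-in for struct journal_entry. */
struct demo_jentry {
    demo_cb_f cb;
    void *complete_data;
    long res;
};

/* Heap object carrying both the payload and the journal entry. */
struct demo_synchro_entry {
    long lsn;
    struct demo_jentry journal_entry;
};

static int demo_completed;               /* finished writes */
static struct demo_jentry *demo_pending; /* one-slot fake WAL queue */

/* Completion callback: account for the write, release the entry. */
static void
demo_synchro_cb(struct demo_jentry *entry)
{
    assert(entry->complete_data != NULL);
    if (entry->res >= 0)
        demo_completed++;
    free(entry->complete_data);
}

/* Hypothetical async journal: just remember the entry. */
static int
demo_write_async(struct demo_jentry *entry)
{
    if (demo_pending != NULL)
        return -1;
    demo_pending = entry;
    return 0;
}

/* Stand-in for the WAL thread finishing the pending write. */
void
demo_wal_flush(void)
{
    struct demo_jentry *entry = demo_pending;
    if (entry == NULL)
        return;
    demo_pending = NULL;
    entry->res = 1;
    entry->cb(entry);
}

/* Allocate an entry and hand it over to the journal. */
int
demo_apply_synchro(long lsn)
{
    struct demo_synchro_entry *e = malloc(sizeof(*e));
    if (e == NULL)
        return -1;
    e->lsn = lsn;
    e->journal_entry.cb = demo_synchro_cb;
    e->journal_entry.complete_data = e;
    e->journal_entry.res = -1;
    if (demo_write_async(&e->journal_entry) != 0) {
        free(e);
        return -1;
    }
    return 0;
}

int
demo_completed_count(void)
{
    return demo_completed;
}
```

After demo_apply_synchro() returns, the caller no longer owns the
entry: it stays alive until demo_wal_flush() runs the callback, which
is why the entry cannot live on the caller's stack in the asynchronous
case.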
Part-of #5129
Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/applier.cc | 200 +++++++++++++++++++++++++++++++++------------
1 file changed, 148 insertions(+), 52 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1387d518c..c1d07ca54 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
#include "schema.h"
#include "txn.h"
#include "box.h"
+#include "xrow.h"
#include "scoped_guard.h"
#include "txn_limbo.h"
+#include "journal.h"
STRS(applier_state, applier_STATE);
@@ -268,45 +270,11 @@ process_nop(struct request *request)
return txn_commit_stmt(txn, request);
}
-/*
- * CONFIRM/ROLLBACK rows aren't dml requests and require special
- * handling: instead of performing some operations on spaces,
- * processing these requests requires txn_limbo to either confirm
- * or rollback some of its entries.
- */
-static int
-process_synchro_row(struct request *request)
-{
- assert(iproto_type_is_synchro_request(request->header->type));
- struct txn *txn = in_txn();
-
- struct synchro_request syn_req;
- if (xrow_decode_synchro(request->header, &syn_req) != 0)
- return -1;
- assert(txn->n_applier_rows == 0);
- /*
- * This is not really a transaction. It just uses txn API
- * to put the data into WAL. And obviously it should not
- * go to the limbo and block on the very same sync
- * transaction which it tries to confirm now.
- */
- txn_set_flag(txn, TXN_FORCE_ASYNC);
-
- if (txn_begin_stmt(txn, NULL) != 0)
- return -1;
- if (txn_commit_stmt(txn, request) != 0)
- return -1;
- return txn_limbo_process(&txn_limbo, &syn_req);
-}
-
static int
apply_row(struct xrow_header *row)
{
struct request request;
- if (iproto_type_is_synchro_request(row->type)) {
- request.header = row;
- return process_synchro_row(&request);
- }
+ assert(!iproto_type_is_synchro_request(row->type));
if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
return -1;
if (request.type == IPROTO_NOP)
@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
next)->row.is_commit);
}
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
{
- (void) trigger;
- struct txn *txn = (struct txn *) event;
- /*
- * Synchronous transaction rollback due to receiving a
- * ROLLBACK entry is a normal event and requires no
- * special handling.
- */
- if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
- return 0;
-
/*
* Setup shared applier diagnostic area.
*
@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
* diag use per-applier diag instead all the time
* (which actually already present in the structure).
*
- * But remember that transactions are asynchronous
- * and rollback may happen a way latter after it
- * passed to the journal engine.
+ * But remember that WAL writes are asynchronous and
+ * rollback may happen a way later after it was passed to
+ * the journal engine.
*/
diag_set(ClientError, ER_WAL_IO);
diag_set_error(&replicaset.applier.diag,
@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
/* Rollback applier vclock to the committed one. */
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+ (void) trigger;
+ struct txn *txn = (struct txn *) event;
+ /*
+ * Synchronous transaction rollback due to receiving a
+ * ROLLBACK entry is a normal event and requires no
+ * special handling.
+ */
+ if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+ applier_rollback_by_wal_io();
return 0;
}
@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
return 0;
}
+struct synchro_entry {
+ /** Encoded form of a synchro record. */
+ struct synchro_body_bin body_bin;
+
+ /** xrow to write, used by the journal engine. */
+ struct xrow_header row;
+
+ /**
+ * The journal entry itself. Note since
+ * it has unsized array it must be the
+ * last entry in the structure.
+ */
+ struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+ free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+ assert(entry->complete_data != NULL);
+ struct synchro_entry *synchro_entry =
+ (struct synchro_entry *)entry->complete_data;
+ if (entry->res < 0)
+ applier_rollback_by_wal_io();
+ else
+ trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+ synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+ struct synchro_request *req)
+{
+ struct synchro_entry *entry;
+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+ /*
+ * For simplicity we use malloc here but
+ * probably should provide some cache similar
+ * to txn cache.
+ */
+ entry = (struct synchro_entry *)malloc(size);
+ if (entry == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+ return NULL;
+ }
+
+ struct journal_entry *journal_entry = &entry->journal_entry;
+ struct synchro_body_bin *body_bin = &entry->body_bin;
+ struct xrow_header *row = &entry->row;
+
+ journal_entry->rows[0] = row;
+
+ xrow_encode_synchro(row, body_bin, req);
+
+ row->lsn = applier_row->lsn;
+ row->replica_id = applier_row->replica_id;
+
+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+ apply_synchro_row_cb, entry);
+ return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+ assert(iproto_type_is_synchro_request(row->type));
+
+ struct synchro_request req;
+ if (xrow_decode_synchro(row, &req) != 0)
+ goto err;
+
+ if (txn_limbo_process(&txn_limbo, &req))
+ goto err;
+
+ struct synchro_entry *entry;
+ entry = synchro_entry_new(row, &req);
+ if (entry == NULL)
+ goto err;
+
+ if (journal_write_async(&entry->journal_entry) != 0) {
+ diag_set(ClientError, ER_WAL_IO);
+ goto err;
+ }
+ return 0;
+err:
+ diag_log();
+ return -1;
+}
+
/**
* Apply all rows in the rows queue as a single transaction.
*
@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
}
}
+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+ /*
+ * Synchro messages are not transactions, in terms
+ * of DML. Always sent and written isolated from
+ * each other.
+ */
+ assert(first_row == last_row);
+ if (apply_synchro_row(first_row) != 0)
+ diag_raise();
+ goto success;
+ }
+
/**
* Explicitly begin the transaction so that we can
* control fiber->gc life cycle and, in case of apply
* conflict safely access failed xrow object and allocate
* IPROTO_NOP on gc.
*/
- struct txn *txn = txn_begin();
+ struct txn *txn;
+ txn = txn_begin();
struct applier_tx_row *item;
if (txn == NULL) {
latch_unlock(latch);
@@ -922,6 +1011,7 @@ applier_apply_tx(struct stailq *rows)
if (txn_commit_async(txn) < 0)
goto fail;
+success:
/*
* The transaction was sent to journal so promote vclock.
*
@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)
applier->lag = TIMEOUT_INFINITY;
- /* Register triggers to handle WAL writes and rollbacks. */
+ /*
+ * Register triggers to handle WAL writes and rollbacks.
+ *
+ * Note we use them for synchronous packets handling as well
+ * thus when changing make sure that synchro handling won't
+ * be broken.
+ */
struct trigger on_wal_write;
trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
--
2.26.2
* [Tarantool-patches] [PATCH v9 6/7] txn: txn_add_redo -- drop synchro processing
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (4 preceding siblings ...)
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-21 8:52 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 7/7] xrow: drop xrow_header_dup_body Cyrill Gorcunov
2020-08-24 21:16 ` [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
7 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
Since we no longer use the txn engine to process synchro
packets, this code is never executed.
Part-of #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/txn.c | 9 +--------
1 file changed, 1 insertion(+), 8 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index cc1f496c5..b2d342355 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -82,14 +82,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
*/
struct space *space = stmt->space;
row->group_id = space != NULL ? space_group_id(space) : 0;
- /*
- * Sychronous replication entries are supplementary and
- * aren't valid dml requests. They're encoded manually.
- */
- if (likely(!iproto_type_is_synchro_request(row->type)))
- row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
- else
- row->bodycnt = xrow_header_dup_body(row, &txn->region);
+ row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
if (row->bodycnt < 0)
return -1;
stmt->row = row;
--
2.26.2
* [Tarantool-patches] [PATCH v9 7/7] xrow: drop xrow_header_dup_body
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (5 preceding siblings ...)
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 6/7] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
@ 2020-08-19 21:34 ` Cyrill Gorcunov
2020-08-21 8:57 ` Serge Petrenko
2020-08-24 21:16 ` [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy
7 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-19 21:34 UTC (permalink / raw)
To: tml; +Cc: Vladislav Shpilevoy
We no longer use it.
Closes #5129
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
src/box/xrow.c | 15 ---------------
src/box/xrow.h | 8 --------
2 files changed, 23 deletions(-)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 9c6fb4fc1..95ddb1fe7 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -220,21 +220,6 @@ xrow_header_decode(struct xrow_header *header, const char **pos,
return 0;
}
-int
-xrow_header_dup_body(struct xrow_header *row, struct region *region)
-{
- assert(row->bodycnt == 1);
- size_t size = row->body[0].iov_len;
- char *copy = (char *)region_alloc(region, size);
- if (copy == NULL) {
- diag_set(OutOfMemory, size, "region_alloc", "copy");
- return -1;
- }
- memcpy(copy, row->body[0].iov_base, size);
- row->body[0].iov_base = copy;
- return 1;
-}
-
/**
* @pre pos points at a valid msgpack
*/
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 20e82034d..58d47b12d 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -141,14 +141,6 @@ int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end, bool end_is_exact);
-/**
- * Duplicate the xrow's body onto the given region.
- * @retval -1 Error.
- * @retval >= 0 Iov count in the body.
- */
-int
-xrow_header_dup_body(struct xrow_header *header, struct region *region);
-
/**
* DML request.
*/
--
2.26.2
* Re: [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-08-19 22:43 ` Vladislav Shpilevoy
2020-08-20 7:13 ` Cyrill Gorcunov
2020-08-21 8:36 ` Serge Petrenko
1 sibling, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-19 22:43 UTC (permalink / raw)
To: Cyrill Gorcunov, tml
Thanks for the patch!
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index e458dad75..4b90d7fa5 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -285,46 +297,36 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
>
> /*
> * This is a synchronous commit so we can
> - * use body and row allocated on a stack.
> + * allocate everything on a stack.
> */
> struct synchro_body_bin body;
> struct xrow_header row;
> - struct request request = {
> - .header = &row,
> - };
> + char buf[sizeof(struct journal_entry) +
> + sizeof(struct xrow_header *)];
>
> - struct txn *txn = txn_begin();
> - if (txn == NULL)
> - goto rollback;
> + struct journal_entry *entry = (struct journal_entry *)buf;
> + entry->rows[0] = &row;
>
> xrow_encode_synchro(&row, &body, &req);
>
> - /*
> - * This is not really a transaction. It just uses txn API
> - * to put the data into WAL. And obviously it should not
> - * go to the limbo and block on the very same sync
> - * transaction which it tries to confirm now.
> - */
> - txn_set_flag(txn, TXN_FORCE_ASYNC);
> -
> - if (txn_begin_stmt(txn, NULL) != 0)
> - goto rollback;
> - if (txn_commit_stmt(txn, &request) != 0)
> - goto rollback;
> - if (txn_commit(txn) != 0)
> - goto rollback;
> - return;
> + journal_entry_create(entry, 1, xrow_approx_len(&row),
> + txn_limbo_write_cb, fiber());
>
> -rollback:
> - /*
> - * XXX: the stub is supposed to be removed once it is defined what to do
> - * when a synchro request WAL write fails. One of the possible
> - * solutions: log the error, keep the limbo queue as is and probably put
> - * in rollback mode. Then provide a hook to call manually when WAL
> - * problems are fixed. Or retry automatically with some period.
> - */
> - panic("Could not write a synchro request to WAL: lsn = %lld, type = "
> - "%s\n", lsn, iproto_type_name(type));
> + if (journal_write(entry) != 0 || entry->res < 0) {
> + diag_set(ClientError, ER_WAL_IO);
> + diag_log();
> + /*
> + * XXX: the stub is supposed to be removed once it is defined
> + * what to do when a synchro request WAL write fails. One of
> + * the possible solutions: log the error, keep the limbo
> + * queue as is and probably put in rollback mode. Then
> + * provide a hook to call manually when WAL problems are fixed.
> + * Or retry automatically with some period.
> + */
> + panic("Could not write a synchro request to WAL: "
> + "lsn = %lld, type = %s\n", lsn,
> + type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
Why did you inline iproto_type_name(type)?
* Re: [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-19 22:43 ` Vladislav Shpilevoy
@ 2020-08-20 7:13 ` Cyrill Gorcunov
0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-20 7:13 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
On Thu, Aug 20, 2020 at 12:43:45AM +0200, Vladislav Shpilevoy wrote:
> > + if (journal_write(entry) != 0 || entry->res < 0) {
> > + diag_set(ClientError, ER_WAL_IO);
> > + diag_log();
> > + /*
> > + * XXX: the stub is supposed to be removed once it is defined
> > + * what to do when a synchro request WAL write fails. One of
> > + * the possible solutions: log the error, keep the limbo
> > + * queue as is and probably put in rollback mode. Then
> > + * provide a hook to call manually when WAL problems are fixed.
> > + * Or retry automatically with some period.
> > + */
> > + panic("Could not write a synchro request to WAL: "
> > + "lsn = %lld, type = %s\n", lsn,
> > + type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
>
> Why did you inline iproto_type_name(type)?
It sneaked in from old commits (I cherry-picked it, and there
was no such code previously). I've fixed it and pushed the fix
into the branch. Thanks for noticing!
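For context, a minimal sketch of why a shared name helper tends to be safer than the inlined ternary: the ternary labels every non-CONFIRM type as "ROLLBACK", while a lookup can report an unknown type. Here `req_type_name` and the numeric codes are hypothetical stand-ins, not Tarantool's `iproto_type_name` or its real constants.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical request codes, chosen only for this illustration. */
enum req_type { REQ_CONFIRM = 40, REQ_ROLLBACK = 41 };

/* Map a request type to a printable name, or NULL if unknown. */
static const char *
req_type_name(unsigned type)
{
	switch (type) {
	case REQ_CONFIRM:
		return "CONFIRM";
	case REQ_ROLLBACK:
		return "ROLLBACK";
	default:
		return NULL;
	}
}

/* The inlined form: anything that is not CONFIRM becomes "ROLLBACK". */
static const char *
req_type_name_inlined(unsigned type)
{
	return type == REQ_CONFIRM ? "CONFIRM" : "ROLLBACK";
}
```

With the helper, a new synchro request type only has to be added in one place, and an unexpected value is visible in the log instead of being silently mislabeled.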
* Re: [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
@ 2020-08-21 7:48 ` Serge Petrenko
0 siblings, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 7:48 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
Hi! Thanks for the patch! LGTM.
>Thursday, 20 August 2020, 0:35 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>In commit 77ba0e3504464131fe81c672d508d0275be2173a we redesigned
>WAL journal operations so that there is a single asynchronous
>write completion callback per journal.
>
>It turned out that this simplification is too restrictive: it doesn't
>allow us to pass entries into the journal with custom completions.
>
>So let's bring this ability back. We will need it to write
>"confirm" records into the WAL directly, without touching
>the transaction code at all.
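The idea described above can be sketched in isolation as follows. This is a simplified illustration, not the patch itself: `struct entry`, `entry_complete`, `count_txn` and `mark_confirm` are hypothetical names, and the real `struct journal_entry` has more fields.

```c
#include <assert.h>
#include <stddef.h>

struct entry;
typedef void (*entry_done_f)(struct entry *e);

/* Hypothetical, reduced stand-in for struct journal_entry. */
struct entry {
	entry_done_f done_cb;	/* per-entry completion callback */
	void *done_data;	/* opaque argument for the callback */
	long res;		/* write result, negative on failure */
};

/* Completion needs no journal pointer: the entry carries its callback. */
static inline void
entry_complete(struct entry *e)
{
	assert(e->done_cb != NULL);
	e->done_cb(e);
}

/* Two unrelated completions that can now share one journal. */
static void
count_txn(struct entry *e)
{
	++*(int *)e->done_data;
}

static void
mark_confirm(struct entry *e)
{
	*(int *)e->done_data = 100;
}
```

Because the callback lives in the entry, a transaction entry and a CONFIRM entry can be queued to the same journal and each is finished by its own function.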
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>---
> src/box/box.cc | 15 ++++++++-------
> src/box/journal.c | 2 ++
> src/box/journal.h | 20 +++++++++++---------
> src/box/txn.c | 2 +-
> src/box/vy_log.c | 2 +-
> src/box/wal.c | 19 ++++++++-----------
> src/box/wal.h | 4 ++--
> 7 files changed, 33 insertions(+), 31 deletions(-)
>
>diff --git a/src/box/box.cc b/src/box/box.cc
>index 8e811e9c1..faffd5769 100644
>--- a/src/box/box.cc
>+++ b/src/box/box.cc
>@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
> * Since there're no actual writes, fire a
> * journal_async_complete callback right away.
> */
>- journal_async_complete(base, entry);
>+ journal_async_complete(entry);
> return 0;
> }
>
>@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
> {
> static struct recovery_journal journal;
> journal_create(&journal.base, recovery_journal_write,
>- txn_complete_async, recovery_journal_write);
>+ recovery_journal_write);
> journal.vclock = v;
> journal_set(&journal.base);
> }
>@@ -2182,8 +2182,10 @@ engine_init()
> static int
> bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
> {
>+ (void)base;
>+
> entry->res = 0;
>- journal_async_complete(base, entry);
>+ journal_async_complete(entry);
> return 0;
> }
>
>@@ -2569,8 +2571,8 @@ box_cfg_xc(void)
>
> int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
> enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
>- if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
>- wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
>+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
>+ &INSTANCE_UUID, on_wal_garbage_collection,
> on_wal_checkpoint_threshold) != 0) {
> diag_raise();
> }
>@@ -2617,8 +2619,7 @@ box_cfg_xc(void)
> }
>
> struct journal bootstrap_journal;
>- journal_create(&bootstrap_journal, NULL, txn_complete_async,
>- bootstrap_journal_write);
>+ journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
> journal_set(&bootstrap_journal);
> auto bootstrap_journal_guard = make_scoped_guard([] {
> journal_set(NULL);
>diff --git a/src/box/journal.c b/src/box/journal.c
>index f1e89aaa2..48af9157b 100644
>--- a/src/box/journal.c
>+++ b/src/box/journal.c
>@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
>
> struct journal_entry *
> journal_entry_new(size_t n_rows, struct region *region,
>+ journal_write_async_f write_async_cb,
> void *complete_data)
> {
> struct journal_entry *entry;
>@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
> return NULL;
> }
>
>+ entry->write_async_cb = write_async_cb;
> entry->complete_data = complete_data;
> entry->approx_len = 0;
> entry->n_rows = n_rows;
>diff --git a/src/box/journal.h b/src/box/journal.h
>index 1a10e66c3..4b019fecf 100644
>--- a/src/box/journal.h
>+++ b/src/box/journal.h
>@@ -42,6 +42,8 @@ extern "C" {
> struct xrow_header;
> struct journal_entry;
>
>+typedef void (*journal_write_async_f)(struct journal_entry *entry);
>+
> /**
> * An entry for an abstract journal.
> * Simply put, a write ahead log request.
>@@ -61,6 +63,10 @@ struct journal_entry {
> * A journal entry completion callback argument.
> */
> void *complete_data;
>+ /**
>+ * Asynchronous write completion function.
>+ */
>+ journal_write_async_f write_async_cb;
> /**
> * Approximate size of this request when encoded.
> */
>@@ -84,6 +90,7 @@ struct region;
> */
> struct journal_entry *
> journal_entry_new(size_t n_rows, struct region *region,
>+ journal_write_async_f write_async_cb,
> void *complete_data);
>
> /**
>@@ -96,22 +103,19 @@ struct journal {
> int (*write_async)(struct journal *journal,
> struct journal_entry *entry);
>
>- /** Asynchronous write completion */
>- void (*write_async_cb)(struct journal_entry *entry);
>-
> /** Synchronous write */
> int (*write)(struct journal *journal,
> struct journal_entry *entry);
> };
>
> /**
>- * Finalize a single entry.
>+ * Complete asynchronous write.
> */
> static inline void
>-journal_async_complete(struct journal *journal, struct journal_entry *entry)
>+journal_async_complete(struct journal_entry *entry)
> {
>- assert(journal->write_async_cb != NULL);
>- journal->write_async_cb(entry);
>+ assert(entry->write_async_cb != NULL);
>+ entry->write_async_cb(entry);
> }
>
> /**
>@@ -173,12 +177,10 @@ static inline void
> journal_create(struct journal *journal,
> int (*write_async)(struct journal *journal,
> struct journal_entry *entry),
>- void (*write_async_cb)(struct journal_entry *entry),
> int (*write)(struct journal *journal,
> struct journal_entry *entry))
> {
> journal->write_async = write_async;
>- journal->write_async_cb = write_async_cb;
> journal->write = write;
> }
>
>diff --git a/src/box/txn.c b/src/box/txn.c
>index 9c21258c5..cc1f496c5 100644
>--- a/src/box/txn.c
>+++ b/src/box/txn.c
>@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
>
> /* Save space for an additional NOP row just in case. */
> req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
>- &txn->region, txn);
>+ &txn->region, txn_complete_async, txn);
> if (req == NULL)
> return NULL;
>
>diff --git a/src/box/vy_log.c b/src/box/vy_log.c
>index 311985c72..de4c5205c 100644
>--- a/src/box/vy_log.c
>+++ b/src/box/vy_log.c
>@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
> size_t used = region_used(&fiber()->gc);
>
> struct journal_entry *entry;
>- entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
>+ entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
> if (entry == NULL)
> goto err;
>
>diff --git a/src/box/wal.c b/src/box/wal.c
>index d8c92aa36..045006b60 100644
>--- a/src/box/wal.c
>+++ b/src/box/wal.c
>@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
> static void
> tx_schedule_queue(struct stailq *queue)
> {
>- struct wal_writer *writer = &wal_writer_singleton;
> struct journal_entry *req, *tmp;
> stailq_foreach_entry_safe(req, tmp, queue, fifo)
>- journal_async_complete(&writer->base, req);
>+ journal_async_complete(req);
> }
>
> /**
>@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
> */
> static void
> wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>- void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname,
>- int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+ const char *wal_dirname, int64_t wal_max_size,
>+ const struct tt_uuid *instance_uuid,
> wal_on_garbage_collection_f on_garbage_collection,
> wal_on_checkpoint_threshold_f on_checkpoint_threshold)
> {
>@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> journal_create(&writer->base,
> wal_mode == WAL_NONE ?
> wal_write_none_async : wal_write_async,
>- wall_async_cb,
> wal_mode == WAL_NONE ?
> wal_write_none : wal_write);
>
>@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
> }
>
> int
>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
> wal_on_garbage_collection_f on_garbage_collection,
> wal_on_checkpoint_threshold_f on_checkpoint_threshold)
> {
> /* Initialize the state. */
> struct wal_writer *writer = &wal_writer_singleton;
>- wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
>- wal_max_size, instance_uuid, on_garbage_collection,
>+ wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
>+ instance_uuid, on_garbage_collection,
> on_checkpoint_threshold);
>
> /* Start WAL thread. */
>@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,
> vclock_merge(&writer->vclock, &vclock_diff);
> vclock_copy(&replicaset.vclock, &writer->vclock);
> entry->res = vclock_sum(&writer->vclock);
>- journal_async_complete(journal, entry);
>+ journal_async_complete(entry);
> return 0;
> }
>
>diff --git a/src/box/wal.h b/src/box/wal.h
>index f348dc636..9d0cada46 100644
>--- a/src/box/wal.h
>+++ b/src/box/wal.h
>@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
> * Start WAL thread and initialize WAL writer.
> */
> int
>-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
>- const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
>+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
>+ int64_t wal_max_size, const struct tt_uuid *instance_uuid,
> wal_on_garbage_collection_f on_garbage_collection,
> wal_on_checkpoint_threshold_f on_checkpoint_threshold);
>
>--
>2.26.2
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 2/7] journal: add journal_entry_create helper
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 2/7] journal: add journal_entry_create helper Cyrill Gorcunov
@ 2020-08-21 7:51 ` Serge Petrenko
0 siblings, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 7:51 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
Thanks for the patch! LGTM.
>Thursday, 20 August 2020, 0:35 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>Add a helper to create raw journal entries. We will use it
>to write confirm/rollback entries.
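The split between allocating an entry and initializing one can be sketched like this. It is a simplified illustration under assumptions: `entry_create` and `entry_new` are hypothetical names mirroring the helper pair, `malloc` stands in for the region allocator, and the callback field is omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

struct row;

/* Hypothetical, reduced stand-in for struct journal_entry. */
struct entry {
	void *done_data;
	size_t approx_len;
	size_t n_rows;
	long res;
	struct row *rows[];	/* flexible array member */
};

/* Initialize caller-owned memory; never allocates. */
static inline void
entry_create(struct entry *e, size_t n_rows, size_t approx_len,
	     void *done_data)
{
	e->done_data = done_data;
	e->approx_len = approx_len;
	e->n_rows = n_rows;
	e->res = -1;	/* "not written yet" */
}

/* Allocate room for n_rows pointers, then delegate the setup. */
static struct entry *
entry_new(size_t n_rows, void *done_data)
{
	struct entry *e = malloc(sizeof(*e) + n_rows * sizeof(e->rows[0]));
	if (e == NULL)
		return NULL;
	entry_create(e, n_rows, 0, done_data);
	return e;
}
```

Keeping initialization separate lets a caller place the entry wherever it likes (heap, region, or stack) and still get the same field defaults.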
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>---
> src/box/journal.c | 8 ++------
> src/box/journal.h | 16 ++++++++++++++++
> 2 files changed, 18 insertions(+), 6 deletions(-)
>
>diff --git a/src/box/journal.c b/src/box/journal.c
>index 48af9157b..cb320b557 100644
>--- a/src/box/journal.c
>+++ b/src/box/journal.c
>@@ -51,11 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
> return NULL;
> }
>
>- entry->write_async_cb = write_async_cb;
>- entry->complete_data = complete_data;
>- entry->approx_len = 0;
>- entry->n_rows = n_rows;
>- entry->res = -1;
>-
>+ journal_entry_create(entry, n_rows, 0, write_async_cb,
>+ complete_data);
> return entry;
> }
>diff --git a/src/box/journal.h b/src/box/journal.h
>index 4b019fecf..5d8d5a726 100644
>--- a/src/box/journal.h
>+++ b/src/box/journal.h
>@@ -83,6 +83,22 @@ struct journal_entry {
>
> struct region;
>
>+/**
>+ * Initialize a new journal entry.
>+ */
>+static inline void
>+journal_entry_create(struct journal_entry *entry, size_t n_rows,
>+ size_t approx_len,
>+ journal_write_async_f write_async_cb,
>+ void *complete_data)
>+{
>+ entry->write_async_cb = write_async_cb;
>+ entry->complete_data = complete_data;
>+ entry->approx_len = approx_len;
>+ entry->n_rows = n_rows;
>+ entry->res = -1;
>+}
>+
> /**
> * Create a new journal entry.
> *
>--
>2.26.2
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 3/7] qsync: provide a binary form of syncro entries
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 3/7] qsync: provide a binary form of syncro entries Cyrill Gorcunov
@ 2020-08-21 8:15 ` Serge Petrenko
0 siblings, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 8:15 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
LGTM.
>Thursday, 20 August 2020, 0:35 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>These msgpack entries need to be written to a journal
>without involving the txn engine. At the same time we would
>like to be able to allocate them on the stack, so the binary
>form is predefined and has a fixed size.
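A fixed-size MsgPack body of this kind can be sketched as below. This is a portable variant, not the patch's layout: it uses byte arrays for the values instead of a PACKED struct with byte-swapped integers, and `KEY_REPLICA_ID`, `KEY_LSN`, `body_bin` and `body_encode` are hypothetical names with placeholder key codes.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Placeholder key codes, assumed only for this illustration. */
enum { KEY_REPLICA_ID = 0x02, KEY_LSN = 0x03 };

/* Map of two pairs with uncompressed uint32 and uint64 values. */
struct body_bin {
	uint8_t m_body;		/* fixmap header: 0x80 | pair count */
	uint8_t k_replica_id;
	uint8_t m_replica_id;	/* 0xce: uint32 marker */
	uint8_t v_replica_id[4];	/* big-endian value */
	uint8_t k_lsn;
	uint8_t m_lsn;		/* 0xcf: uint64 marker */
	uint8_t v_lsn[8];	/* big-endian value */
};
_Static_assert(sizeof(struct body_bin) == 17, "body must have no padding");

/* Store the low `width` bytes of v in big-endian order. */
static void
store_be(uint8_t *dst, uint64_t v, int width)
{
	for (int i = width - 1; i >= 0; i--) {
		dst[i] = (uint8_t)(v & 0xff);
		v >>= 8;
	}
}

static void
body_encode(struct body_bin *b, uint32_t replica_id, int64_t lsn)
{
	b->m_body = 0x80 | 2;
	b->k_replica_id = KEY_REPLICA_ID;
	b->m_replica_id = 0xce;
	store_be(b->v_replica_id, replica_id, 4);
	b->k_lsn = KEY_LSN;
	b->m_lsn = 0xcf;
	store_be(b->v_lsn, (uint64_t)lsn, 8);
}
```

Always using the widest integer markers costs a few bytes per record, but the size is known at compile time, so no allocator and no error path are needed for encoding.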
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>---
> src/box/txn_limbo.c | 9 +++++++--
> src/box/xrow.c | 41 ++++++++++++++++++-----------------------
> src/box/xrow.h | 20 +++++++++++++++-----
> 3 files changed, 40 insertions(+), 30 deletions(-)
>
>diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>index c6a4e5efc..e458dad75 100644
>--- a/src/box/txn_limbo.c
>+++ b/src/box/txn_limbo.c
>@@ -283,6 +283,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
> .lsn = lsn,
> };
>
>+ /*
>+ * This is a synchronous commit so we can
>+ * use body and row allocated on a stack.
>+ */
>+ struct synchro_body_bin body;
> struct xrow_header row;
> struct request request = {
> .header = &row,
>@@ -292,8 +297,8 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
> if (txn == NULL)
> goto rollback;
>
>- if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
>- goto rollback;
>+ xrow_encode_synchro(&row, &body, &req);
>+
> /*
> * This is not really a transaction. It just uses txn API
> * to put the data into WAL. And obviously it should not
>diff --git a/src/box/xrow.c b/src/box/xrow.c
>index bf174c701..9c6fb4fc1 100644
>--- a/src/box/xrow.c
>+++ b/src/box/xrow.c
>@@ -893,35 +893,30 @@ xrow_encode_dml(const struct request *request, struct region *region,
> return iovcnt;
> }
>
>-int
>-xrow_encode_synchro(struct xrow_header *row, struct region *region,
>+void
>+xrow_encode_synchro(struct xrow_header *row,
>+ struct synchro_body_bin *body,
> const struct synchro_request *req)
> {
>- size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
>- mp_sizeof_uint(req->replica_id) +
>- mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
>- char *buf = (char *)region_alloc(region, len);
>- if (buf == NULL) {
>- diag_set(OutOfMemory, len, "region_alloc", "buf");
>- return -1;
>- }
>- char *pos = buf;
>-
>- pos = mp_encode_map(pos, 2);
>- pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
>- pos = mp_encode_uint(pos, req->replica_id);
>- pos = mp_encode_uint(pos, IPROTO_LSN);
>- pos = mp_encode_uint(pos, req->lsn);
>+ /*
>+ * A map with two elements. We don't compress
>+ * numbers to have this structure constant in size,
>+ * which allows us to preallocate it on stack.
>+ */
>+ body->m_body = 0x80 | 2;
>+ body->k_replica_id = IPROTO_REPLICA_ID;
>+ body->m_replica_id = 0xce;
>+ body->v_replica_id = mp_bswap_u32(req->replica_id);
>+ body->k_lsn = IPROTO_LSN;
>+ body->m_lsn = 0xcf;
>+ body->v_lsn = mp_bswap_u64(req->lsn);
>
> memset(row, 0, sizeof(*row));
>
>- row->body[0].iov_base = buf;
>- row->body[0].iov_len = len;
>- row->bodycnt = 1;
>-
> row->type = req->type;
>-
>- return 0;
>+ row->body[0].iov_base = (void *)body;
>+ row->body[0].iov_len = sizeof(*body);
>+ row->bodycnt = 1;
> }
>
> int
>diff --git a/src/box/xrow.h b/src/box/xrow.h
>index 02dca74e5..20e82034d 100644
>--- a/src/box/xrow.h
>+++ b/src/box/xrow.h
>@@ -240,16 +240,26 @@ struct synchro_request {
> int64_t lsn;
> };
>
>+/** Synchro request xrow's body in MsgPack format. */
>+struct PACKED synchro_body_bin {
>+ uint8_t m_body;
>+ uint8_t k_replica_id;
>+ uint8_t m_replica_id;
>+ uint32_t v_replica_id;
>+ uint8_t k_lsn;
>+ uint8_t m_lsn;
>+ uint64_t v_lsn;
>+};
>+
> /**
> * Encode synchronous replication request.
> * @param row xrow header.
>- * @param region Region to use to encode the confirmation body.
>+ * @param body Destination to use to encode the confirmation body.
> * @param req Request parameters.
>- * @retval -1 on error.
>- * @retval 0 success.
> */
>-int
>-xrow_encode_synchro(struct xrow_header *row, struct region *region,
>+void
>+xrow_encode_synchro(struct xrow_header *row,
>+ struct synchro_body_bin *body,
> const struct synchro_request *req);
>
> /**
>--
>2.26.2
>
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-08-19 22:43 ` Vladislav Shpilevoy
@ 2020-08-21 8:36 ` Serge Petrenko
1 sibling, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 8:36 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
Thanks for the patch!
LGTM.
>Thursday, 20 August 2020, 0:35 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>When we need to write a CONFIRM or ROLLBACK message (which is
>a binary record in msgpack format) into a journal, we use txn code
>to allocate a new transaction, encode the message there and send it
>down the long txn path before it hits the journal. This not only
>wastes resources but is also somewhat strange from an architectural
>point of view.
>
>Instead, let's encode the record on the stack and write it to the
>journal directly.
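The stack-only approach can be sketched as follows. It is an illustration under assumptions: `fake_journal_write` is a hypothetical synchronous writer, the structures are reduced stand-ins, and a union is used to reserve aligned room for one row pointer where the patch uses a char buffer.

```c
#include <assert.h>
#include <stddef.h>

struct row {
	int type;
};

/* Hypothetical, reduced stand-in for struct journal_entry. */
struct entry {
	long res;
	int n_rows;
	struct row *rows[];	/* flexible array member */
};

/* Aligned storage for an entry plus exactly one row pointer. */
union one_row_entry {
	struct entry entry;
	char buf[sizeof(struct entry) + sizeof(struct row *)];
};

static long next_lsn = 1;

/* Hypothetical synchronous writer: returns only when the write is done. */
static int
fake_journal_write(struct entry *e)
{
	if (e->n_rows <= 0)
		return -1;
	e->res = next_lsn++;
	return 0;
}

static int
write_one_row(int type)
{
	struct row row = { type };
	union one_row_entry u;
	struct entry *e = &u.entry;

	e->res = -1;
	e->n_rows = 1;
	e->rows[0] = &row;
	/* Stack storage is safe only because the write is synchronous. */
	if (fake_journal_write(e) != 0 || e->res < 0)
		return -1;
	return 0;
}
```

The same layout would be unsafe for an asynchronous write, since the entry and the row would go out of scope before the completion callback runs.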
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>---
> src/box/txn_limbo.c | 66 +++++++++++++++++++++++----------------------
> 1 file changed, 34 insertions(+), 32 deletions(-)
>
>diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>index e458dad75..4b90d7fa5 100644
>--- a/src/box/txn_limbo.c
>+++ b/src/box/txn_limbo.c
>@@ -32,6 +32,7 @@
> #include "txn_limbo.h"
> #include "replication.h"
> #include "iproto_constants.h"
>+#include "journal.h"
>
> struct txn_limbo txn_limbo;
>
>@@ -272,6 +273,17 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
> return 0;
> }
>
>+/**
>+ * A callback for synchronous write: txn_limbo_write_synchro fiber
>+ * waiting to proceed once a record is written to WAL.
>+ */
>+static void
>+txn_limbo_write_cb(struct journal_entry *entry)
>+{
>+ assert(entry->complete_data != NULL);
>+ fiber_wakeup(entry->complete_data);
>+}
>+
> static void
> txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
> {
>@@ -285,46 +297,36 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
>
> /*
> * This is a synchronous commit so we can
>- * use body and row allocated on a stack.
>+ * allocate everything on a stack.
> */
> struct synchro_body_bin body;
> struct xrow_header row;
>- struct request request = {
>- .header = &row,
>- };
>+ char buf[sizeof(struct journal_entry) +
>+ sizeof(struct xrow_header *)];
>
>- struct txn *txn = txn_begin();
>- if (txn == NULL)
>- goto rollback;
>+ struct journal_entry *entry = (struct journal_entry *)buf;
>+ entry->rows[0] = &row;
>
> xrow_encode_synchro(&row, &body, &req);
>
>- /*
>- * This is not really a transaction. It just uses txn API
>- * to put the data into WAL. And obviously it should not
>- * go to the limbo and block on the very same sync
>- * transaction which it tries to confirm now.
>- */
>- txn_set_flag(txn, TXN_FORCE_ASYNC);
>-
>- if (txn_begin_stmt(txn, NULL) != 0)
>- goto rollback;
>- if (txn_commit_stmt(txn, &request) != 0)
>- goto rollback;
>- if (txn_commit(txn) != 0)
>- goto rollback;
>- return;
>+ journal_entry_create(entry, 1, xrow_approx_len(&row),
>+ txn_limbo_write_cb, fiber());
>
>-rollback:
>- /*
>- * XXX: the stub is supposed to be removed once it is defined what to do
>- * when a synchro request WAL write fails. One of the possible
>- * solutions: log the error, keep the limbo queue as is and probably put
>- * in rollback mode. Then provide a hook to call manually when WAL
>- * problems are fixed. Or retry automatically with some period.
>- */
>- panic("Could not write a synchro request to WAL: lsn = %lld, type = "
>- "%s\n", lsn, iproto_type_name(type));
>+ if (journal_write(entry) != 0 || entry->res < 0) {
>+ diag_set(ClientError, ER_WAL_IO);
>+ diag_log();
>+ /*
>+ * XXX: the stub is supposed to be removed once it is defined
>+ * what to do when a synchro request WAL write fails. One of
>+ * the possible solutions: log the error, keep the limbo
>+ * queue as is and probably put in rollback mode. Then
>+ * provide a hook to call manually when WAL problems are fixed.
>+ * Or retry automatically with some period.
>+ */
>+ panic("Could not write a synchro request to WAL: "
>+ "lsn = %lld, type = %s\n", lsn,
>+ type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
>+ }
> }
>
> /**
>--
>2.26.2
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine Cyrill Gorcunov
@ 2020-08-21 8:51 ` Serge Petrenko
2020-08-21 21:59 ` Vladislav Shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 8:51 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
Hi! Thanks for the patch.
LGTM with one comment, which is up to you.
>Thursday, 20 August 2020, 0:36 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>Transaction processing code is very heavy simply because
>transactions carry various data and involve a number
>of other mechanisms to proceed.
>
>In turn, when we receive a confirm or rollback packet from
>another node in a cluster we just need to inspect the limbo
>queue and write this packet into the WAL journal. So calling
>a bunch of txn engine helpers is simply a waste of cycles.
>
>Thus let's handle them in a special lightweight way:
>
> - allocate a synchro_entry structure which carries
> the journal entry itself and the encoded message
> - process the limbo queue to mark confirmed/rolled back
> messages
> - finally write this synchro_entry into the journal
>
>which is much simpler.
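The three steps listed above can be sketched in a reduced form. Everything here is an assumption made for illustration: `synchro_bundle`, `fake_write_async`, `limbo_confirmed_lsn` and the request codes are hypothetical, `malloc` replaces the real allocator, and the order of the limbo step simply follows the list, which may differ from the actual patch.

```c
#include <assert.h>
#include <stdlib.h>

enum { REQ_CONFIRM = 40, REQ_ROLLBACK = 41 };	/* hypothetical codes */

struct row {
	int type;
	long lsn;
};

/* Hypothetical, reduced journal entry with room for one row. */
struct jentry {
	void (*done_cb)(struct jentry *e);
	void *done_data;
	long res;
	int n_rows;
	struct row *rows[1];
};

/* One allocation carries both the row copy and its journal entry. */
struct synchro_bundle {
	struct row row;
	struct jentry jentry;
};

static int bundles_alive;
static long limbo_confirmed_lsn;
static long wal_lsn;

/* Completion callback: the bundle owns itself and is freed here. */
static void
bundle_done(struct jentry *e)
{
	free(e->done_data);
	bundles_alive--;
}

static struct synchro_bundle *
bundle_new(const struct row *src)
{
	struct synchro_bundle *b = malloc(sizeof(*b));
	if (b == NULL)
		return NULL;
	b->row = *src;
	b->jentry.done_cb = bundle_done;
	b->jentry.done_data = b;
	b->jentry.res = -1;
	b->jentry.n_rows = 1;
	b->jentry.rows[0] = &b->row;
	bundles_alive++;
	return b;
}

/* Hypothetical async writer that completes immediately. */
static int
fake_write_async(struct jentry *e)
{
	e->res = ++wal_lsn;
	e->done_cb(e);	/* the entry must not be touched after this */
	return 0;
}

static int
apply_synchro(const struct row *src)
{
	struct synchro_bundle *b = bundle_new(src);	/* step 1 */
	if (b == NULL)
		return -1;
	if (src->type == REQ_CONFIRM)			/* step 2 */
		limbo_confirmed_lsn = src->lsn;
	return fake_write_async(&b->jentry);		/* step 3 */
}
```

Bundling the row and the entry in one heap object keeps them alive until the asynchronous completion fires, which is the property the stack-based variant of the synchronous path cannot offer.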
>
>Part-of #5129
>
>Suggested-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
>Co-developed-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
>Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
>---
> src/box/applier.cc | 200 +++++++++++++++++++++++++++++++++------------
> 1 file changed, 148 insertions(+), 52 deletions(-)
>
>diff --git a/src/box/applier.cc b/src/box/applier.cc
>index 1387d518c..c1d07ca54 100644
>--- a/src/box/applier.cc
>+++ b/src/box/applier.cc
>@@ -51,8 +51,10 @@
> #include "schema.h"
> #include "txn.h"
> #include "box.h"
>+#include "xrow.h"
> #include "scoped_guard.h"
> #include "txn_limbo.h"
>+#include "journal.h"
>
> STRS(applier_state, applier_STATE);
>
>@@ -268,45 +270,11 @@ process_nop(struct request *request)
> return txn_commit_stmt(txn, request);
> }
>
>-/*
>- * CONFIRM/ROLLBACK rows aren't dml requests and require special
>- * handling: instead of performing some operations on spaces,
>- * processing these requests requires txn_limbo to either confirm
>- * or rollback some of its entries.
>- */
>-static int
>-process_synchro_row(struct request *request)
>-{
>- assert(iproto_type_is_synchro_request(request->header->type));
>- struct txn *txn = in_txn();
>-
>- struct synchro_request syn_req;
>- if (xrow_decode_synchro(request->header, &syn_req) != 0)
>- return -1;
>- assert(txn->n_applier_rows == 0);
>- /*
>- * This is not really a transaction. It just uses txn API
>- * to put the data into WAL. And obviously it should not
>- * go to the limbo and block on the very same sync
>- * transaction which it tries to confirm now.
>- */
>- txn_set_flag(txn, TXN_FORCE_ASYNC);
>-
>- if (txn_begin_stmt(txn, NULL) != 0)
>- return -1;
>- if (txn_commit_stmt(txn, request) != 0)
>- return -1;
>- return txn_limbo_process(&txn_limbo, &syn_req);
>-}
>-
> static int
> apply_row(struct xrow_header *row)
> {
> struct request request;
>- if (iproto_type_is_synchro_request(row->type)) {
>- request.header = row;
>- return process_synchro_row(&request);
>- }
>+ assert(!iproto_type_is_synchro_request(row->type));
> if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
> return -1;
> if (request.type == IPROTO_NOP)
>@@ -753,19 +721,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
> next)->row.is_commit);
> }
>
>-static int
>-applier_txn_rollback_cb(struct trigger *trigger, void *event)
>+static void
>+applier_rollback_by_wal_io(void)
> {
>- (void) trigger;
>- struct txn *txn = (struct txn *) event;
>- /*
>- * Synchronous transaction rollback due to receiving a
>- * ROLLBACK entry is a normal event and requires no
>- * special handling.
>- */
>- if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
>- return 0;
>-
> /*
> * Setup shared applier diagnostic area.
> *
>@@ -774,9 +732,9 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
> * diag use per-applier diag instead all the time
> * (which actually already present in the structure).
> *
>- * But remember that transactions are asynchronous
>- * and rollback may happen a way latter after it
>- * passed to the journal engine.
>+ * But remember that WAL writes are asynchronous and
>+ * rollback may happen a way later after it was passed to
>+ * the journal engine.
> */
> diag_set(ClientError, ER_WAL_IO);
> diag_set_error(&replicaset.applier.diag,
>@@ -787,6 +745,20 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>
> /* Rollback applier vclock to the committed one. */
> vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
>+}
>+
>+static int
>+applier_txn_rollback_cb(struct trigger *trigger, void *event)
>+{
>+ (void) trigger;
>+ struct txn *txn = (struct txn *) event;
>+ /*
>+ * Synchronous transaction rollback due to receiving a
>+ * ROLLBACK entry is a normal event and requires no
>+ * special handling.
>+ */
>+ if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
>+ applier_rollback_by_wal_io();
> return 0;
> }
>
>@@ -800,6 +772,110 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
> return 0;
> }
>
>+struct synchro_entry {
>+ /** Encoded form of a synchro record. */
>+ struct synchro_body_bin body_bin;
>+
>+ /** xrow to write, used by the journal engine. */
>+ struct xrow_header row;
>+
>+ /**
>+ * The journal entry itself. Note since
>+ * it has unsized array it must be the
>+ * last entry in the structure.
>+ */
>+ struct journal_entry journal_entry;
>+};
>+
>+static void
>+synchro_entry_delete(struct synchro_entry *entry)
>+{
>+ free(entry);
>+}
>+
>+/**
>+ * Async write journal completion.
>+ */
>+static void
>+apply_synchro_row_cb(struct journal_entry *entry)
>+{
>+ assert(entry->complete_data != NULL);
>+ struct synchro_entry *synchro_entry =
>+ (struct synchro_entry *)entry->complete_data;
>+ if (entry->res < 0)
>+ applier_rollback_by_wal_io();
>+ else
>+ trigger_run(&replicaset.applier.on_wal_write, NULL);
>+
>+ synchro_entry_delete(synchro_entry);
>+}
>+
>+/**
>+ * Allocate a new synchro_entry to be passed to
>+ * the journal engine in async write way.
>+ */
>+static struct synchro_entry *
>+synchro_entry_new(struct xrow_header *applier_row,
>+ struct synchro_request *req)
>+{
>+ struct synchro_entry *entry;
>+ size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
>+
>+ /*
>+ * For simplicity we use malloc here but
>+ * probably should provide some cache similar
>+ * to txn cache.
>+ */
>+ entry = (struct synchro_entry *)malloc(size);
>+ if (entry == NULL) {
>+ diag_set(OutOfMemory, size, "malloc", "synchro_entry");
>+ return NULL;
>+ }
>+
>+ struct journal_entry *journal_entry = &entry->journal_entry;
>+ struct synchro_body_bin *body_bin = &entry->body_bin;
>+ struct xrow_header *row = &entry->row;
>+
>+ journal_entry->rows[0] = row;
>+
>+ xrow_encode_synchro(row, body_bin, req);
>+
>+ row->lsn = applier_row->lsn;
>+ row->replica_id = applier_row->replica_id;
>+
>+ journal_entry_create(journal_entry, 1, xrow_approx_len(row),
>+ apply_synchro_row_cb, entry);
>+ return entry;
>+}
>+
>+/** Process a synchro request. */
>+static int
>+apply_synchro_row(struct xrow_header *row)
>+{
>+ assert(iproto_type_is_synchro_request(row->type));
>+
>+ struct synchro_request req;
>+ if (xrow_decode_synchro(row, &req) != 0)
>+ goto err;
>+
>+ if (txn_limbo_process(&txn_limbo, &req))
>+ goto err;
>+
>+ struct synchro_entry *entry;
>+ entry = synchro_entry_new(row, &req);
>+ if (entry == NULL)
>+ goto err;
>+
>+ if (journal_write_async(&entry->journal_entry) != 0) {
>+ diag_set(ClientError, ER_WAL_IO);
>+ goto err;
>+ }
>+ return 0;
>+err:
>+ diag_log();
>+ return -1;
>+}
>+
> /**
> * Apply all rows in the rows queue as a single transaction.
> *
>@@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
> }
> }
>
>+ if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>+ /*
>+ * Synchro messages are not transactions, in terms
>+ * of DML. Always sent and written isolated from
>+ * each other.
>+ */
>+ assert(first_row == last_row);
>+ if (apply_synchro_row(first_row) != 0)
>+ diag_raise();
>+ goto success;
>+ }
>+
> /**
> * Explicitly begin the transaction so that we can
> * control fiber->gc life cycle and, in case of apply
> * conflict safely access failed xrow object and allocate
> * IPROTO_NOP on gc.
> */
>- struct txn *txn = txn_begin();
>+ struct txn *txn;
>+ txn = txn_begin();
Why this change?
> struct applier_tx_row *item;
> if (txn == NULL) {
> latch_unlock(latch);
>@@ -922,6 +1011,7 @@ applier_apply_tx(struct stailq *rows)
> if (txn_commit_async(txn) < 0)
> goto fail;
>
>+success:
> /*
> * The transaction was sent to journal so promote vclock.
> *
>@@ -1089,7 +1179,13 @@ applier_subscribe(struct applier *applier)
>
> applier->lag = TIMEOUT_INFINITY;
>
>- /* Register triggers to handle WAL writes and rollbacks. */
>+ /*
>+ * Register triggers to handle WAL writes and rollbacks.
>+ *
>+ * Note we use them for synchronous packets handling as well
>+ * thus when changing make sure that synchro handling won't
>+ * be broken.
>+ */
> struct trigger on_wal_write;
> trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
> trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
>--
>2.26.2
>
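The ownership scheme in the patch above (one allocation that holds both the row and the journal entry, released from the write-completion callback) can be sketched in isolation. This is only an illustration under stated assumptions: every sketch_* name is hypothetical, the journal is replaced by a direct function call, and a fixed single-slot rows[1] array stands in for the unsized trailing array of the real struct journal_entry.

```cpp
#include <cstdlib>

// Hypothetical, simplified stand-ins; none of these are Tarantool types.
struct sketch_row { long lsn; };

struct sketch_journal_entry;
typedef void (*sketch_complete_f)(sketch_journal_entry *);

struct sketch_journal_entry {
	int res;                       // write result, negative on failure
	sketch_complete_f on_complete; // invoked once the write finishes
	void *complete_data;           // opaque pointer handed back to the callback
	sketch_row *rows[1];           // single slot; the patch uses an unsized array
};

struct sketch_synchro_entry {
	sketch_row row;                     // row referenced by the journal entry
	sketch_journal_entry journal_entry; // kept last, mirroring the patch
};

static int completed_ok = 0;
static int completed_failed = 0;

// Completion callback: records the outcome, then frees the whole allocation.
static void sketch_complete(sketch_journal_entry *je)
{
	sketch_synchro_entry *owner =
		static_cast<sketch_synchro_entry *>(je->complete_data);
	if (je->res < 0)
		completed_failed++;
	else
		completed_ok++;
	std::free(owner);
}

// One malloc carries the row and the journal entry together.
static sketch_synchro_entry *sketch_entry_new(long lsn)
{
	sketch_synchro_entry *e = static_cast<sketch_synchro_entry *>(
		std::malloc(sizeof(sketch_synchro_entry)));
	if (e == nullptr)
		return nullptr;
	e->row.lsn = lsn;
	e->journal_entry.res = 0;
	e->journal_entry.on_complete = sketch_complete;
	e->journal_entry.complete_data = e;
	e->journal_entry.rows[0] = &e->row;
	return e;
}

// Stand-in for the journal: "finishes" the write and fires the callback.
static void sketch_finish_write(sketch_journal_entry *je, int res)
{
	je->res = res;
	je->on_complete(je);
}
```

After sketch_finish_write() returns, the entry must not be touched again, since the callback has already freed it. The same holds for the real apply_synchro_row_cb(), which calls synchro_entry_delete() as its last step.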
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 6/7] txn: txn_add_redo -- drop synchro processing
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 6/7] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
@ 2020-08-21 8:52 ` Serge Petrenko
0 siblings, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 8:52 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
LGTM.
>Thursday, 20 August 2020, 0:36 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>Since we no longer use txn engine for synchro
>packets processing this code is never executed.
>
>Part-of #5129
>
>Signed-off-by: Cyrill Gorcunov < gorcunov@gmail.com >
>---
> src/box/txn.c | 9 +--------
> 1 file changed, 1 insertion(+), 8 deletions(-)
>
>diff --git a/src/box/txn.c b/src/box/txn.c
>index cc1f496c5..b2d342355 100644
>--- a/src/box/txn.c
>+++ b/src/box/txn.c
>@@ -82,14 +82,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
> */
> struct space *space = stmt->space;
> row->group_id = space != NULL ? space_group_id(space) : 0;
>- /*
>- * Sychronous replication entries are supplementary and
>- * aren't valid dml requests. They're encoded manually.
>- */
>- if (likely(!iproto_type_is_synchro_request(row->type)))
>- row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
>- else
>- row->bodycnt = xrow_header_dup_body(row, &txn->region);
>+ row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
> if (row->bodycnt < 0)
> return -1;
> stmt->row = row;
>--
>2.26.2
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 7/7] xrow: drop xrow_header_dup_body
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 7/7] xrow: drop xrow_header_dup_body Cyrill Gorcunov
@ 2020-08-21 8:57 ` Serge Petrenko
0 siblings, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-21 8:57 UTC (permalink / raw)
To: Cyrill Gorcunov; +Cc: tml, Vladislav Shpilevoy
LGTM.
>Thursday, 20 August 2020, 0:36 +03:00 from Cyrill Gorcunov <gorcunov@gmail.com>:
>
>We no longer use it.
>
>Closes #5129
>
>Signed-off-by: Cyrill Gorcunov < gorcunov@gmail.com >
>---
> src/box/xrow.c | 15 ---------------
> src/box/xrow.h | 8 --------
> 2 files changed, 23 deletions(-)
>
>diff --git a/src/box/xrow.c b/src/box/xrow.c
>index 9c6fb4fc1..95ddb1fe7 100644
>--- a/src/box/xrow.c
>+++ b/src/box/xrow.c
>@@ -220,21 +220,6 @@ xrow_header_decode(struct xrow_header *header, const char **pos,
> return 0;
> }
>
>-int
>-xrow_header_dup_body(struct xrow_header *row, struct region *region)
>-{
>- assert(row->bodycnt == 1);
>- size_t size = row->body[0].iov_len;
>- char *copy = (char *)region_alloc(region, size);
>- if (copy == NULL) {
>- diag_set(OutOfMemory, size, "region_alloc", "copy");
>- return -1;
>- }
>- memcpy(copy, row->body[0].iov_base, size);
>- row->body[0].iov_base = copy;
>- return 1;
>-}
>-
> /**
> * @pre pos points at a valid msgpack
> */
>diff --git a/src/box/xrow.h b/src/box/xrow.h
>index 20e82034d..58d47b12d 100644
>--- a/src/box/xrow.h
>+++ b/src/box/xrow.h
>@@ -141,14 +141,6 @@ int
> xrow_header_decode(struct xrow_header *header, const char **pos,
> const char *end, bool end_is_exact);
>
>-/**
>- * Duplicate the xrow's body onto the given region.
>- * @retval -1 Error.
>- * @retval >= 0 Iov count in the body.
>- */
>-int
>-xrow_header_dup_body(struct xrow_header *header, struct region *region);
>-
> /**
> * DML request.
> */
>--
>2.26.2
>
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine
2020-08-21 8:51 ` Serge Petrenko
@ 2020-08-21 21:59 ` Vladislav Shpilevoy
2020-08-23 12:15 ` Serge Petrenko
0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-21 21:59 UTC (permalink / raw)
To: Serge Petrenko, Cyrill Gorcunov; +Cc: tml
Hi! Thanks for the review!
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 1387d518c..c1d07ca54 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
> }
> }
>
> + if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
> + /*
> + * Synchro messages are not transactions, in terms
> + * of DML. Always sent and written isolated from
> + * each other.
> + */
> + assert(first_row == last_row);
> + if (apply_synchro_row(first_row) != 0)
> + diag_raise();
> + goto success;
> + }
> +
> /**
> * Explicitly begin the transaction so that we can
> * control fiber->gc life cycle and, in case of apply
> * conflict safely access failed xrow object and allocate
> * IPROTO_NOP on gc.
> */
> - struct txn *txn = txn_begin();
> + struct txn *txn;
> + txn = txn_begin();
>
>
> Why this change?
In C++ a goto can't jump over a declaration that has an initializer
(at least clang refuses to compile it). Here a new label is added,
'success:', and the jump to it would cross the initialized 'txn'.
It turned out that declaring the variable without an initializer and
assigning it on the next line is still allowed, so that is done here.
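The rule can be reproduced outside the applier. A minimal sketch, assuming a hypothetical begin_work() in place of txn_begin() and a hypothetical apply_rows_sketch() in place of applier_apply_tx(); only the goto and declaration pattern is taken from the patch:

```cpp
// Hypothetical stand-in for txn_begin(): marks the slot and returns it.
static int *begin_work(int *slot)
{
	*slot = 1;
	return slot;
}

// Returns 0 when the fast path skips the work, 1 otherwise.
static int apply_rows_sketch(bool fast_path)
{
	int slot = 0;
	int result = 0;

	if (fast_path)
		goto success;

	// Writing "int *h = begin_work(&slot);" here is ill-formed in C++:
	// the goto above would jump past an initialized declaration that is
	// still in scope at the label.
	int *h;                // declaration without an initializer is fine
	h = begin_work(&slot); // plain assignment on the next line
	result = *h;

success:
	return result;
}
```

Calling apply_rows_sketch(true) takes the goto and never touches h, while apply_rows_sketch(false) runs the assignment. Wrapping the declaration in its own block scope would be another way to satisfy the compiler, at the cost of extra indentation.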
* Re: [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine
2020-08-21 21:59 ` Vladislav Shpilevoy
@ 2020-08-23 12:15 ` Serge Petrenko
0 siblings, 0 replies; 20+ messages in thread
From: Serge Petrenko @ 2020-08-23 12:15 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tml
>Saturday, 22 August 2020, 0:59 +03:00 from Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
>
>Hi! Thanks for the review!
>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index 1387d518c..c1d07ca54 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -847,13 +923,26 @@ applier_apply_tx(struct stailq *rows)
>> }
>> }
>>
>> + if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
>> + /*
>> + * Synchro messages are not transactions, in terms
>> + * of DML. Always sent and written isolated from
>> + * each other.
>> + */
>> + assert(first_row == last_row);
>> + if (apply_synchro_row(first_row) != 0)
>> + diag_raise();
>> + goto success;
>> + }
>> +
>> /**
>> * Explicitly begin the transaction so that we can
>> * control fiber->gc life cycle and, in case of apply
>> * conflict safely access failed xrow object and allocate
>> * IPROTO_NOP on gc.
>> */
>> - struct txn *txn = txn_begin();
>> + struct txn *txn;
>> + txn = txn_begin();
>>
>>
>> Why this change?
>In C++ a goto can't jump over a declaration that has an initializer
>(at least clang refuses to compile it). Here a new label is added,
>'success:', and the jump to it would cross the initialized 'txn'.
>
>It turned out that declaring the variable without an initializer and
>assigning it on the next line is still allowed, so that is done here.
I see. Thanks for the explanation!
--
Serge Petrenko
* Re: [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
` (6 preceding siblings ...)
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 7/7] xrow: drop xrow_header_dup_body Cyrill Gorcunov
@ 2020-08-24 21:16 ` Vladislav Shpilevoy
7 siblings, 0 replies; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-24 21:16 UTC (permalink / raw)
To: Cyrill Gorcunov, tml
Pushed to master and 2.5.
Thread overview: 20+ messages
2020-08-19 21:34 [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 1/7] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-08-21 7:48 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 2/7] journal: add journal_entry_create helper Cyrill Gorcunov
2020-08-21 7:51 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 3/7] qsync: provide a binary form of syncro entries Cyrill Gorcunov
2020-08-21 8:15 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 4/7] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-08-19 22:43 ` Vladislav Shpilevoy
2020-08-20 7:13 ` Cyrill Gorcunov
2020-08-21 8:36 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 5/7] applier: process synchro requests without txn engine Cyrill Gorcunov
2020-08-21 8:51 ` Serge Petrenko
2020-08-21 21:59 ` Vladislav Shpilevoy
2020-08-23 12:15 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 6/7] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
2020-08-21 8:52 ` Serge Petrenko
2020-08-19 21:34 ` [Tarantool-patches] [PATCH v9 7/7] xrow: drop xrow_header_dup_body Cyrill Gorcunov
2020-08-21 8:57 ` Serge Petrenko
2020-08-24 21:16 ` [Tarantool-patches] [PATCH v9 0/7] qsync: write CONFIRM/ROLLBACK without txn engine Vladislav Shpilevoy