Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine
@ 2020-08-14 21:14 Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
                   ` (8 more replies)
  0 siblings, 9 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

In this series we write CONFIRM/ROLLBACK messages into the WAL directly
without involving the txn engine.

Vlad, take a look please, once time permit. Note this branch carries
yours "xrow: introduce struct synchro_request" which I fetched and
applier manually since it wasn't yet in master branch.

First 4 patches you've read already and hopefully I addressed
all your comments.

issue https://github.com/tarantool/tarantool/issues/5129
branch gorcunov/gh-5129-journal-7

v3:
 - bootstrap journal left NULL for async write
 - journal_write_async_cb_t type for async callback
 - struct synchro_body_bin type for encoded message
 - xrow_encode_synchro helper to operate with synchro_body_bin

v7:
 - rebase on master
 - rework applier code

Cyrill Gorcunov (8):
  journal: bind asynchronous write completion to an entry
  journal: add journal_entry_create helper
  qsync: provide a binary form of syncro entries
  qsync: direct write of CONFIRM/ROLLBACK into a journal
  applier: factor out latch locking
  applier: add shorthands to queue access
  applier: process synchro requests without txn engine
  applier: drop process_synchro_row

 src/box/applier.cc  | 295 ++++++++++++++++++++++++++++++++++----------
 src/box/box.cc      |  15 +--
 src/box/journal.c   |   8 +-
 src/box/journal.h   |  36 ++++--
 src/box/txn.c       |   2 +-
 src/box/txn_limbo.c |  69 ++++++-----
 src/box/vy_log.c    |   2 +-
 src/box/wal.c       |  19 ++-
 src/box/wal.h       |   4 +-
 src/box/xrow.c      |  41 +++---
 src/box/xrow.h      |  20 ++-
 11 files changed, 351 insertions(+), 160 deletions(-)

-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 2/8] journal: add journal_entry_create helper Cyrill Gorcunov
                   ` (7 subsequent siblings)
  8 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

In commit 77ba0e3504464131fe81c672d508d0275be2173a we've redesigned
wal journal operations such that asynchronous write completion
is a single instance per journal.

It turned out that such simplification is too tight and doesn't
allow us to pass entries into the journal with custom completions.

Thus lets allow back such ability. We will need it to be able
to write "confirm" records into wal directly without touching
transactions code at all.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc    | 15 ++++++++-------
 src/box/journal.c |  2 ++
 src/box/journal.h | 20 +++++++++++---------
 src/box/txn.c     |  2 +-
 src/box/vy_log.c  |  2 +-
 src/box/wal.c     | 19 ++++++++-----------
 src/box/wal.h     |  4 ++--
 7 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 8e811e9c1..faffd5769 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
 	 * Since there're no actual writes, fire a
 	 * journal_async_complete callback right away.
 	 */
-	journal_async_complete(base, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
 	journal_create(&journal.base, recovery_journal_write,
-		       txn_complete_async, recovery_journal_write);
+		       recovery_journal_write);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
@@ -2182,8 +2182,10 @@ engine_init()
 static int
 bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
 {
+	(void)base;
+
 	entry->res = 0;
-	journal_async_complete(base, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
@@ -2569,8 +2571,8 @@ box_cfg_xc(void)
 
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
-		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
+		     &INSTANCE_UUID, on_wal_garbage_collection,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
@@ -2617,8 +2619,7 @@ box_cfg_xc(void)
 	}
 
 	struct journal bootstrap_journal;
-	journal_create(&bootstrap_journal, NULL, txn_complete_async,
-		       bootstrap_journal_write);
+	journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
 	journal_set(&bootstrap_journal);
 	auto bootstrap_journal_guard = make_scoped_guard([] {
 		journal_set(NULL);
diff --git a/src/box/journal.c b/src/box/journal.c
index f1e89aaa2..48af9157b 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
 
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
+		  journal_write_async_f write_async_cb,
 		  void *complete_data)
 {
 	struct journal_entry *entry;
@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		return NULL;
 	}
 
+	entry->write_async_cb = write_async_cb;
 	entry->complete_data = complete_data;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
diff --git a/src/box/journal.h b/src/box/journal.h
index 1a10e66c3..4b019fecf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -42,6 +42,8 @@ extern "C" {
 struct xrow_header;
 struct journal_entry;
 
+typedef void (*journal_write_async_f)(struct journal_entry *entry);
+
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -61,6 +63,10 @@ struct journal_entry {
 	 * A journal entry completion callback argument.
 	 */
 	void *complete_data;
+	/**
+	 * Asynchronous write completion function.
+	 */
+	journal_write_async_f write_async_cb;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -84,6 +90,7 @@ struct region;
  */
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
+		  journal_write_async_f write_async_cb,
 		  void *complete_data);
 
 /**
@@ -96,22 +103,19 @@ struct journal {
 	int (*write_async)(struct journal *journal,
 			   struct journal_entry *entry);
 
-	/** Asynchronous write completion */
-	void (*write_async_cb)(struct journal_entry *entry);
-
 	/** Synchronous write */
 	int (*write)(struct journal *journal,
 		     struct journal_entry *entry);
 };
 
 /**
- * Finalize a single entry.
+ * Complete asynchronous write.
  */
 static inline void
-journal_async_complete(struct journal *journal, struct journal_entry *entry)
+journal_async_complete(struct journal_entry *entry)
 {
-	assert(journal->write_async_cb != NULL);
-	journal->write_async_cb(entry);
+	assert(entry->write_async_cb != NULL);
+	entry->write_async_cb(entry);
 }
 
 /**
@@ -173,12 +177,10 @@ static inline void
 journal_create(struct journal *journal,
 	       int (*write_async)(struct journal *journal,
 				  struct journal_entry *entry),
-	       void (*write_async_cb)(struct journal_entry *entry),
 	       int (*write)(struct journal *journal,
 			    struct journal_entry *entry))
 {
 	journal->write_async	= write_async;
-	journal->write_async_cb	= write_async_cb;
 	journal->write		= write;
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index 9c21258c5..cc1f496c5 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
 
 	/* Save space for an additional NOP row just in case. */
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
-				&txn->region, txn);
+				&txn->region, txn_complete_async, txn);
 	if (req == NULL)
 		return NULL;
 
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 311985c72..de4c5205c 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 	size_t used = region_used(&fiber()->gc);
 
 	struct journal_entry *entry;
-	entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
+	entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index d8c92aa36..045006b60 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_async_complete(&writer->base, req);
+		journal_async_complete(req);
 }
 
 /**
@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
  */
 static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
-		  void (*wall_async_cb)(struct journal_entry *entry),
-		  const char *wal_dirname,
-		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+		  const char *wal_dirname, int64_t wal_max_size,
+		  const struct tt_uuid *instance_uuid,
 		  wal_on_garbage_collection_f on_garbage_collection,
 		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	journal_create(&writer->base,
 		       wal_mode == WAL_NONE ?
 		       wal_write_none_async : wal_write_async,
-		       wall_async_cb,
 		       wal_mode == WAL_NONE ?
 		       wal_write_none : wal_write);
 
@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
 }
 
 int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
-	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
-	wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
-			  wal_max_size, instance_uuid, on_garbage_collection,
+	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
+			  instance_uuid, on_garbage_collection,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
@@ -1314,7 +1311,7 @@ wal_write_none_async(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_async_complete(journal, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
diff --git a/src/box/wal.h b/src/box/wal.h
index f348dc636..9d0cada46 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
  * Start WAL thread and initialize WAL writer.
  */
 int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
-	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 2/8] journal: add journal_entry_create helper
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 3/8] qsync: provide a binary form of syncro entries Cyrill Gorcunov
                   ` (6 subsequent siblings)
  8 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

To create raw journal entries. We will use it
to write confirm/rollback entries.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/journal.c |  8 ++------
 src/box/journal.h | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/src/box/journal.c b/src/box/journal.c
index 48af9157b..cb320b557 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -51,11 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		return NULL;
 	}
 
-	entry->write_async_cb = write_async_cb;
-	entry->complete_data = complete_data;
-	entry->approx_len = 0;
-	entry->n_rows = n_rows;
-	entry->res = -1;
-
+	journal_entry_create(entry, n_rows, 0, write_async_cb,
+			     complete_data);
 	return entry;
 }
diff --git a/src/box/journal.h b/src/box/journal.h
index 4b019fecf..5d8d5a726 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -83,6 +83,22 @@ struct journal_entry {
 
 struct region;
 
+/**
+ * Initialize a new journal entry.
+ */
+static inline void
+journal_entry_create(struct journal_entry *entry, size_t n_rows,
+		     size_t approx_len,
+		     journal_write_async_f write_async_cb,
+		     void *complete_data)
+{
+	entry->write_async_cb	= write_async_cb;
+	entry->complete_data	= complete_data;
+	entry->approx_len	= approx_len;
+	entry->n_rows		= n_rows;
+	entry->res		= -1;
+}
+
 /**
  * Create a new journal entry.
  *
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 3/8] qsync: provide a binary form of syncro entries
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 2/8] journal: add journal_entry_create helper Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
                   ` (5 subsequent siblings)
  8 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

These msgpack entries will be needed to write them
down to a journal without involving txn engine. Same
time we would like to be able to allocate them on stack,
for this sake the binary form is predefined.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn_limbo.c |  9 +++++++--
 src/box/xrow.c      | 41 ++++++++++++++++++-----------------------
 src/box/xrow.h      | 20 +++++++++++++++-----
 3 files changed, 40 insertions(+), 30 deletions(-)

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index 944161c30..ed8c10419 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -282,6 +282,11 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 	req.replica_id = limbo->instance_id;
 	req.lsn = lsn;
 
+	/*
+	 * This is a synchronous commit so we can
+	 * use body and row allocated on a stack.
+	 */
+	struct synchro_body_bin body;
 	struct xrow_header row;
 	struct request request = {
 		.header = &row,
@@ -291,8 +296,8 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 	if (txn == NULL)
 		goto rollback;
 
-	if (xrow_encode_synchro(&row, &txn->region, &req) != 0)
-		goto rollback;
+	xrow_encode_synchro(&row, &body, &req);
+
 	/*
 	 * This is not really a transaction. It just uses txn API
 	 * to put the data into WAL. And obviously it should not
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 4b5d4356f..03a4abdda 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,35 +893,30 @@ xrow_encode_dml(const struct request *request, struct region *region,
 	return iovcnt;
 }
 
-int
-xrow_encode_synchro(struct xrow_header *row, struct region *region,
+void
+xrow_encode_synchro(struct xrow_header *row,
+		    struct synchro_body_bin *body,
 		    const struct synchro_request *req)
 {
-	size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
-		     mp_sizeof_uint(req->replica_id) +
-		     mp_sizeof_uint(IPROTO_LSN) + mp_sizeof_uint(req->lsn);
-	char *buf = (char *)region_alloc(region, len);
-	if (buf == NULL) {
-		diag_set(OutOfMemory, len, "region_alloc", "buf");
-		return -1;
-	}
-	char *pos = buf;
-
-	pos = mp_encode_map(pos, 2);
-	pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
-	pos = mp_encode_uint(pos, req->replica_id);
-	pos = mp_encode_uint(pos, IPROTO_LSN);
-	pos = mp_encode_uint(pos, req->lsn);
+	/*
+	 * A map with two elements. We don't compress
+	 * numbers to have this structure constant in size,
+	 * which allows us to preallocate it on stack.
+	 */
+	body->m_body = 0x80 | 2;
+	body->k_replica_id = IPROTO_REPLICA_ID;
+	body->m_replica_id = 0xce;
+	body->v_replica_id = mp_bswap_u32(req->replica_id);
+	body->k_lsn = IPROTO_LSN;
+	body->m_lsn = 0xcf;
+	body->v_lsn = mp_bswap_u64(req->lsn);
 
 	memset(row, 0, sizeof(*row));
 
-	row->body[0].iov_base = buf;
-	row->body[0].iov_len = len;
-	row->bodycnt = 1;
-
 	row->type = req->type;
-
-	return 0;
+	row->body[0].iov_base = (void *)body;
+	row->body[0].iov_len = sizeof(*body);
+	row->bodycnt = 1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 02dca74e5..20e82034d 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -240,16 +240,26 @@ struct synchro_request {
 	int64_t lsn;
 };
 
+/** Synchro request xrow's body in MsgPack format. */
+struct PACKED synchro_body_bin {
+	uint8_t m_body;
+	uint8_t k_replica_id;
+	uint8_t m_replica_id;
+	uint32_t v_replica_id;
+	uint8_t k_lsn;
+	uint8_t m_lsn;
+	uint64_t v_lsn;
+};
+
 /**
  * Encode synchronous replication request.
  * @param row xrow header.
- * @param region Region to use to encode the confirmation body.
+ * @param body Desination to use to encode the confirmation body.
  * @param req Request parameters.
- * @retval -1 on error.
- * @retval 0 success.
  */
-int
-xrow_encode_synchro(struct xrow_header *row, struct region *region,
+void
+xrow_encode_synchro(struct xrow_header *row,
+		    struct synchro_body_bin *body,
 		    const struct synchro_request *req);
 
 /**
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (2 preceding siblings ...)
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 3/8] qsync: provide a binary form of syncro entries Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-15 15:04   ` Vladislav Shpilevoy
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking Cyrill Gorcunov
                   ` (4 subsequent siblings)
  8 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

When we need to write CONFIRM or ROLLBACK message (which is
a binary record in msgpack format) into a journal we use txn code
to allocate a new transaction, encode there a message and pass it
to walk the long txn path before it hit the journal. This is not
only resource wasting but also somehow strange from architectural
point of view.

Instead lets encode a record on the stack and write it to the journal
directly.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn_limbo.c | 64 ++++++++++++++++++++++-----------------------
 1 file changed, 32 insertions(+), 32 deletions(-)

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index ed8c10419..447630d23 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -32,6 +32,7 @@
 #include "txn_limbo.h"
 #include "replication.h"
 #include "iproto_constants.h"
+#include "journal.h"
 
 struct txn_limbo txn_limbo;
 
@@ -272,6 +273,17 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return 0;
 }
 
+/**
+ * A callback for synchronous write: txn_limbo_write_synchro fiber
+ * waiting to proceed once a record is written to WAL.
+ */
+static void
+txn_limbo_write_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	fiber_wakeup(entry->complete_data);
+}
+
 static void
 txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 {
@@ -284,46 +296,34 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
 
 	/*
 	 * This is a synchronous commit so we can
-	 * use body and row allocated on a stack.
+	 * allocate everything on a stack.
 	 */
 	struct synchro_body_bin body;
 	struct xrow_header row;
-	struct request request = {
-		.header = &row,
-	};
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
 
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		goto rollback;
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
 
 	xrow_encode_synchro(&row, &body, &req);
 
-	/*
-	 * This is not really a transaction. It just uses txn API
-	 * to put the data into WAL. And obviously it should not
-	 * go to the limbo and block on the very same sync
-	 * transaction which it tries to confirm now.
-	 */
-	txn_set_flag(txn, TXN_FORCE_ASYNC);
-
-	if (txn_begin_stmt(txn, NULL) != 0)
-		goto rollback;
-	if (txn_commit_stmt(txn, &request) != 0)
-		goto rollback;
-	if (txn_commit(txn) != 0)
-		goto rollback;
-	return;
+	journal_entry_create(entry, 1, xrow_approx_len(&row),
+			     txn_limbo_write_cb, fiber());
 
-rollback:
-	/*
-	 * XXX: the stub is supposed to be removed once it is defined what to do
-	 * when a synchro request WAL write fails. One of the possible
-	 * solutions: log the error, keep the limbo queue as is and probably put
-	 * in rollback mode. Then provide a hook to call manually when WAL
-	 * problems are fixed. Or retry automatically with some period.
-	 */
-	panic("Could not write a synchro request to WAL: lsn = %lld, type = "
-	      "%s\n", lsn, iproto_type_name(type));
+	if (journal_write(entry) != 0 || entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		/*
+		 * XXX: the stub is supposed to be removed once it is defined what to do
+		 * when a synchro request WAL write fails. One of the possible
+		 * solutions: log the error, keep the limbo queue as is and probably put
+		 * in rollback mode. Then provide a hook to call manually when WAL
+		 * problems are fixed. Or retry automatically with some period.
+		 */
+		panic("Could not write a synchro request to WAL: lsn = %lld, type = "
+		      "%s\n", lsn, type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
+	}
 }
 
 /**
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (3 preceding siblings ...)
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-15 15:04   ` Vladislav Shpilevoy
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 6/8] applier: add shorthands to queue access Cyrill Gorcunov
                   ` (3 subsequent siblings)
  8 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

We will need to reuse this helpers.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 43 +++++++++++++++++++++++++++++--------------
 1 file changed, 29 insertions(+), 14 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 98fb87375..60689f6d3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -799,6 +799,29 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 	return 0;
 }
 
+/*
+ * In a full mesh topology, the same set of changes
+ * may arrive via two concurrently running appliers.
+ * Hence we need a latch to strictly order all changes
+ * that belong to the same server id.
+ */
+static inline struct latch *
+applier_lock(uint32_t replica_id)
+{
+	struct replica *replica = replica_by_id(replica_id);
+	struct latch *latch = (replica ? &replica->order_latch :
+			       &replicaset.applier.order_latch);
+	latch_lock(latch);
+	return latch;
+}
+
+static inline void
+applier_unlock(struct latch *latch)
+{
+	assert(latch != NULL);
+	latch_unlock(latch);
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -811,19 +834,11 @@ applier_apply_tx(struct stailq *rows)
 					struct applier_tx_row, next)->row;
 	struct xrow_header *last_row;
 	last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
-	struct replica *replica = replica_by_id(first_row->replica_id);
-	/*
-	 * In a full mesh topology, the same set of changes
-	 * may arrive via two concurrently running appliers.
-	 * Hence we need a latch to strictly order all changes
-	 * that belong to the same server id.
-	 */
-	struct latch *latch = (replica ? &replica->order_latch :
-			       &replicaset.applier.order_latch);
-	latch_lock(latch);
+	struct latch *latch = applier_lock(first_row->replica_id);
+
 	if (vclock_get(&replicaset.applier.vclock,
 		       last_row->replica_id) >= last_row->lsn) {
-		latch_unlock(latch);
+		applier_unlock(latch);
 		return 0;
 	} else if (vclock_get(&replicaset.applier.vclock,
 			      first_row->replica_id) >= first_row->lsn) {
@@ -855,7 +870,7 @@ applier_apply_tx(struct stailq *rows)
 	struct txn *txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL) {
-		latch_unlock(latch);
+		applier_unlock(latch);
 		return -1;
 	}
 	stailq_foreach_entry(item, rows, next) {
@@ -930,12 +945,12 @@ applier_apply_tx(struct stailq *rows)
 	 */
 	vclock_follow(&replicaset.applier.vclock, last_row->replica_id,
 		      last_row->lsn);
-	latch_unlock(latch);
+	applier_unlock(latch);
 	return 0;
 rollback:
 	txn_rollback(txn);
 fail:
-	latch_unlock(latch);
+	applier_unlock(latch);
 	fiber_gc();
 	return -1;
 }
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 6/8] applier: add shorthands to queue access
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (4 preceding siblings ...)
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Cyrill Gorcunov
                   ` (2 subsequent siblings)
  8 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

We need to access first and last xrow in a queue
frenquently and opencoded variants are too ugly.
Lets provide shorthands (we will reuse them in
qsync packets handling as well).

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 37 ++++++++++++++++++++++++++-----------
 1 file changed, 26 insertions(+), 11 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 60689f6d3..a71516282 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -648,6 +648,26 @@ struct applier_tx_row {
 	struct xrow_header row;
 };
 
+/**
+ * Get first xrow from a list.
+ */
+static inline struct xrow_header *
+applier_first_row(struct stailq *rows)
+{
+	return &stailq_first_entry(rows,
+		struct applier_tx_row, next)->row;
+}
+
+/**
+ * Get last xrow from a list.
+ */
+static inline struct xrow_header *
+applier_last_row(struct stailq *rows)
+{
+	return &stailq_last_entry(rows,
+		struct applier_tx_row, next)->row;
+}
+
 static struct applier_tx_row *
 applier_read_tx_row(struct applier *applier)
 {
@@ -749,8 +769,7 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 		}
 		stailq_add_tail(rows, &tx_row->next);
 
-	} while (!stailq_last_entry(rows, struct applier_tx_row,
-				    next)->row.is_commit);
+	} while (!applier_last_row(rows)->is_commit);
 }
 
 static int
@@ -830,10 +849,8 @@ applier_unlock(struct latch *latch)
 static int
 applier_apply_tx(struct stailq *rows)
 {
-	struct xrow_header *first_row = &stailq_first_entry(rows,
-					struct applier_tx_row, next)->row;
-	struct xrow_header *last_row;
-	last_row = &stailq_last_entry(rows, struct applier_tx_row, next)->row;
+	struct xrow_header *first_row = applier_first_row(rows);
+	struct xrow_header *last_row = applier_last_row(rows);
 	struct latch *latch = applier_lock(first_row->replica_id);
 
 	if (vclock_get(&replicaset.applier.vclock,
@@ -849,9 +866,7 @@ applier_apply_tx(struct stailq *rows)
 		 */
 		struct xrow_header *tmp;
 		while (true) {
-			tmp = &stailq_first_entry(rows,
-						  struct applier_tx_row,
-						  next)->row;
+			tmp = applier_first_row(rows);
 			if (tmp->lsn <= vclock_get(&replicaset.applier.vclock,
 						   tmp->replica_id)) {
 				stailq_shift(rows);
@@ -1133,12 +1148,12 @@ applier_subscribe(struct applier *applier)
 		applier_read_tx(applier, &rows);
 
 		applier->last_row_time = ev_monotonic_now(loop());
+
 		/*
 		 * In case of an heartbeat message wake a writer up
 		 * and check applier state.
 		 */
-		if (stailq_first_entry(&rows, struct applier_tx_row,
-				       next)->row.lsn == 0)
+		if (applier_first_row(&rows)->lsn == 0)
 			applier_signal_ack(applier);
 		else if (applier_apply_tx(&rows) != 0)
 			diag_raise();
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (5 preceding siblings ...)
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 6/8] applier: add shorthands to queue access Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-15 15:06   ` Vladislav Shpilevoy
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 8/8] applier: drop process_synchro_row Cyrill Gorcunov
  2020-08-15  8:38 ` [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
  8 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Transaction processing code is very heavy simply because
trasactions are carrying various data and involves a number
of other mechanisms to procceed.

In turn, when we receive confirm or rollback packed from
another node in a cluster we just need to inspect limbo
queue and write this packed into a WAL journal. So calling
a bunch of txn engine helpers is simply waste of cycles.

Thus lets rather handle them in a special light way:

 - allocate synchro_entry structure which would carry
   the journal entry itself and encoded message
 - process limbo queue to mark confirmed/rollback'ed
   messages
 - finally write this synchro_entry into a journal

Which is a way more simplier.

Part-of #5129

Suggedsted-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 172 insertions(+), 7 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a71516282..a1ce7a23f 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -841,6 +843,151 @@ applier_unlock(struct latch *latch)
 	latch_unlock(latch);
 }
 
+struct synchro_entry {
+	/** An applier initiated the syncho request. */
+	struct applier *applier;
+
+	/** Encoded form of a synchro record. */
+	struct synchro_body_bin	body_bin;
+
+	/** xrow to write, used by the journal engine. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Note since
+	 * it has unsized array it must be the
+	 * last entry in the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	struct applier *applier = synchro_entry->applier;
+
+	/*
+	 * We can reuse triggers, they are allocated when
+	 * applier get subscribed and since packets handling
+	 * is processed after the subscribtion phase the triggers
+	 * will be alive.
+	 */
+	if (entry->res < 0) {
+		trigger_run(&replicaset.applier.on_rollback, applier);
+		/*
+		 * Restore the last written vlock value.
+		 */
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+	} else {
+		trigger_run(&replicaset.applier.on_wal_write, applier);
+	}
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct applier *applier,
+		  struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * For simplicity we use malloc here but
+	 * probably should provide some cache similar
+	 * to txn cache.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	entry->applier = applier;
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/*
+ * Process a synchro request from incoming applier packet
+ * without using txn engine, for a speed sake.
+ */
+static int
+apply_synchro_row(struct applier *applier, struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct latch *latch = applier_lock(row->replica_id);
+	if (vclock_get(&replicaset.applier.vclock,
+		       row->replica_id) >= row->lsn) {
+		applier_unlock(latch);
+		return 0;
+	}
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto out;
+
+	if (txn_limbo_process(&txn_limbo, &req))
+		goto out;
+
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(applier, row, &req);
+	if (entry == NULL)
+		goto out;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto out;
+	}
+
+	/*
+	 * In case if something get wrong the journal completion
+	 * handler will set the applier's vclock back to last
+	 * successfully WAL written value.
+	 */
+	vclock_follow(&replicaset.applier.vclock,
+		      row->replica_id, row->lsn);
+	applier_unlock(latch);
+	return 0;
+
+out:
+	diag_log();
+	applier_unlock(latch);
+	return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -1118,7 +1265,13 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle WAL writes and rollbacks. */
+	/*
+	 * Register triggers to handle WAL writes and rollbacks.
+	 *
+	 * Note we use them for syncronous packets handling as well
+	 * thus when changing make sure that synchro handling won't
+	 * be broken.
+	 */
 	struct trigger on_wal_write;
 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
@@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier)
 		applier_read_tx(applier, &rows);
 
 		applier->last_row_time = ev_monotonic_now(loop());
+		struct xrow_header *row = applier_first_row(&rows);
 
-		/*
-		 * In case of an heartbeat message wake a writer up
-		 * and check applier state.
-		 */
-		if (applier_first_row(&rows)->lsn == 0)
+		if (row->lsn == 0) {
+			/*
+			 * In case of an heartbeat message
+			 * wake a writer up and check
+			 * the applier state.
+			 */
 			applier_signal_ack(applier);
-		else if (applier_apply_tx(&rows) != 0)
+		} else if (iproto_type_is_synchro_request(row->type)) {
+			/*
+			 * Make sure synchro messages are never reached
+			 * in a batch (this is by design for simplicity
+			 * sake).
+			 */
+			assert(stailq_first(&rows) == stailq_last(&rows));
+			if (apply_synchro_row(applier, row) != 0)
+				diag_raise();
+		} else if (applier_apply_tx(&rows) != 0) {
 			diag_raise();
+		}
 
 		if (ibuf_used(ibuf) == 0)
 			ibuf_reset(ibuf);
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 8/8] applier: drop process_synchro_row
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (6 preceding siblings ...)
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Cyrill Gorcunov
@ 2020-08-14 21:14 ` Cyrill Gorcunov
  2020-08-15  8:38 ` [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
  8 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-14 21:14 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Handling of synchro requests now are passed
via separate apply_synchro_row helper so
we no longer need process_synchro_row and
can drop it.

Closes #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/applier.cc | 38 +++++---------------------------------
 1 file changed, 5 insertions(+), 33 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index a1ce7a23f..7652e1acd 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -270,45 +270,17 @@ process_nop(struct request *request)
 	return txn_commit_stmt(txn, request);
 }
 
-/*
- * CONFIRM/ROLLBACK rows aren't dml requests  and require special
- * handling: instead of performing some operations on spaces,
- * processing these requests requires txn_limbo to either confirm
- * or rollback some of its entries.
- */
 static int
-process_synchro_row(struct request *request)
+apply_row(struct xrow_header *row)
 {
-	assert(iproto_type_is_synchro_request(request->header->type));
-	struct txn *txn = in_txn();
+	struct request request;
 
-	struct synchro_request syn_req;
-	if (xrow_decode_synchro(request->header, &syn_req) != 0)
-		return -1;
-	assert(txn->n_applier_rows == 0);
 	/*
-	 * This is not really a transaction. It just uses txn API
-	 * to put the data into WAL. And obviously it should not
-	 * go to the limbo and block on the very same sync
-	 * transaction which it tries to confirm now.
+	 * Synchro requests must never use txn engine,
+	 * instead they are handled separately.
 	 */
-	txn_set_flag(txn, TXN_FORCE_ASYNC);
+	assert(!iproto_type_is_synchro_request(row->type));
 
-	if (txn_begin_stmt(txn, NULL) != 0)
-		return -1;
-	if (txn_commit_stmt(txn, request) != 0)
-		return -1;
-	return txn_limbo_process(&txn_limbo, &syn_req);
-}
-
-static int
-apply_row(struct xrow_header *row)
-{
-	struct request request;
-	if (iproto_type_is_synchro_request(row->type)) {
-		request.header = row;
-		return process_synchro_row(&request);
-	}
 	if (xrow_decode_dml(row, &request, dml_request_key_map(row->type)) != 0)
 		return -1;
 	if (request.type == IPROTO_NOP)
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing
  2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (7 preceding siblings ...)
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 8/8] applier: drop process_synchro_row Cyrill Gorcunov
@ 2020-08-15  8:38 ` Cyrill Gorcunov
  2020-08-15 15:06   ` Vladislav Shpilevoy
  8 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-15  8:38 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Since we no longer use txn engine for synchro
packets processing this code is never executed.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
Happen to miss this yesterday. I pushed into the same branch.

 src/box/txn.c | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/src/box/txn.c b/src/box/txn.c
index cc1f496c5..b2d342355 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -82,14 +82,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
 	 */
 	struct space *space = stmt->space;
 	row->group_id = space != NULL ? space_group_id(space) : 0;
-	/*
-	 * Sychronous replication entries are supplementary and
-	 * aren't valid dml requests. They're encoded manually.
-	 */
-	if (likely(!iproto_type_is_synchro_request(row->type)))
-		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
-	else
-		row->bodycnt = xrow_header_dup_body(row, &txn->region);
+	row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
 	if (row->bodycnt < 0)
 		return -1;
 	stmt->row = row;
-- 
2.26.2

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-08-15 15:04   ` Vladislav Shpilevoy
  2020-08-15 16:26     ` Cyrill Gorcunov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-15 15:04 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Hi! Thanks for the patch!

See 3 comments below.

On 14.08.2020 23:14, Cyrill Gorcunov wrote:
> When we need to write CONFIRM or ROLLBACK message (which is
> a binary record in msgpack format) into a journal we use txn code
> to allocate a new transaction, encode there a message and pass it
> to walk the long txn path before it hit the journal. This is not
> only resource wasting but also somehow strange from architectural
> point of view.
> 
> Instead lets encode a record on the stack and write it to the journal
> directly.
> 
> Part-of #5129
> 
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>  src/box/txn_limbo.c | 64 ++++++++++++++++++++++-----------------------
>  1 file changed, 32 insertions(+), 32 deletions(-)
> 
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index ed8c10419..447630d23 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -284,46 +296,34 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint32_t type, int64_t lsn)
>  
>  	/*
>  	 * This is a synchronous commit so we can
> -	 * use body and row allocated on a stack.
> +	 * allocate everything on a stack.
>  	 */

1. Unnecessary change.

>  	struct synchro_body_bin body;
>  	struct xrow_header row;
> -	struct request request = {
> -		.header = &row,
> -	};
> +	char buf[sizeof(struct journal_entry) +
> +		 sizeof(struct xrow_header *)];

2. Is there a guarantee, that 'buf' will be aligned by at least
8 bytes?

>  
> -	struct txn *txn = txn_begin();
> -	if (txn == NULL)
> -		goto rollback;
> +	struct journal_entry *entry = (struct journal_entry *)buf;
> +	entry->rows[0] = &row;
>  
>  	xrow_encode_synchro(&row, &body, &req);
>  
> -	/*
> -	 * This is not really a transaction. It just uses txn API
> -	 * to put the data into WAL. And obviously it should not
> -	 * go to the limbo and block on the very same sync
> -	 * transaction which it tries to confirm now.
> -	 */
> -	txn_set_flag(txn, TXN_FORCE_ASYNC);
> -
> -	if (txn_begin_stmt(txn, NULL) != 0)
> -		goto rollback;
> -	if (txn_commit_stmt(txn, &request) != 0)
> -		goto rollback;
> -	if (txn_commit(txn) != 0)
> -		goto rollback;
> -	return;
> +	journal_entry_create(entry, 1, xrow_approx_len(&row),
> +			     txn_limbo_write_cb, fiber());
>  
> -rollback:
> -	/*
> -	 * XXX: the stub is supposed to be removed once it is defined what to do
> -	 * when a synchro request WAL write fails. One of the possible
> -	 * solutions: log the error, keep the limbo queue as is and probably put
> -	 * in rollback mode. Then provide a hook to call manually when WAL
> -	 * problems are fixed. Or retry automatically with some period.
> -	 */
> -	panic("Could not write a synchro request to WAL: lsn = %lld, type = "
> -	      "%s\n", lsn, iproto_type_name(type));
> +	if (journal_write(entry) != 0 || entry->res < 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		/*
> +		 * XXX: the stub is supposed to be removed once it is defined what to do
> +		 * when a synchro request WAL write fails. One of the possible
> +		 * solutions: log the error, keep the limbo queue as is and probably put
> +		 * in rollback mode. Then provide a hook to call manually when WAL
> +		 * problems are fixed. Or retry automatically with some period.

3. Out of 80 symbols.

> +		 */
> +		panic("Could not write a synchro request to WAL: lsn = %lld, type = "
> +		      "%s\n", lsn, type == IPROTO_CONFIRM ? "CONFIRM" : "ROLLBACK");
> +	}
>  }
>  
>  /**
> 

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking Cyrill Gorcunov
@ 2020-08-15 15:04   ` Vladislav Shpilevoy
  2020-08-15 16:27     ` Cyrill Gorcunov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-15 15:04 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Thanks for the patch!

On 14.08.2020 23:14, Cyrill Gorcunov wrote:
> We will need to reuse this helpers.
> 
> Part-of #5129
> 
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>  src/box/applier.cc | 43 +++++++++++++++++++++++++++++--------------
>  1 file changed, 29 insertions(+), 14 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 98fb87375..60689f6d3 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -799,6 +799,29 @@ applier_txn_wal_write_cb(struct trigger *trigger, void *event)
>  	return 0;
>  }
>  
> +/*
> + * In a full mesh topology, the same set of changes
> + * may arrive via two concurrently running appliers.
> + * Hence we need a latch to strictly order all changes
> + * that belong to the same server id.
> + */
> +static inline struct latch *
> +applier_lock(uint32_t replica_id)
> +{
> +	struct replica *replica = replica_by_id(replica_id);
> +	struct latch *latch = (replica ? &replica->order_latch :

We usually use != NULL to check if a pointer is not NULL. To emphasize
it is not a boolean variable in the code.

> +			       &replicaset.applier.order_latch);
> +	latch_lock(latch);
> +	return latch;
> +}

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
  2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Cyrill Gorcunov
@ 2020-08-15 15:06   ` Vladislav Shpilevoy
  2020-08-17 12:42     ` Cyrill Gorcunov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-15 15:06 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Thanks for the patch!

See 10 comments below.

On 14.08.2020 23:14, Cyrill Gorcunov wrote:
> Transaction processing code is very heavy simply because
> trasactions are carrying various data and involves a number
> of other mechanisms to procceed.

1. trasactions -> transactions.
   procceed -> proceed.

> In turn, when we receive confirm or rollback packed from
> another node in a cluster we just need to inspect limbo
> queue and write this packed into a WAL journal. So calling
> a bunch of txn engine helpers is simply waste of cycles.
> 
> Thus lets rather handle them in a special light way:
> 
>  - allocate synchro_entry structure which would carry
>    the journal entry itself and encoded message
>  - process limbo queue to mark confirmed/rollback'ed
>    messages
>  - finally write this synchro_entry into a journal
> 
> Which is a way more simplier.

2. 'more simplier' -> simpler. Otherwise looks like 'более лучше'.

> Part-of #5129
> 
> Suggedsted-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
>  src/box/applier.cc | 179 +++++++++++++++++++++++++++++++++++++++++++--
>  1 file changed, 172 insertions(+), 7 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index a71516282..a1ce7a23f 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -841,6 +843,151 @@ applier_unlock(struct latch *latch)
>  	latch_unlock(latch);
>  }
>  
> +struct synchro_entry {
> +	/** An applier initiated the syncho request. */
> +	struct applier *applier;

3. Actually 'applier' is not needed. I looked around and realized it is never
used. I even dropped it and nothing changed.

> +
> +	/** Encoded form of a synchro record. */
> +	struct synchro_body_bin	body_bin;
> +
> +	/** xrow to write, used by the journal engine. */
> +	struct xrow_header row;
> +
> +	/**
> +	 * The journal entry itself. Note since
> +	 * it has unsized array it must be the
> +	 * last entry in the structure.
> +	 */
> +	struct journal_entry journal_entry;
> +};
> +
> +static void
> +synchro_entry_delete(struct synchro_entry *entry)
> +{
> +	free(entry);
> +}
> +
> +/**
> + * Async write journal completion.
> + */
> +static void
> +apply_synchro_row_cb(struct journal_entry *entry)
> +{
> +	assert(entry->complete_data != NULL);
> +	struct synchro_entry *synchro_entry =
> +		(struct synchro_entry *)entry->complete_data;
> +	struct applier *applier = synchro_entry->applier;
> +
> +	/*
> +	 * We can reuse triggers, they are allocated when
> +	 * applier get subscribed and since packets handling
> +	 * is processed after the subscribtion phase the triggers
> +	 * will be alive.

4. subscribtion -> subscription. Also I don't think I understood
the comment.

> +	 */
> +	if (entry->res < 0) {
> +		trigger_run(&replicaset.applier.on_rollback, applier);
> +		/*
> +		 * Restore the last written vlock value.
> +		 */
> +		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();

5. Error should be set before running on_rollback, and it should be installed
into replicaset.applier.diag.

> +	} else {
> +		trigger_run(&replicaset.applier.on_wal_write, applier);
> +	}
> +
> +	synchro_entry_delete(synchro_entry);
> +}
> +
> +/**
> + * Allocate a new synchro_entry to be passed to
> + * the journal engine in async write way.
> + */
> +static struct synchro_entry *
> +synchro_entry_new(struct applier *applier,
> +		  struct xrow_header *applier_row,
> +		  struct synchro_request *req)
> +{
> +	struct synchro_entry *entry;
> +	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);

6. Why don't you just add 'struct xrow_header*[1]' to the end of
struct synchro_entry? There is no a case, when the entry is needed
without the xrow_header pointer in the end.

> +
> +	/*
> +	 * For simplicity we use malloc here but
> +	 * probably should provide some cache similar
> +	 * to txn cache.
> +	 */
> +	entry = (struct synchro_entry *)malloc(size);
> +	if (entry == NULL) {
> +		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
> +		return NULL;
> +	}
> +
> +	struct journal_entry *journal_entry = &entry->journal_entry;
> +	struct synchro_body_bin *body_bin = &entry->body_bin;
> +	struct xrow_header *row = &entry->row;
> +
> +	entry->applier = applier;
> +	journal_entry->rows[0] = row;
> +
> +	xrow_encode_synchro(row, body_bin, req);
> +
> +	row->lsn = applier_row->lsn;
> +	row->replica_id = applier_row->replica_id;
> +
> +	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
> +			     apply_synchro_row_cb, entry);
> +	return entry;
> +}
> +
> +/*
> + * Process a synchro request from incoming applier packet
> + * without using txn engine, for a speed sake.

7. It is not about speed. Txn module is fast, it is one of the hottest and
most optimized places in the whole code base. And this is exactly why synchro
requests *should not* be there - they slow down and complicate txn, not vice
versa.

> + */
> +static int
> +apply_synchro_row(struct applier *applier, struct xrow_header *row)
> +{
> +	assert(iproto_type_is_synchro_request(row->type));
> +
> +	struct latch *latch = applier_lock(row->replica_id);
> +	if (vclock_get(&replicaset.applier.vclock,
> +		       row->replica_id) >= row->lsn) {
> +		applier_unlock(latch);
> +		return 0;
> +	}
> +
> +	struct synchro_request req;
> +	if (xrow_decode_synchro(row, &req) != 0)
> +		goto out;
> +
> +	if (txn_limbo_process(&txn_limbo, &req))
> +		goto out;
> +
> +	struct synchro_entry *entry;
> +	entry = synchro_entry_new(applier, row, &req);
> +	if (entry == NULL)
> +		goto out;
> +
> +	if (journal_write_async(&entry->journal_entry) != 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		goto out;
> +	}
> +
> +	/*
> +	 * In case if something get wrong the journal completion
> +	 * handler will set the applier's vclock back to last
> +	 * successfully WAL written value.
> +	 */
> +	vclock_follow(&replicaset.applier.vclock,
> +		      row->replica_id, row->lsn);
> +	applier_unlock(latch);

8. Code duplication is too big. And I wouldn't mind if it was just applier locks,
but vclock propagation and rollback is not that simple.

I think we should do all that inside {applier_apply_tx()}. Because technically you
apply tx - the synchro row is stored inside {struct stailq *rows} which is the tx.

I moved it into {applier_apply_tx()}, and the code became smaller, simpler, with
less duplication, and even less diff. It also allows to drop the commits 5/8 and
6/8. Take a look and lets discuss.

> +	return 0;
> +
> +out:
> +	diag_log();
> +	applier_unlock(latch);
> +	return -1;
> +}
> +
>  /**
>   * Apply all rows in the rows queue as a single transaction.
>   *
> @@ -1148,15 +1301,27 @@ applier_subscribe(struct applier *applier)
>  		applier_read_tx(applier, &rows);
>  
>  		applier->last_row_time = ev_monotonic_now(loop());
> +		struct xrow_header *row = applier_first_row(&rows);
>  
> -		/*
> -		 * In case of an heartbeat message wake a writer up
> -		 * and check applier state.
> -		 */
> -		if (applier_first_row(&rows)->lsn == 0)
> +		if (row->lsn == 0) {
> +			/*
> +			 * In case of an heartbeat message
> +			 * wake a writer up and check
> +			 * the applier state.
> +			 */
>  			applier_signal_ack(applier);
> -		else if (applier_apply_tx(&rows) != 0)
> +		} else if (iproto_type_is_synchro_request(row->type)) {
> +			/*
> +			 * Make sure synchro messages are never reached
> +			 * in a batch (this is by design for simplicity
> +			 * sake).

9. It is not about simplicity. It is about being not necessary. Transactions
exist for DML and DDL (which is also DML on system spaces) only. For other
WAL writes transactions in their common sense don't exist. So each row is a
'transaction'. In future we may want to change that and, for example,
incorporate several synchro requests into a 'tx' (don't know why would we
need that, but it is technically possible).

> +			 */
> +			assert(stailq_first(&rows) == stailq_last(&rows));
> +			if (apply_synchro_row(applier, row) != 0)
> +				diag_raise();
> +		} else if (applier_apply_tx(&rows) != 0) {
>  			diag_raise();
> +		}
>  
>  		if (ibuf_used(ibuf) == 0)
>  			ibuf_reset(ibuf);
> 

10. Consider my changes on top of this commit on your branch. Below I paste my
diff squashed into your commit (on the branch they are not squashed).

====================
diff --git a/src/box/applier.cc b/src/box/applier.cc
index a71516282..dfa62b72a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -51,8 +51,10 @@
 #include "schema.h"
 #include "txn.h"
 #include "box.h"
+#include "xrow.h"
 #include "scoped_guard.h"
 #include "txn_limbo.h"
+#include "journal.h"
 
 STRS(applier_state, applier_STATE);
 
@@ -772,19 +774,9 @@ applier_read_tx(struct applier *applier, struct stailq *rows)
 	} while (!applier_last_row(rows)->is_commit);
 }
 
-static int
-applier_txn_rollback_cb(struct trigger *trigger, void *event)
+static void
+applier_rollback_by_wal_io(void)
 {
-	(void) trigger;
-	struct txn *txn = (struct txn *) event;
-	/*
-	 * Synchronous transaction rollback due to receiving a
-	 * ROLLBACK entry is a normal event and requires no
-	 * special handling.
-	 */
-	if (txn->signature == TXN_SIGNATURE_SYNC_ROLLBACK)
-		return 0;
-
 	/*
 	 * Setup shared applier diagnostic area.
 	 *
@@ -793,19 +785,32 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 	 * diag use per-applier diag instead all the time
 	 * (which actually already present in the structure).
 	 *
-	 * But remember that transactions are asynchronous
-	 * and rollback may happen a way latter after it
-	 * passed to the journal engine.
+	 * But remember that WAL writes are asynchronous and
+	 * rollback may happen a way later after it was passed to
+	 * the journal engine.
 	 */
 	diag_set(ClientError, ER_WAL_IO);
 	diag_set_error(&replicaset.applier.diag,
 		       diag_last_error(diag_get()));
 
-	/* Broadcast the rollback event across all appliers. */
-	trigger_run(&replicaset.applier.on_rollback, event);
-
+	/* Broadcast the rollback across all appliers. */
+	trigger_run(&replicaset.applier.on_rollback, NULL);
 	/* Rollback applier vclock to the committed one. */
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+}
+
+static int
+applier_txn_rollback_cb(struct trigger *trigger, void *event)
+{
+	(void) trigger;
+	struct txn *txn = (struct txn *) event;
+	/*
+	 * Synchronous transaction rollback due to receiving a
+	 * ROLLBACK entry is a normal event and requires no
+	 * special handling.
+	 */
+	if (txn->signature != TXN_SIGNATURE_SYNC_ROLLBACK)
+		applier_rollback_by_wal_io();
 	return 0;
 }
 
@@ -841,6 +846,110 @@ applier_unlock(struct latch *latch)
 	latch_unlock(latch);
 }
 
+struct synchro_entry {
+	/** Encoded form of a synchro record. */
+	struct synchro_body_bin	body_bin;
+
+	/** xrow to write, used by the journal engine. */
+	struct xrow_header row;
+
+	/**
+	 * The journal entry itself. Note since
+	 * it has unsized array it must be the
+	 * last entry in the structure.
+	 */
+	struct journal_entry journal_entry;
+};
+
+static void
+synchro_entry_delete(struct synchro_entry *entry)
+{
+	free(entry);
+}
+
+/**
+ * Async write journal completion.
+ */
+static void
+apply_synchro_row_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	struct synchro_entry *synchro_entry =
+		(struct synchro_entry *)entry->complete_data;
+	if (entry->res < 0)
+		applier_rollback_by_wal_io();
+	else
+		trigger_run(&replicaset.applier.on_wal_write, NULL);
+
+	synchro_entry_delete(synchro_entry);
+}
+
+/**
+ * Allocate a new synchro_entry to be passed to
+ * the journal engine in async write way.
+ */
+static struct synchro_entry *
+synchro_entry_new(struct xrow_header *applier_row,
+		  struct synchro_request *req)
+{
+	struct synchro_entry *entry;
+	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
+
+	/*
+	 * For simplicity we use malloc here but
+	 * probably should provide some cache similar
+	 * to txn cache.
+	 */
+	entry = (struct synchro_entry *)malloc(size);
+	if (entry == NULL) {
+		diag_set(OutOfMemory, size, "malloc", "synchro_entry");
+		return NULL;
+	}
+
+	struct journal_entry *journal_entry = &entry->journal_entry;
+	struct synchro_body_bin *body_bin = &entry->body_bin;
+	struct xrow_header *row = &entry->row;
+
+	journal_entry->rows[0] = row;
+
+	xrow_encode_synchro(row, body_bin, req);
+
+	row->lsn = applier_row->lsn;
+	row->replica_id = applier_row->replica_id;
+
+	journal_entry_create(journal_entry, 1, xrow_approx_len(row),
+			     apply_synchro_row_cb, entry);
+	return entry;
+}
+
+/** Process a synchro request. */
+static int
+apply_synchro_row(struct xrow_header *row)
+{
+	assert(iproto_type_is_synchro_request(row->type));
+
+	struct synchro_request req;
+	if (xrow_decode_synchro(row, &req) != 0)
+		goto err;
+
+	if (txn_limbo_process(&txn_limbo, &req))
+		goto err;
+
+	struct synchro_entry *entry;
+	entry = synchro_entry_new(row, &req);
+	if (entry == NULL)
+		goto err;
+
+	if (journal_write_async(&entry->journal_entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		goto err;
+	}
+	return 0;
+err:
+	diag_log();
+	return -1;
+}
+
 /**
  * Apply all rows in the rows queue as a single transaction.
  *
@@ -876,13 +985,26 @@ applier_apply_tx(struct stailq *rows)
 		}
 	}
 
+	if (unlikely(iproto_type_is_synchro_request(first_row->type))) {
+		/*
+		 * Synchro messages are not transactions, in terms
+		 * of DML. Always sent and written isolated from
+		 * each other.
+		 */
+		assert(first_row == last_row);
+		if (apply_synchro_row(first_row) != 0)
+			diag_raise();
+		goto success;
+	}
+
 	/**
 	 * Explicitly begin the transaction so that we can
 	 * control fiber->gc life cycle and, in case of apply
 	 * conflict safely access failed xrow object and allocate
 	 * IPROTO_NOP on gc.
 	 */
-	struct txn *txn = txn_begin();
+	struct txn *txn;
+	txn = txn_begin();
 	struct applier_tx_row *item;
 	if (txn == NULL) {
 		applier_unlock(latch);
@@ -951,6 +1073,7 @@ applier_apply_tx(struct stailq *rows)
 	if (txn_commit_async(txn) < 0)
 		goto fail;
 
+success:
 	/*
 	 * The transaction was sent to journal so promote vclock.
 	 *
@@ -1118,7 +1241,13 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle WAL writes and rollbacks. */
+	/*
+	 * Register triggers to handle WAL writes and rollbacks.
+	 *
+	 * Note we use them for syncronous packets handling as well
+	 * thus when changing make sure that synchro handling won't
+	 * be broken.
+	 */
 	struct trigger on_wal_write;
 	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
 	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing
  2020-08-15  8:38 ` [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
@ 2020-08-15 15:06   ` Vladislav Shpilevoy
  2020-08-17  8:03     ` Cyrill Gorcunov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-15 15:06 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Thanks for the patch!

On 15.08.2020 10:38, Cyrill Gorcunov wrote:
> Since we no longer use txn engine for synchro
> packets processing this code is never executed.
> 
> Part-of #5129
> 
> Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
> ---
> Happen to miss this yesterday. I pushed into the same branch.
> 
>  src/box/txn.c | 9 +--------
>  1 file changed, 1 insertion(+), 8 deletions(-)
> 
> diff --git a/src/box/txn.c b/src/box/txn.c
> index cc1f496c5..b2d342355 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -82,14 +82,7 @@ txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request)
>  	 */
>  	struct space *space = stmt->space;
>  	row->group_id = space != NULL ? space_group_id(space) : 0;
> -	/*
> -	 * Sychronous replication entries are supplementary and
> -	 * aren't valid dml requests. They're encoded manually.
> -	 */
> -	if (likely(!iproto_type_is_synchro_request(row->type)))
> -		row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
> -	else
> -		row->bodycnt = xrow_header_dup_body(row, &txn->region);

xrow_header_dup_body() can be deleted now.

> +	row->bodycnt = xrow_encode_dml(request, &txn->region, row->body);
>  	if (row->bodycnt < 0)
>  		return -1;
>  	stmt->row = row;
> 

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal
  2020-08-15 15:04   ` Vladislav Shpilevoy
@ 2020-08-15 16:26     ` Cyrill Gorcunov
  0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-15 16:26 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Sat, Aug 15, 2020 at 05:04:27PM +0200, Vladislav Shpilevoy wrote:
> > +	char buf[sizeof(struct journal_entry) +
> > +		 sizeof(struct xrow_header *)];
> 
> 2. Is there a guarantee, that 'buf' will be aligned by at least
> 8 bytes?

Yup. On 32bit machines it would be 4 bytes aligned, on 64bit -- 8 byte
(this is basically how stack management works by default).

I'll address the rest of comments. Thanks!

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking
  2020-08-15 15:04   ` Vladislav Shpilevoy
@ 2020-08-15 16:27     ` Cyrill Gorcunov
  0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-15 16:27 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Sat, Aug 15, 2020 at 05:04:32PM +0200, Vladislav Shpilevoy wrote:
> > +static inline struct latch *
> > +applier_lock(uint32_t replica_id)
> > +{
> > +	struct replica *replica = replica_by_id(replica_id);
> > +	struct latch *latch = (replica ? &replica->order_latch :
> 
> We usually use != NULL to check if a pointer is not NULL. To emphasize
> it is not a boolean variable in the code.

I simply copied this code from old place. But sure, will add.

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing
  2020-08-15 15:06   ` Vladislav Shpilevoy
@ 2020-08-17  8:03     ` Cyrill Gorcunov
  0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17  8:03 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Sat, Aug 15, 2020 at 05:06:09PM +0200, Vladislav Shpilevoy wrote:
> 
> xrow_header_dup_body() can be deleted now.

Thanks!

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
  2020-08-15 15:06   ` Vladislav Shpilevoy
@ 2020-08-17 12:42     ` Cyrill Gorcunov
  2020-08-17 20:49       ` Vladislav Shpilevoy
  0 siblings, 1 reply; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-17 12:42 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Sat, Aug 15, 2020 at 05:06:05PM +0200, Vladislav Shpilevoy wrote:
> > +static struct synchro_entry *
> > +synchro_entry_new(struct applier *applier,
> > +		  struct xrow_header *applier_row,
> > +		  struct synchro_request *req)
> > +{
> > +	struct synchro_entry *entry;
> > +	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
> 
> 6. Why don't you just add 'struct xrow_header*[1]' to the end of
> struct synchro_entry? There is no a case, when the entry is needed
> without the xrow_header pointer in the end.

This is forbidden by asan and some other compilers we've in travis runs.
I've been already trying.

Thanks for all comments, Vlad. I've merged your changes. Once tests
are passed I'll send new series out.

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
  2020-08-17 12:42     ` Cyrill Gorcunov
@ 2020-08-17 20:49       ` Vladislav Shpilevoy
  2020-08-18  8:08         ` Cyrill Gorcunov
  0 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2020-08-17 20:49 UTC (permalink / raw)
  To: Cyrill Gorcunov; +Cc: tml

On 17.08.2020 14:42, Cyrill Gorcunov wrote:
> On Sat, Aug 15, 2020 at 05:06:05PM +0200, Vladislav Shpilevoy wrote:
>>> +static struct synchro_entry *
>>> +synchro_entry_new(struct applier *applier,
>>> +		  struct xrow_header *applier_row,
>>> +		  struct synchro_request *req)
>>> +{
>>> +	struct synchro_entry *entry;
>>> +	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
>>
>> 6. Why don't you just add 'struct xrow_header*[1]' to the end of
>> struct synchro_entry? There is no a case, when the entry is needed
>> without the xrow_header pointer in the end.
> 
> This is forbidden by asan and some other compilers we've in travis runs.
> I've been already trying.

We use clang and gcc, just 2. To workaround that I suggest to add
-Wno-gnu-variable-sized-type-not-at-end to compiler.cmake (with that
flag it works, at least on clang - I need you to check it on gcc).

^ permalink raw reply	[flat|nested] 20+ messages in thread

* Re: [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine
  2020-08-17 20:49       ` Vladislav Shpilevoy
@ 2020-08-18  8:08         ` Cyrill Gorcunov
  0 siblings, 0 replies; 20+ messages in thread
From: Cyrill Gorcunov @ 2020-08-18  8:08 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Mon, Aug 17, 2020 at 10:49:00PM +0200, Vladislav Shpilevoy wrote:
> On 17.08.2020 14:42, Cyrill Gorcunov wrote:
> > On Sat, Aug 15, 2020 at 05:06:05PM +0200, Vladislav Shpilevoy wrote:
> >>> +static struct synchro_entry *
> >>> +synchro_entry_new(struct applier *applier,
> >>> +		  struct xrow_header *applier_row,
> >>> +		  struct synchro_request *req)
> >>> +{
> >>> +	struct synchro_entry *entry;
> >>> +	size_t size = sizeof(*entry) + sizeof(struct xrow_header *);
> >>
> >> 6. Why don't you just add 'struct xrow_header*[1]' to the end of
> >> struct synchro_entry? There is no a case, when the entry is needed
> >> without the xrow_header pointer in the end.
> > 
> > This is forbidden by asan and some other compilers we've in travis runs.
> > I've been already trying.
> 
> We use clang and gcc, just 2. To workaround that I suggest to add
> -Wno-gnu-variable-sized-type-not-at-end to compiler.cmake (with that
> flag it works, at least on clang - I need you to check it on gcc).

I think this could be addressed later (if ever) -- we allocate an entry
dinamically and we have similar thing in general xrow allocation, so
in this POV at least they all are unified.

^ permalink raw reply	[flat|nested] 20+ messages in thread

end of thread, other threads:[~2020-08-18  8:08 UTC | newest]

Thread overview: 20+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-08-14 21:14 [Tarantool-patches] [PATCH v7 0/8] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 1/8] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 2/8] journal: add journal_entry_create helper Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 3/8] qsync: provide a binary form of syncro entries Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 4/8] qsync: direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-08-15 15:04   ` Vladislav Shpilevoy
2020-08-15 16:26     ` Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 5/8] applier: factor out latch locking Cyrill Gorcunov
2020-08-15 15:04   ` Vladislav Shpilevoy
2020-08-15 16:27     ` Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 6/8] applier: add shorthands to queue access Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 7/8] applier: process synchro requests without txn engine Cyrill Gorcunov
2020-08-15 15:06   ` Vladislav Shpilevoy
2020-08-17 12:42     ` Cyrill Gorcunov
2020-08-17 20:49       ` Vladislav Shpilevoy
2020-08-18  8:08         ` Cyrill Gorcunov
2020-08-14 21:14 ` [Tarantool-patches] [PATCH v7 8/8] applier: drop process_synchro_row Cyrill Gorcunov
2020-08-15  8:38 ` [Tarantool-patches] [PATCH v7 9/8] txn: txn_add_redo -- drop synchro processing Cyrill Gorcunov
2020-08-15 15:06   ` Vladislav Shpilevoy
2020-08-17  8:03     ` Cyrill Gorcunov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox