Tarantool development patches archive
* [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine
@ 2020-07-27 14:06 Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 1/6] journal: drop redundant declaration Cyrill Gorcunov
                   ` (6 more replies)
  0 siblings, 7 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

In this series we write CONFIRM/ROLLBACK messages into the WAL directly
without involving the txn engine.

Vlad, take a look please, once time permits.

issue https://github.com/tarantool/tarantool/issues/5129
branch gorcunov/gh-5129-journal-3

v3:
 - bootstrap journal left NULL for async write
 - journal_write_async_cb_t type for async callback
 - struct synchro_body_bin type for encoded message
 - xrow_encode_synchro helper to operate with synchro_body_bin

Cyrill Gorcunov (6):
  journal: drop redundant declaration
  journal: bind asynchronous write completion to an entry
  journal: add journal_entry_create helper
  qsync: provide a binary form of syncro entries
  qsync: implement direct write of CONFIRM/ROLLBACK into a journal
  qsync: drop no longer used encoding helpers

 src/box/box.cc      | 15 ++++----
 src/box/journal.c   |  8 ++--
 src/box/journal.h   | 38 +++++++++++++-----
 src/box/txn.c       |  2 +-
 src/box/txn_limbo.c | 94 ++++++++++++++++++++++++++-------------------
 src/box/vy_log.c    |  2 +-
 src/box/wal.c       | 19 ++++-----
 src/box/wal.h       |  4 +-
 src/box/xrow.c      | 60 ++++++++++-------------------
 src/box/xrow.h      | 47 +++++++++++------------
 10 files changed, 147 insertions(+), 142 deletions(-)

-- 
2.26.2

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [Tarantool-patches] [PATCH v3 1/6] journal: drop redundant declaration
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
@ 2020-07-27 14:06 ` Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
                   ` (5 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

journal_entry is declared right below, so there is no need for
a separate forward declaration.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/journal.h | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/box/journal.h b/src/box/journal.h
index 1a10e66c3..9049a2ce0 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -40,7 +40,6 @@ extern "C" {
 #endif /* defined(__cplusplus) */
 
 struct xrow_header;
-struct journal_entry;
 
 /**
  * An entry for an abstract journal.
-- 
2.26.2


* [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 1/6] journal: drop redundant declaration Cyrill Gorcunov
@ 2020-07-27 14:06 ` Cyrill Gorcunov
  2020-07-27 20:40   ` Vladislav Shpilevoy
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 3/6] journal: add journal_entry_create helper Cyrill Gorcunov
                   ` (4 subsequent siblings)
  6 siblings, 1 reply; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

In commit 77ba0e3504464131fe81c672d508d0275be2173a we redesigned
wal journal operations such that the asynchronous write completion
is a single instance per journal.

It turned out that this simplification is too restrictive: it doesn't
allow us to pass entries with custom completions into the journal.

So let's bring this ability back. We will need it to be able
to write "confirm" records into the WAL directly without touching
the transaction code at all.
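
To illustrate the idea, here is a minimal standalone sketch, not the
actual Tarantool code. The names struct entry, entry_done_f,
entry_complete and the two counting callbacks are hypothetical
stand-ins for struct journal_entry, journal_write_async_cb_t and
journal_async_complete. It only shows that once the callback lives in
the entry, two entries with different completions can pass through
the same journal.

```c
#include <assert.h>
#include <stddef.h>

struct entry;
/* Hypothetical analogue of journal_write_async_cb_t. */
typedef void (*entry_done_f)(struct entry *e);

/* Simplified stand-in for struct journal_entry. */
struct entry {
	void *complete_data;
	entry_done_f on_done;
	int res;
};

/* Dispatch on the entry itself, no journal pointer is needed. */
static inline void
entry_complete(struct entry *e)
{
	assert(e->on_done != NULL);
	e->on_done(e);
}

/* Completion used by a "transaction-like" entry. */
static void
count_txn(struct entry *e)
{
	++*(int *)e->complete_data;
}

/* Completion used by a "confirm-like" entry. */
static void
count_confirm(struct entry *e)
{
	*(int *)e->complete_data += 10;
}

/* Completes two entries with different callbacks, returns the sum. */
int
demo_per_entry_completion(void)
{
	int total = 0;
	struct entry a = { &total, count_txn, 0 };
	struct entry b = { &total, count_confirm, 0 };
	entry_complete(&a);
	entry_complete(&b);
	return total;
}
```

With a single per-journal callback both entries would have to share
one completion function; here each entry carries its own.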

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc    | 15 ++++++++-------
 src/box/journal.c |  2 ++
 src/box/journal.h | 21 ++++++++++++---------
 src/box/txn.c     |  2 +-
 src/box/vy_log.c  |  2 +-
 src/box/wal.c     | 19 ++++++++-----------
 src/box/wal.h     |  4 ++--
 7 files changed, 34 insertions(+), 31 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 83eef5d98..3667e1423 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
 	 * Since there're no actual writes, fire a
 	 * journal_async_complete callback right away.
 	 */
-	journal_async_complete(base, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
 	journal_create(&journal.base, recovery_journal_write,
-		       txn_complete_async, recovery_journal_write);
+		       recovery_journal_write);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
@@ -2193,8 +2193,10 @@ engine_init()
 static int
 bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
 {
+	(void)base;
+
 	entry->res = 0;
-	journal_async_complete(base, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
@@ -2580,8 +2582,8 @@ box_cfg_xc(void)
 
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
-		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
+		     &INSTANCE_UUID, on_wal_garbage_collection,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
@@ -2628,8 +2630,7 @@ box_cfg_xc(void)
 	}
 
 	struct journal bootstrap_journal;
-	journal_create(&bootstrap_journal, NULL, txn_complete_async,
-		       bootstrap_journal_write);
+	journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
 	journal_set(&bootstrap_journal);
 	auto bootstrap_journal_guard = make_scoped_guard([] {
 		journal_set(NULL);
diff --git a/src/box/journal.c b/src/box/journal.c
index f1e89aaa2..a2854ed8c 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
 
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
+		  journal_write_async_cb_t write_async_cb,
 		  void *complete_data)
 {
 	struct journal_entry *entry;
@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		return NULL;
 	}
 
+	entry->write_async_cb = write_async_cb;
 	entry->complete_data = complete_data;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
diff --git a/src/box/journal.h b/src/box/journal.h
index 9049a2ce0..759eea1da 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -39,8 +39,11 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+struct journal_entry;
 struct xrow_header;
 
+typedef void (*journal_write_async_cb_t)(struct journal_entry *entry);
+
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -60,6 +63,10 @@ struct journal_entry {
 	 * A journal entry completion callback argument.
 	 */
 	void *complete_data;
+	/**
+	 * Asynchronous write completion function.
+	 */
+	journal_write_async_cb_t write_async_cb;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -83,6 +90,7 @@ struct region;
  */
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
+		  journal_write_async_cb_t write_async_cb,
 		  void *complete_data);
 
 /**
@@ -95,22 +103,19 @@ struct journal {
 	int (*write_async)(struct journal *journal,
 			   struct journal_entry *entry);
 
-	/** Asynchronous write completion */
-	void (*write_async_cb)(struct journal_entry *entry);
-
 	/** Synchronous write */
 	int (*write)(struct journal *journal,
 		     struct journal_entry *entry);
 };
 
 /**
- * Finalize a single entry.
+ * Complete asynchronous write.
  */
 static inline void
-journal_async_complete(struct journal *journal, struct journal_entry *entry)
+journal_async_complete(struct journal_entry *entry)
 {
-	assert(journal->write_async_cb != NULL);
-	journal->write_async_cb(entry);
+	assert(entry->write_async_cb != NULL);
+	entry->write_async_cb(entry);
 }
 
 /**
@@ -172,12 +177,10 @@ static inline void
 journal_create(struct journal *journal,
 	       int (*write_async)(struct journal *journal,
 				  struct journal_entry *entry),
-	       void (*write_async_cb)(struct journal_entry *entry),
 	       int (*write)(struct journal *journal,
 			    struct journal_entry *entry))
 {
 	journal->write_async	= write_async;
-	journal->write_async_cb	= write_async_cb;
 	journal->write		= write;
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index 9c21258c5..cc1f496c5 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
 
 	/* Save space for an additional NOP row just in case. */
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
-				&txn->region, txn);
+				&txn->region, txn_complete_async, txn);
 	if (req == NULL)
 		return NULL;
 
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 311985c72..de4c5205c 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 	size_t used = region_used(&fiber()->gc);
 
 	struct journal_entry *entry;
-	entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
+	entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 37a8bd483..f22f2a6c5 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_async_complete(&writer->base, req);
+		journal_async_complete(req);
 }
 
 /**
@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
  */
 static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
-		  void (*wall_async_cb)(struct journal_entry *entry),
-		  const char *wal_dirname,
-		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+		  const char *wal_dirname, int64_t wal_max_size,
+		  const struct tt_uuid *instance_uuid,
 		  wal_on_garbage_collection_f on_garbage_collection,
 		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	journal_create(&writer->base,
 		       wal_mode == WAL_NONE ?
 		       wal_write_none_async : wal_write_async,
-		       wall_async_cb,
 		       wal_mode == WAL_NONE ?
 		       wal_write_none : wal_write);
 
@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
 }
 
 int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
-	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
-	wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
-			  wal_max_size, instance_uuid, on_garbage_collection,
+	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
+			  instance_uuid, on_garbage_collection,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
@@ -1304,7 +1301,7 @@ wal_write_none_async(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_async_complete(journal, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
diff --git a/src/box/wal.h b/src/box/wal.h
index f348dc636..9d0cada46 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
  * Start WAL thread and initialize WAL writer.
  */
 int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
-	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
-- 
2.26.2


* [Tarantool-patches] [PATCH v3 3/6] journal: add journal_entry_create helper
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 1/6] journal: drop redundant declaration Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
@ 2020-07-27 14:06 ` Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 4/6] qsync: provide a binary form of syncro entries Cyrill Gorcunov
                   ` (3 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Add a helper to initialize raw journal entries in place. We will
use it to write confirm/rollback entries.
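
The split between allocation and initialization can be sketched as
follows. This is a simplified illustration only: struct entry,
entry_create and entry_new are hypothetical stand-ins for
struct journal_entry, journal_entry_create and journal_entry_new,
and malloc() is used instead of a region allocator.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Simplified stand-in for struct journal_entry. */
struct entry {
	void *complete_data;
	size_t approx_len;
	int n_rows;
	int res;
};

/* Initialize caller-provided storage, no allocation happens here. */
void
entry_create(struct entry *e, int n_rows, size_t approx_len,
	     void *complete_data)
{
	e->complete_data = complete_data;
	e->approx_len = approx_len;
	e->n_rows = n_rows;
	e->res = -1;
}

/* Allocate an entry and delegate initialization to entry_create(). */
struct entry *
entry_new(int n_rows, void *complete_data)
{
	struct entry *e = malloc(sizeof(*e));
	if (e == NULL)
		return NULL;
	entry_create(e, n_rows, 0, complete_data);
	return e;
}
```

A caller that already owns storage (for example on the stack) uses
entry_create() alone, while entry_new() remains for the allocating path.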

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/journal.c |  8 ++------
 src/box/journal.h | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/src/box/journal.c b/src/box/journal.c
index a2854ed8c..79f35beb1 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -51,11 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		return NULL;
 	}
 
-	entry->write_async_cb = write_async_cb;
-	entry->complete_data = complete_data;
-	entry->approx_len = 0;
-	entry->n_rows = n_rows;
-	entry->res = -1;
-
+	journal_entry_create(entry, n_rows, 0, write_async_cb,
+			     complete_data);
 	return entry;
 }
diff --git a/src/box/journal.h b/src/box/journal.h
index 759eea1da..916cb074d 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -83,6 +83,22 @@ struct journal_entry {
 
 struct region;
 
+/**
+ * Initialize a new journal entry.
+ */
+static inline void
+journal_entry_create(struct journal_entry *entry, size_t n_rows,
+		     size_t approx_len,
+		     journal_write_async_cb_t write_async_cb,
+		     void *complete_data)
+{
+	entry->write_async_cb	= write_async_cb;
+	entry->complete_data	= complete_data;
+	entry->approx_len	= approx_len;
+	entry->n_rows		= n_rows;
+	entry->res		= -1;
+}
+
 /**
  * Create a new journal entry.
  *
-- 
2.26.2


* [Tarantool-patches] [PATCH v3 4/6] qsync: provide a binary form of syncro entries
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (2 preceding siblings ...)
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 3/6] journal: add journal_entry_create helper Cyrill Gorcunov
@ 2020-07-27 14:06 ` Cyrill Gorcunov
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
                   ` (2 subsequent siblings)
  6 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

These msgpack entries will need to be written to a journal
without involving the txn engine. At the same time we would like
to be able to allocate them on the stack, which is why the binary
form has a predefined, fixed size.
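
As a rough standalone sketch of the fixed-size encoding (not the code
from this patch): the helper below writes the same 17-byte layout
into a plain byte array instead of a packed struct. The function name
encode_synchro_body and the KEY_* constants are hypothetical; the
real keys are IPROTO_REPLICA_ID and IPROTO_LSN. The markers 0xce and
0xcf are the MsgPack uint32 and uint64 type bytes, used even for
small values so that the size never changes.

```c
#include <assert.h>
#include <stdint.h>

/* Placeholder key codes, assumed for illustration only. */
enum { KEY_REPLICA_ID = 0x02, KEY_LSN = 0x03 };

/* 1 map header + (key, marker, u32) + (key, marker, u64). */
enum { SYNCHRO_BODY_LEN = 1 + (1 + 1 + 4) + (1 + 1 + 8) };

void
encode_synchro_body(uint8_t out[SYNCHRO_BODY_LEN], uint32_t replica_id,
		    uint64_t lsn)
{
	uint8_t *p = out;
	*p++ = 0x80 | 2;		/* fixmap with two pairs */
	*p++ = KEY_REPLICA_ID;		/* key as positive fixint */
	*p++ = 0xce;			/* uint32 marker */
	for (int shift = 24; shift >= 0; shift -= 8)
		*p++ = (uint8_t)(replica_id >> shift);	/* big-endian */
	*p++ = KEY_LSN;
	*p++ = 0xcf;			/* uint64 marker */
	for (int shift = 56; shift >= 0; shift -= 8)
		*p++ = (uint8_t)(lsn >> shift);		/* big-endian */
	assert(p - out == SYNCHRO_BODY_LEN);
}
```

Because every field has a fixed width, the buffer size is a
compile-time constant and the body can live on the caller's stack.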

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/xrow.c | 62 +++++++++++++++++++++++++++-----------------------
 src/box/xrow.h | 25 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 28 deletions(-)

diff --git a/src/box/xrow.c b/src/box/xrow.c
index 0c797a9d5..3609dae0f 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -893,51 +893,57 @@ xrow_encode_dml(const struct request *request, struct region *region,
 	return iovcnt;
 }
 
-static int
-xrow_encode_confirm_rollback(struct xrow_header *row, struct region *region,
-			     uint32_t replica_id, int64_t lsn, int type)
+void
+xrow_encode_synchro(struct xrow_header *row,
+		    struct synchro_body_bin *body,
+		    uint32_t replica_id, int64_t lsn,
+		    int type)
 {
-	size_t len = mp_sizeof_map(2) + mp_sizeof_uint(IPROTO_REPLICA_ID) +
-		     mp_sizeof_uint(replica_id) + mp_sizeof_uint(IPROTO_LSN) +
-		     mp_sizeof_uint(lsn);
-	char *buf = (char *)region_alloc(region, len);
-	if (buf == NULL) {
-		diag_set(OutOfMemory, len, "region_alloc", "buf");
-		return -1;
-	}
-	char *pos = buf;
-
-	pos = mp_encode_map(pos, 2);
-	pos = mp_encode_uint(pos, IPROTO_REPLICA_ID);
-	pos = mp_encode_uint(pos, replica_id);
-	pos = mp_encode_uint(pos, IPROTO_LSN);
-	pos = mp_encode_uint(pos, lsn);
+	/*
+	 * A map with two elements. We don't compress
+	 * numbers to have this structure constant in size,
+	 * which will allow us to preallocate it on stack.
+	 */
+	body->m_body = 0x80 | 2;
+	body->k_replica_id = IPROTO_REPLICA_ID;
+	body->m_replica_id = 0xce;
+	body->v_replica_id = mp_bswap_u32(replica_id);
+	body->k_lsn = IPROTO_LSN;
+	body->m_lsn = 0xcf;
+	body->v_lsn = mp_bswap_u64(lsn);
 
 	memset(row, 0, sizeof(*row));
 
-	row->body[0].iov_base = buf;
-	row->body[0].iov_len = len;
-	row->bodycnt = 1;
-
 	row->type = type;
-
-	return 0;
+	row->body[0].iov_base = (void *)body;
+	row->body[0].iov_len = sizeof(*body);
+	row->bodycnt = 1;
 }
 
 int
 xrow_encode_confirm(struct xrow_header *row, struct region *region,
 		    uint32_t replica_id, int64_t lsn)
 {
-	return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
-					    IPROTO_CONFIRM);
+	struct synchro_body_bin *body = region_alloc(region, sizeof(*body));
+	if (body == NULL) {
+		diag_set(OutOfMemory, sizeof(*body), "region_alloc", "body");
+		return -1;
+	}
+	xrow_encode_synchro(row, body, replica_id, lsn, IPROTO_CONFIRM);
+	return 0;
 }
 
 int
 xrow_encode_rollback(struct xrow_header *row, struct region *region,
 		     uint32_t replica_id, int64_t lsn)
 {
-	return xrow_encode_confirm_rollback(row, region, replica_id, lsn,
-					    IPROTO_ROLLBACK);
+	struct synchro_body_bin *body = region_alloc(region, sizeof(*body));
+	if (body == NULL) {
+		diag_set(OutOfMemory, sizeof(*body), "region_alloc", "body");
+		return -1;
+	}
+	xrow_encode_synchro(row, body, replica_id, lsn, IPROTO_ROLLBACK);
+	return 0;
 }
 
 static int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index e21ede5a3..522bab259 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -215,6 +215,31 @@ int
 xrow_encode_dml(const struct request *request, struct region *region,
 		struct iovec *iov);
 
+/** Synchro request xrow's body in MsgPack format. */
+struct PACKED synchro_body_bin {
+	uint8_t m_body;
+	uint8_t k_replica_id;
+	uint8_t m_replica_id;
+	uint32_t v_replica_id;
+	uint8_t k_lsn;
+	uint8_t m_lsn;
+	uint64_t v_lsn;
+};
+
+/**
+ * Encode synchro request body into the xrow.
+ * @param xrow Destination xrow.
+ * @param body Destination body.
+ * @param replica_id Master's instance id.
+ * @param lsn Last confirmed lsn.
+ * @param type Request type.
+ */
+void
+xrow_encode_synchro(struct xrow_header *row,
+		    struct synchro_body_bin *body,
+		    uint32_t replica_id, int64_t lsn,
+		    int type);
+
 /**
  * Encode the CONFIRM to row body and set row type to
  * IPROTO_CONFIRM.
-- 
2.26.2


* [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (3 preceding siblings ...)
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 4/6] qsync: provide a binary form of syncro entries Cyrill Gorcunov
@ 2020-07-27 14:06 ` Cyrill Gorcunov
  2020-07-27 20:41   ` Vladislav Shpilevoy
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 6/6] qsync: drop no longer used encoding helpers Cyrill Gorcunov
  2020-07-27 14:12 ` [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
  6 siblings, 1 reply; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

When we need to write a CONFIRM or ROLLBACK message (which is just
a binary record in msgpack format) into a journal, we use txn code
to allocate a new transaction, encode the message there, and send it
down the long txn path before it hits the journal. This not only
wastes resources but is also somewhat strange from an architectural
point of view.

Instead, let's encode the record on the stack and write it to the
journal directly.
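
A simplified standalone sketch of a stack-allocated entry with one
trailing row pointer is shown below. It is not what this patch does:
the patch uses a plain char buffer, while the sketch wraps the storage
in a union so that the compiler also guarantees alignment. All names
(struct row, struct entry, union one_row_entry, fake_write) are
hypothetical stand-ins.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for struct xrow_header. */
struct row {
	int type;
};

/* Stand-in for struct journal_entry. */
struct entry {
	int res;
	int n_rows;
	struct row *rows[];	/* flexible array member, must stay last */
};

/* Storage for an entry holding exactly one row pointer. */
union one_row_entry {
	struct entry entry;
	char raw[sizeof(struct entry) + sizeof(struct row *)];
};

/* Pretend journal write: reports the row type as the result. */
static int
fake_write(struct entry *e)
{
	e->res = e->rows[0]->type;
	return 0;
}

int
demo_stack_entry(int type)
{
	union one_row_entry storage;
	struct row row = { type };
	struct entry *e = &storage.entry;
	e->n_rows = 1;
	e->res = -1;
	e->rows[0] = &row;
	if (fake_write(e) != 0)
		return -1;
	return e->res;
}
```

Nothing here is allocated from a region or the heap; the entry, the
row and the row pointer slot all disappear when the function returns.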

Closes #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn_limbo.c | 94 ++++++++++++++++++++++++++-------------------
 1 file changed, 54 insertions(+), 40 deletions(-)

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a74bfe244..d8cf6a6f6 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -32,6 +32,9 @@
 #include "txn_limbo.h"
 #include "replication.h"
 
+#include "iproto_constants.h"
+#include "journal.h"
+
 struct txn_limbo txn_limbo;
 
 static inline void
@@ -237,62 +240,73 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return 0;
 }
 
+/**
+ * A callback for synchronous write: wakes up the txn_limbo_write
+ * fiber waiting for the record to be written to WAL.
+ */
+static void
+txn_limbo_write_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	fiber_wakeup(entry->complete_data);
+}
+
+/**
+ * Write CONFIRM or ROLLBACK message to a journal directly
+ * without involving transaction engine because using txn
+ * engine is far from being cheap while we only need to
+ * write a small journal entry.
+ */
 static int
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
-				 bool is_confirm)
+txn_limbo_write(uint32_t replica_id, int64_t lsn, int type)
 {
+	assert(replica_id != REPLICA_ID_NIL);
+	assert(type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK);
 	assert(lsn > 0);
 
+	/*
+	 * When allocated statically some compilers (such as
+	 * clang + asan) require the journal_entry::rows to
+	 * be last in a container structure. So it is simpler
+	 * just to create a cumulative buffer.
+	 */
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
+
+	struct synchro_body_bin body_bin;
 	struct xrow_header row;
-	struct request request = {
-		.header = &row,
-	};
 
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
 
-	int res = 0;
-	if (is_confirm) {
-		res = xrow_encode_confirm(&row, &txn->region,
-					  limbo->instance_id, lsn);
-	} else {
-		/*
-		 * This LSN is the first to be rolled back, so
-		 * the last "safe" lsn is lsn - 1.
-		 */
-		res = xrow_encode_rollback(&row, &txn->region,
-					   limbo->instance_id, lsn);
+	xrow_encode_synchro(&row, &body_bin, replica_id, lsn, type);
+
+	journal_entry_create(entry, 1, xrow_approx_len(&row),
+			     txn_limbo_write_cb, fiber());
+
+	if (journal_write(entry) != 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
 	}
-	if (res == -1)
-		goto rollback;
-	/*
-	 * This is not really a transaction. It just uses txn API
-	 * to put the data into WAL. And obviously it should not
-	 * go to the limbo and block on the very same sync
-	 * transaction which it tries to confirm now.
-	 */
-	txn_set_flag(txn, TXN_FORCE_ASYNC);
 
-	if (txn_begin_stmt(txn, NULL) != 0)
-		goto rollback;
-	if (txn_commit_stmt(txn, &request) != 0)
-		goto rollback;
+	if (entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
+	}
 
-	return txn_commit(txn);
-rollback:
-	txn_rollback(txn);
-	return -1;
+	return 0;
 }
 
 /**
  * Write a confirmation entry to WAL. After it's written all the
  * transactions waiting for confirmation may be finished.
  */
-static int
+static inline int
 txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
+	return txn_limbo_write(limbo->instance_id, lsn, IPROTO_CONFIRM);
 }
 
 void
@@ -338,10 +352,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
  * transactions following the current one and waiting for
  * confirmation must be rolled back.
  */
-static int
+static inline int
 txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
+	return txn_limbo_write(limbo->instance_id, lsn, IPROTO_ROLLBACK);
 }
 
 void
-- 
2.26.2


* [Tarantool-patches] [PATCH v3 6/6] qsync: drop no longer used encoding helpers
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (4 preceding siblings ...)
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-07-27 14:06 ` Cyrill Gorcunov
  2020-07-27 14:12 ` [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
  6 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:06 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/xrow.c | 26 --------------------------
 src/box/xrow.h | 28 ----------------------------
 2 files changed, 54 deletions(-)

diff --git a/src/box/xrow.c b/src/box/xrow.c
index 3609dae0f..21d949ba5 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -920,32 +920,6 @@ xrow_encode_synchro(struct xrow_header *row,
 	row->bodycnt = 1;
 }
 
-int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
-		    uint32_t replica_id, int64_t lsn)
-{
-	struct synchro_body_bin *body = region_alloc(region, sizeof(*body));
-	if (body == NULL) {
-		diag_set(OutOfMemory, sizeof(*body), "region_alloc", "body");
-		return -1;
-	}
-	xrow_encode_synchro(row, body, replica_id, lsn, IPROTO_CONFIRM);
-	return 0;
-}
-
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
-		     uint32_t replica_id, int64_t lsn)
-{
-	struct synchro_body_bin *body = region_alloc(region, sizeof(*body));
-	if (body == NULL) {
-		diag_set(OutOfMemory, sizeof(*body), "region_alloc", "body");
-		return -1;
-	}
-	xrow_encode_synchro(row, body, replica_id, lsn, IPROTO_ROLLBACK);
-	return 0;
-}
-
 static int
 xrow_decode_confirm_rollback(struct xrow_header *row, uint32_t *replica_id,
 			     int64_t *lsn)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 522bab259..2ecbe3ce0 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -240,20 +240,6 @@ xrow_encode_synchro(struct xrow_header *row,
 		    uint32_t replica_id, int64_t lsn,
 		    int type);
 
-/**
- * Encode the CONFIRM to row body and set row type to
- * IPROTO_CONFIRM.
- * @param row xrow header.
- * @param region Region to use to encode the confirmation body.
- * @param replica_id master's instance id.
- * @param lsn last confirmed lsn.
- * @retval -1 on error.
- * @retval 0 success.
- */
-int
-xrow_encode_confirm(struct xrow_header *row, struct region *region,
-		    uint32_t replica_id, int64_t lsn);
-
 /**
  * Decode the CONFIRM request body.
  * @param row xrow header.
@@ -265,20 +251,6 @@ xrow_encode_confirm(struct xrow_header *row, struct region *region,
 int
 xrow_decode_confirm(struct xrow_header *row, uint32_t *replica_id, int64_t *lsn);
 
-/**
- * Encode the ROLLBACK row body and set row type to
- * IPROTO_ROLLBACK.
- * @param row xrow header.
- * @param region Region to use to encode the rollback body.
- * @param replica_id master's instance id.
- * @param lsn lsn to rollback from, including it.
- * @retval -1  on error.
- * @retval 0 success.
- */
-int
-xrow_encode_rollback(struct xrow_header *row, struct region *region,
-		     uint32_t replica_id, int64_t lsn);
-
 /**
  * Decode the ROLLBACK row body.
  * @param row xrow header.
-- 
2.26.2


* Re: [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine
  2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
                   ` (5 preceding siblings ...)
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 6/6] qsync: drop no longer used encoding helpers Cyrill Gorcunov
@ 2020-07-27 14:12 ` Cyrill Gorcunov
  6 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 14:12 UTC (permalink / raw)
  To: tml; +Cc: Vladislav Shpilevoy

On Mon, Jul 27, 2020 at 05:06:44PM +0300, Cyrill Gorcunov wrote:
> In this series we write CONFIRM/ROLLBACK messages into the WAL directly
> without involving the txn engine.
> 
> Vlad, take a look please, once time permits.
> 

And I remember about txn_add_redo. Looking into it now...


* Re: [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
@ 2020-07-27 20:40   ` Vladislav Shpilevoy
  2020-07-27 21:37     ` Cyrill Gorcunov
  0 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-27 20:40 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Hi! Thanks for the patch!

> diff --git a/src/box/journal.h b/src/box/journal.h
> index 9049a2ce0..759eea1da 100644
> --- a/src/box/journal.h
> +++ b/src/box/journal.h
> @@ -39,8 +39,11 @@
>  extern "C" {
>  #endif /* defined(__cplusplus) */
>  
> +struct journal_entry;

1. You removed this declaration in the previous commit. I guess you
don't need that commit now.

>  struct xrow_header;
>  
> +typedef void (*journal_write_async_cb_t)(struct journal_entry *entry);

2. Better replace _cb_t with _f. Or with just _t. Look at key_def.h and
xrow_update_field.h for examples.

> +
>  /**
>   * An entry for an abstract journal.


* Re: [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal
  2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
@ 2020-07-27 20:41   ` Vladislav Shpilevoy
  2020-07-27 21:39     ` Cyrill Gorcunov
  0 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-27 20:41 UTC (permalink / raw)
  To: Cyrill Gorcunov, tml

Thanks for the patch!

> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index a74bfe244..d8cf6a6f6 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -237,62 +240,73 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>  	return 0;
>  }
>  
> +/**
> + * A callback for synchronous write: wakes up the txn_limbo_write
> + * fiber waiting for the record to be written to WAL.
> + */
> +static void
> +txn_limbo_write_cb(struct journal_entry *entry)
> +{
> +	assert(entry->complete_data != NULL);
> +	fiber_wakeup(entry->complete_data);
> +}
> +
> +/**
> + * Write CONFIRM or ROLLBACK message to a journal directly
> + * without involving transaction engine because using txn
> + * engine is far from being cheap while we only need to
> + * write a small journal entry.
> + */
>  static int
> -txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
> -				 bool is_confirm)
> +txn_limbo_write(uint32_t replica_id, int64_t lsn, int type)
>  {
> +	assert(replica_id != REPLICA_ID_NIL);
> +	assert(type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK);
>  	assert(lsn > 0);
>  
> +	/*
> +	 * When allocated statically some compilers (such as
> +	 * clang + asan) require the journal_entry::rows to
> +	 * be last in a container structure. So it is simpler
> +	 * just to create a cumulative buffer.
> +	 */
> +	char buf[sizeof(struct journal_entry) +
> +		 sizeof(struct xrow_header *)];
> +
> +	struct synchro_body_bin body_bin;
>  	struct xrow_header row;
> -	struct request request = {
> -		.header = &row,
> -	};
>  
> -	struct txn *txn = txn_begin();
> -	if (txn == NULL)
> -		return -1;
> +	struct journal_entry *entry = (struct journal_entry *)buf;
> +	entry->rows[0] = &row;
>  
> -	int res = 0;
> -	if (is_confirm) {
> -		res = xrow_encode_confirm(&row, &txn->region,
> -					  limbo->instance_id, lsn);
> -	} else {
> -		/*
> -		 * This LSN is the first to be rolled back, so
> -		 * the last "safe" lsn is lsn - 1.
> -		 */
> -		res = xrow_encode_rollback(&row, &txn->region,
> -					   limbo->instance_id, lsn);
> +	xrow_encode_synchro(&row, &body_bin, replica_id, lsn, type);
> +
> +	journal_entry_create(entry, 1, xrow_approx_len(&row),
> +			     txn_limbo_write_cb, fiber());
> +
> +	if (journal_write(entry) != 0) {
> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		return -1;
>  	}
> -	if (res == -1)
> -		goto rollback;
> -	/*
> -	 * This is not really a transaction. It just uses txn API
> -	 * to put the data into WAL. And obviously it should not
> -	 * go to the limbo and block on the very same sync
> -	 * transaction which it tries to confirm now.
> -	 */
> -	txn_set_flag(txn, TXN_FORCE_ASYNC);
>  
> -	if (txn_begin_stmt(txn, NULL) != 0)
> -		goto rollback;
> -	if (txn_commit_stmt(txn, &request) != 0)
> -		goto rollback;
> +	if (entry->res < 0) {

I suggest simply adding '|| entry->res < 0' to the check above, so
that there is a single error handling point.
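
A minimal standalone sketch of the combined check, using stand-in types
rather than the real journal API (`entry_result` and `check_write` are
made-up names for illustration only):

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the part of struct journal_entry that the check reads. */
struct entry_result {
	/* Result stored by the writer, negative on failure. */
	int res;
};

/*
 * Hypothetical helper: rc plays the role of the journal_write()
 * return value, entry->res the result stored in the entry.
 * Both kinds of failure end up in the same branch.
 */
static int
check_write(int rc, const struct entry_result *entry)
{
	assert(entry != NULL);
	if (rc != 0 || entry->res < 0) {
		/* The patch reports the error via diag_set()/diag_log() here. */
		return -1;
	}
	return 0;
}
```

Because `||` short-circuits, `entry->res` is only inspected when the
write call itself reported success.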

> +		diag_set(ClientError, ER_WAL_IO);
> +		diag_log();
> +		return -1;
> +	}
>  


* Re: [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry
  2020-07-27 20:40   ` Vladislav Shpilevoy
@ 2020-07-27 21:37     ` Cyrill Gorcunov
  0 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 21:37 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Mon, Jul 27, 2020 at 10:40:47PM +0200, Vladislav Shpilevoy wrote:
> Hi! Thanks for the patch!
> 
> > diff --git a/src/box/journal.h b/src/box/journal.h
> > index 9049a2ce0..759eea1da 100644
> > --- a/src/box/journal.h
> > +++ b/src/box/journal.h
> > @@ -39,8 +39,11 @@
> >  extern "C" {
> >  #endif /* defined(__cplusplus) */
> >  
> > +struct journal_entry;
> 
> 1. You removed this declaration in the previous commit. I guess you
> don't need the latter now.

True, it happened while reworking the sources. Thanks!

> 
> >  struct xrow_header;
> >  
> > +typedef void (*journal_write_async_cb_t)(struct journal_entry *entry);
> 
> 2. Better replace _cb_t with _f. Or with just _t. Look at key_def.h and
> xrow_update_field.h for examples.

Renamed to journal_write_async_f. Vlad, I've pushed the updated version
to gorcunov/gh-5129-journal-4 where these comments are addressed.

I'm appending the patch here, but since the series shrank by the first
patch, I think it is better not to resend the whole series.
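
To show the idea of the patch below in isolation, here is a small
standalone sketch of a completion callback bound to each entry instead
of to the journal. All names in it (`struct entry`, `entry_complete`,
`fake_write_async` and the two callbacks) are stand-ins invented for
the example, not the real Tarantool structures:

```c
#include <assert.h>
#include <stddef.h>

struct entry;
typedef void (*entry_complete_f)(struct entry *entry);

/* Stand-in for struct journal_entry; only the fields the sketch needs. */
struct entry {
	/* Result of the write, negative on failure. */
	int res;
	/* Per-entry completion callback and its argument. */
	entry_complete_f complete_cb;
	void *complete_data;
};

/* In the spirit of journal_async_complete(): no journal argument. */
static inline void
entry_complete(struct entry *entry)
{
	assert(entry->complete_cb != NULL);
	entry->complete_cb(entry);
}

/* Hypothetical completion that counts finished entries. */
static void
count_completion(struct entry *entry)
{
	++*(int *)entry->complete_data;
}

/* Hypothetical completion that records the write result. */
static void
store_result(struct entry *entry)
{
	*(int *)entry->complete_data = entry->res;
}

/* Fake journal that "writes" at once and fires the entry's callback. */
static int
fake_write_async(struct entry *entry, int res)
{
	entry->res = res;
	entry_complete(entry);
	return 0;
}
```

Two entries passed to the same fake journal can complete in different
ways, which a single per-journal callback does not allow.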
---
From: Cyrill Gorcunov <gorcunov@gmail.com>
Date: Tue, 21 Jul 2020 15:34:38 +0300
Subject: [PATCH 1/5] journal: bind asynchronous write completion to an entry

In commit 77ba0e3504464131fe81c672d508d0275be2173a we redesigned
WAL journal operations so that there is a single asynchronous write
completion callback per journal.

It turned out that this simplification is too restrictive: it doesn't
allow us to pass entries with custom completions into the journal.

So let's bring this ability back. We will need it to be able
to write "confirm" records into the WAL directly, without touching
the transaction code at all.

Part-of #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/box.cc    | 15 ++++++++-------
 src/box/journal.c |  2 ++
 src/box/journal.h | 20 +++++++++++---------
 src/box/txn.c     |  2 +-
 src/box/vy_log.c  |  2 +-
 src/box/wal.c     | 19 ++++++++-----------
 src/box/wal.h     |  4 ++--
 7 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 83eef5d98..3667e1423 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -348,7 +348,7 @@ recovery_journal_write(struct journal *base,
 	 * Since there're no actual writes, fire a
 	 * journal_async_complete callback right away.
 	 */
-	journal_async_complete(base, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
@@ -357,7 +357,7 @@ recovery_journal_create(struct vclock *v)
 {
 	static struct recovery_journal journal;
 	journal_create(&journal.base, recovery_journal_write,
-		       txn_complete_async, recovery_journal_write);
+		       recovery_journal_write);
 	journal.vclock = v;
 	journal_set(&journal.base);
 }
@@ -2193,8 +2193,10 @@ engine_init()
 static int
 bootstrap_journal_write(struct journal *base, struct journal_entry *entry)
 {
+	(void)base;
+
 	entry->res = 0;
-	journal_async_complete(base, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
@@ -2580,8 +2582,8 @@ box_cfg_xc(void)
 
 	int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
-	if (wal_init(wal_mode, txn_complete_async, cfg_gets("wal_dir"),
-		     wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+	if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_size,
+		     &INSTANCE_UUID, on_wal_garbage_collection,
 		     on_wal_checkpoint_threshold) != 0) {
 		diag_raise();
 	}
@@ -2628,8 +2630,7 @@ box_cfg_xc(void)
 	}
 
 	struct journal bootstrap_journal;
-	journal_create(&bootstrap_journal, NULL, txn_complete_async,
-		       bootstrap_journal_write);
+	journal_create(&bootstrap_journal, NULL, bootstrap_journal_write);
 	journal_set(&bootstrap_journal);
 	auto bootstrap_journal_guard = make_scoped_guard([] {
 		journal_set(NULL);
diff --git a/src/box/journal.c b/src/box/journal.c
index f1e89aaa2..48af9157b 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -36,6 +36,7 @@ struct journal *current_journal = NULL;
 
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
+		  journal_write_async_f write_async_cb,
 		  void *complete_data)
 {
 	struct journal_entry *entry;
@@ -50,6 +51,7 @@ journal_entry_new(size_t n_rows, struct region *region,
 		return NULL;
 	}
 
+	entry->write_async_cb = write_async_cb;
 	entry->complete_data = complete_data;
 	entry->approx_len = 0;
 	entry->n_rows = n_rows;
diff --git a/src/box/journal.h b/src/box/journal.h
index 1a10e66c3..4b019fecf 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -42,6 +42,8 @@ extern "C" {
 struct xrow_header;
 struct journal_entry;
 
+typedef void (*journal_write_async_f)(struct journal_entry *entry);
+
 /**
  * An entry for an abstract journal.
  * Simply put, a write ahead log request.
@@ -61,6 +63,10 @@ struct journal_entry {
 	 * A journal entry completion callback argument.
 	 */
 	void *complete_data;
+	/**
+	 * Asynchronous write completion function.
+	 */
+	journal_write_async_f write_async_cb;
 	/**
 	 * Approximate size of this request when encoded.
 	 */
@@ -84,6 +90,7 @@ struct region;
  */
 struct journal_entry *
 journal_entry_new(size_t n_rows, struct region *region,
+		  journal_write_async_f write_async_cb,
 		  void *complete_data);
 
 /**
@@ -96,22 +103,19 @@ struct journal {
 	int (*write_async)(struct journal *journal,
 			   struct journal_entry *entry);
 
-	/** Asynchronous write completion */
-	void (*write_async_cb)(struct journal_entry *entry);
-
 	/** Synchronous write */
 	int (*write)(struct journal *journal,
 		     struct journal_entry *entry);
 };
 
 /**
- * Finalize a single entry.
+ * Complete asynchronous write.
  */
 static inline void
-journal_async_complete(struct journal *journal, struct journal_entry *entry)
+journal_async_complete(struct journal_entry *entry)
 {
-	assert(journal->write_async_cb != NULL);
-	journal->write_async_cb(entry);
+	assert(entry->write_async_cb != NULL);
+	entry->write_async_cb(entry);
 }
 
 /**
@@ -173,12 +177,10 @@ static inline void
 journal_create(struct journal *journal,
 	       int (*write_async)(struct journal *journal,
 				  struct journal_entry *entry),
-	       void (*write_async_cb)(struct journal_entry *entry),
 	       int (*write)(struct journal *journal,
 			    struct journal_entry *entry))
 {
 	journal->write_async	= write_async;
-	journal->write_async_cb	= write_async_cb;
 	journal->write		= write;
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index 9c21258c5..cc1f496c5 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -551,7 +551,7 @@ txn_journal_entry_new(struct txn *txn)
 
 	/* Save space for an additional NOP row just in case. */
 	req = journal_entry_new(txn->n_new_rows + txn->n_applier_rows + 1,
-				&txn->region, txn);
+				&txn->region, txn_complete_async, txn);
 	if (req == NULL)
 		return NULL;
 
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 311985c72..de4c5205c 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -818,7 +818,7 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 	size_t used = region_used(&fiber()->gc);
 
 	struct journal_entry *entry;
-	entry = journal_entry_new(tx_size, &fiber()->gc, NULL);
+	entry = journal_entry_new(tx_size, &fiber()->gc, NULL, NULL);
 	if (entry == NULL)
 		goto err;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 37a8bd483..f22f2a6c5 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -266,10 +266,9 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	struct wal_writer *writer = &wal_writer_singleton;
 	struct journal_entry *req, *tmp;
 	stailq_foreach_entry_safe(req, tmp, queue, fifo)
-		journal_async_complete(&writer->base, req);
+		journal_async_complete(req);
 }
 
 /**
@@ -403,9 +402,8 @@ tx_notify_checkpoint(struct cmsg *msg)
  */
 static void
 wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
-		  void (*wall_async_cb)(struct journal_entry *entry),
-		  const char *wal_dirname,
-		  int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+		  const char *wal_dirname, int64_t wal_max_size,
+		  const struct tt_uuid *instance_uuid,
 		  wal_on_garbage_collection_f on_garbage_collection,
 		  wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
@@ -415,7 +413,6 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	journal_create(&writer->base,
 		       wal_mode == WAL_NONE ?
 		       wal_write_none_async : wal_write_async,
-		       wall_async_cb,
 		       wal_mode == WAL_NONE ?
 		       wal_write_none : wal_write);
 
@@ -525,15 +522,15 @@ wal_open(struct wal_writer *writer)
 }
 
 int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
-	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold)
 {
 	/* Initialize the state. */
 	struct wal_writer *writer = &wal_writer_singleton;
-	wal_writer_create(writer, wal_mode, wall_async_cb, wal_dirname,
-			  wal_max_size, instance_uuid, on_garbage_collection,
+	wal_writer_create(writer, wal_mode, wal_dirname, wal_max_size,
+			  instance_uuid, on_garbage_collection,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
@@ -1304,7 +1301,7 @@ wal_write_none_async(struct journal *journal,
 	vclock_merge(&writer->vclock, &vclock_diff);
 	vclock_copy(&replicaset.vclock, &writer->vclock);
 	entry->res = vclock_sum(&writer->vclock);
-	journal_async_complete(journal, entry);
+	journal_async_complete(entry);
 	return 0;
 }
 
diff --git a/src/box/wal.h b/src/box/wal.h
index f348dc636..9d0cada46 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -81,8 +81,8 @@ typedef void (*wal_on_checkpoint_threshold_f)(void);
  * Start WAL thread and initialize WAL writer.
  */
 int
-wal_init(enum wal_mode wal_mode, void (*wall_async_cb)(struct journal_entry *entry),
-	 const char *wal_dirname, int64_t wal_max_size, const struct tt_uuid *instance_uuid,
+wal_init(enum wal_mode wal_mode, const char *wal_dirname,
+	 int64_t wal_max_size, const struct tt_uuid *instance_uuid,
 	 wal_on_garbage_collection_f on_garbage_collection,
 	 wal_on_checkpoint_threshold_f on_checkpoint_threshold);
 
-- 
2.26.2


* Re: [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal
  2020-07-27 20:41   ` Vladislav Shpilevoy
@ 2020-07-27 21:39     ` Cyrill Gorcunov
  0 siblings, 0 replies; 12+ messages in thread
From: Cyrill Gorcunov @ 2020-07-27 21:39 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tml

On Mon, Jul 27, 2020 at 10:41:17PM +0200, Vladislav Shpilevoy wrote:
> > +	if (entry->res < 0) {
> 
> I suggest simply adding '|| entry->res < 0' to the check above, so
> that there is a single error handling point.

Done. Pushed to gorcunov/gh-5129-journal-4
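
For readers who want to try the stack allocation trick on its own,
below is a rough standalone sketch. `struct row`, `struct entry`,
`union one_row_entry` and `stack_entry_row_type` are made-up stand-ins,
not Tarantool code. Note one deliberate difference: the sketch wraps the
buffer in a union to get the alignment of the entry structure, while the
patch below uses a plain char buffer.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for struct xrow_header. */
struct row {
	int type;
};

/* Stand-in for struct journal_entry: the row pointers come last. */
struct entry {
	int n_rows;
	struct row *rows[];
};

/*
 * Storage for an entry that carries exactly one row. The char array
 * reserves room for the header plus one row pointer, the entry member
 * gives the storage the right alignment.
 */
union one_row_entry {
	struct entry entry;
	char buf[sizeof(struct entry) + sizeof(struct row *)];
};

/* Build a one-row entry on the stack and read the row back through it. */
static int
stack_entry_row_type(int type)
{
	union one_row_entry storage;
	struct row row = { .type = type };
	struct entry *entry = &storage.entry;

	entry->n_rows = 1;
	entry->rows[0] = &row;

	assert(entry->n_rows == 1);
	return entry->rows[0]->type;
}
```

No heap or region allocation is involved: the entry and its only row
live in the caller's frame, so they must not be used after the function
returns.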
---
From: Cyrill Gorcunov <gorcunov@gmail.com>
Date: Thu, 23 Jul 2020 14:32:31 +0300
Subject: [PATCH 4/5] qsync: implement direct write of CONFIRM/ROLLBACK into a journal

When we need to write a CONFIRM or ROLLBACK message (which is just
a binary record in msgpack format) into a journal, we use the txn code
to allocate a new transaction, encode the message there, and send it
down the long txn path before it hits the journal. This is not
only a waste of resources but also somewhat strange from an
architectural point of view.

Instead, let's encode the record on the stack and write it to the
journal directly.

Closes #5129

Signed-off-by: Cyrill Gorcunov <gorcunov@gmail.com>
---
 src/box/txn_limbo.c | 90 ++++++++++++++++++++++++---------------------
 1 file changed, 49 insertions(+), 41 deletions(-)

diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index a74bfe244..f1c54d824 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -32,6 +32,9 @@
 #include "txn_limbo.h"
 #include "replication.h"
 
+#include "iproto_constants.h"
+#include "journal.h"
+
 struct txn_limbo txn_limbo;
 
 static inline void
@@ -237,62 +240,67 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	return 0;
 }
 
+/**
+ * A callback for synchronous write: txn_limbo_write fiber
+ * waiting to proceed once a record is written to WAL.
+ */
+static void
+txn_limbo_write_cb(struct journal_entry *entry)
+{
+	assert(entry->complete_data != NULL);
+	fiber_wakeup(entry->complete_data);
+}
+
+/**
+ * Write CONFIRM or ROLLBACK message to a journal directly
+ * without involving transaction engine because using txn
+ * engine is far from being cheap while we only need to
+ * write a small journal entry.
+ */
 static int
-txn_limbo_write_confirm_rollback(struct txn_limbo *limbo, int64_t lsn,
-				 bool is_confirm)
+txn_limbo_write(uint32_t replica_id, int64_t lsn, int type)
 {
+	assert(replica_id != REPLICA_ID_NIL);
+	assert(type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK);
 	assert(lsn > 0);
 
+	/*
+	 * When allocated statically some compilers (such as
+	 * clang + asan) require the journal_entry::rows to
+	 * be last in a container structure. So it is simpler
+	 * just to create a cumulative buffer.
+	 */
+	char buf[sizeof(struct journal_entry) +
+		 sizeof(struct xrow_header *)];
+
+	struct synchro_body_bin body_bin;
 	struct xrow_header row;
-	struct request request = {
-		.header = &row,
-	};
 
-	struct txn *txn = txn_begin();
-	if (txn == NULL)
-		return -1;
+	struct journal_entry *entry = (struct journal_entry *)buf;
+	entry->rows[0] = &row;
 
-	int res = 0;
-	if (is_confirm) {
-		res = xrow_encode_confirm(&row, &txn->region,
-					  limbo->instance_id, lsn);
-	} else {
-		/*
-		 * This LSN is the first to be rolled back, so
-		 * the last "safe" lsn is lsn - 1.
-		 */
-		res = xrow_encode_rollback(&row, &txn->region,
-					   limbo->instance_id, lsn);
-	}
-	if (res == -1)
-		goto rollback;
-	/*
-	 * This is not really a transaction. It just uses txn API
-	 * to put the data into WAL. And obviously it should not
-	 * go to the limbo and block on the very same sync
-	 * transaction which it tries to confirm now.
-	 */
-	txn_set_flag(txn, TXN_FORCE_ASYNC);
+	xrow_encode_synchro(&row, &body_bin, replica_id, lsn, type);
 
-	if (txn_begin_stmt(txn, NULL) != 0)
-		goto rollback;
-	if (txn_commit_stmt(txn, &request) != 0)
-		goto rollback;
+	journal_entry_create(entry, 1, xrow_approx_len(&row),
+			     txn_limbo_write_cb, fiber());
 
-	return txn_commit(txn);
-rollback:
-	txn_rollback(txn);
-	return -1;
+	if (journal_write(entry) != 0 || entry->res < 0) {
+		diag_set(ClientError, ER_WAL_IO);
+		diag_log();
+		return -1;
+	}
+
+	return 0;
 }
 
 /**
  * Write a confirmation entry to WAL. After it's written all the
  * transactions waiting for confirmation may be finished.
  */
-static int
+static inline int
 txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, lsn, true);
+	return txn_limbo_write(limbo->instance_id, lsn, IPROTO_CONFIRM);
 }
 
 void
@@ -338,10 +346,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
  * transactions following the current one and waiting for
  * confirmation must be rolled back.
  */
-static int
+static inline int
 txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn)
 {
-	return txn_limbo_write_confirm_rollback(limbo, lsn, false);
+	return txn_limbo_write(limbo->instance_id, lsn, IPROTO_ROLLBACK);
 }
 
 void
-- 
2.26.2


Thread overview: 12+ messages
2020-07-27 14:06 [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov
2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 1/6] journal: drop redundant declaration Cyrill Gorcunov
2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 2/6] journal: bind asynchronous write completion to an entry Cyrill Gorcunov
2020-07-27 20:40   ` Vladislav Shpilevoy
2020-07-27 21:37     ` Cyrill Gorcunov
2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 3/6] journal: add journal_entry_create helper Cyrill Gorcunov
2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 4/6] qsync: provide a binary form of syncro entries Cyrill Gorcunov
2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 5/6] qsync: implement direct write of CONFIRM/ROLLBACK into a journal Cyrill Gorcunov
2020-07-27 20:41   ` Vladislav Shpilevoy
2020-07-27 21:39     ` Cyrill Gorcunov
2020-07-27 14:06 ` [Tarantool-patches] [PATCH v3 6/6] qsync: drop no longer used encoding helpers Cyrill Gorcunov
2020-07-27 14:12 ` [Tarantool-patches] [PATCH v3 0/6] qsync: write CONFIRM/ROLLBACK without txn engine Cyrill Gorcunov