[tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber

Georgy Kirichenko georgy at tarantool.org
Sun Jun 9 23:44:38 MSK 2019


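In order to implement asynchronous transactions we need to run a
transaction completion handler, but tx_prio is not able to yield.
So schedule journal entry completions from a dedicated fiber, which
is woken up whenever new entries are attached to the schedule queue.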

Prerequisites: #1254
---
 src/box/wal.c | 48 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 41 insertions(+), 7 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 86d021896..e868a8e71 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -100,6 +100,12 @@ struct wal_writer
 	struct cpipe wal_pipe;
 	/** A memory pool for messages. */
 	struct mempool msg_pool;
+	/** A queue to schedule journal entry completions. */
+	struct stailq schedule_queue;
+	/** True if writer is in rollback state. */
+	bool is_in_rollback;
+	/** A condition to signal about new schedule queue entries. */
+	struct fiber_cond schedule_cond;
 	/* ----------------- wal ------------------- */
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
@@ -254,17 +260,36 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
 	return xlog_tx_commit(l);
 }
 
+/*
+ * Tx schedule fiber: marks queued journal entries done and wakes waiters.
+ */
+static int
+tx_schedule_f(va_list ap)
+{
+	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+	while (!fiber_is_cancelled()) {
+		while (!stailq_empty(&writer->schedule_queue)) {
+			struct journal_entry *req =
+				stailq_shift_entry(&writer->schedule_queue,
+						   struct journal_entry, fifo);
+			req->done = true;
+			fiber_cond_broadcast(&req->done_cond);
+		}
+		writer->is_in_rollback = false;
+		fiber_cond_wait(&writer->schedule_cond);
+	}
+	return 0;
+}
+
 /**
- * Signal done condition.
+ * Attach requests to a scheduling queue.
  */
 static void
 tx_schedule_queue(struct stailq *queue)
 {
-	struct journal_entry *req;
-	stailq_foreach_entry(req, queue, fifo) {
-		req->done = true;
-		fiber_cond_broadcast(&req->done_cond);
-	}
+	struct wal_writer *writer = &wal_writer_singleton;
+	stailq_concat(&writer->schedule_queue, queue);
+	fiber_cond_signal(&writer->schedule_cond);
 }
 
 /**
@@ -309,6 +334,8 @@ tx_schedule_rollback(struct cmsg *msg)
 	/* Must not yield. */
 	tx_schedule_queue(&writer->rollback);
 	stailq_create(&writer->rollback);
+	writer->is_in_rollback = true;
+
 	if (msg != &writer->in_rollback)
 		mempool_free(&writer->msg_pool,
 			     container_of(msg, struct wal_msg, base));
@@ -359,6 +386,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 	writer->wal_mode = wal_mode;
 	writer->wal_max_rows = wal_max_rows;
 	writer->wal_max_size = wal_max_size;
+	writer->is_in_rollback = false;
 	journal_create(&writer->base, wal_mode == WAL_NONE ?
 		       wal_write_in_wal_mode_none : wal_write,
 		       wal_mode == WAL_NONE ?
@@ -374,6 +402,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	stailq_create(&writer->rollback);
 	cmsg_init(&writer->in_rollback, NULL);
+	stailq_create(&writer->schedule_queue);
+	fiber_cond_create(&writer->schedule_cond);
+	struct fiber *schedule_fiber = fiber_new("tx_schedule", tx_schedule_f);
+	if (schedule_fiber == NULL)
+		panic("Could not create schedule fiber");
+	fiber_start(schedule_fiber, writer);
 
 	writer->checkpoint_wal_size = 0;
 	writer->checkpoint_threshold = INT64_MAX;
@@ -1139,7 +1173,7 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
 
 	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
 
-	if (! stailq_empty(&writer->rollback)) {
+	if (writer->is_in_rollback) {
 		/*
 		 * The writer rollback queue is not empty,
 		 * roll back this transaction immediately.
-- 
2.21.0





More information about the Tarantool-patches mailing list