[tarantool-patches] [PATCH 1/7] Refactoring: wal writer fiber and queue

Georgy Kirichenko georgy at tarantool.org
Tue Aug 13 09:27:39 MSK 2019


As wal processes all writes in a cbus loop fiber, it isn't possible to
yield while writing. The patch introduces a wal write queue and a wal
write fiber which fetches batches from the queue and writes them out.
Checkpoint requests now also go through the queue in order to
synchronize the tx checkpoint status with wal.
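
In short, the cbus callback now only enqueues the message and the new
fiber drains the queue; a simplified sketch of the handoff (details and
the real definitions are in the diff below):

    /* Runs in the wal cbus loop: enqueue only, never yields. */
    static void
    wal_write_to_disk(struct cmsg *base)
    {
    	struct wal_writer *writer = &wal_writer_singleton;
    	struct wal_msg *msg = container_of(base, struct wal_msg, base);
    	if (stailq_empty(&writer->write_queue))
    		fiber_cond_signal(&writer->write_cond);
    	stailq_add_tail(&writer->write_queue, &msg->in_queue);
    }

    /* The new write fiber: drains the queue and is free to yield. */
    static int
    wal_writer_f(va_list ap)
    {
    	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
    	while (!fiber_is_cancelled()) {
    		if (stailq_empty(&writer->write_queue)) {
    			fiber_cond_wait(&writer->write_cond);
    			continue;
    		}
    		struct wal_msg *msg =
    			stailq_shift_entry(&writer->write_queue,
    					   struct wal_msg, in_queue);
    		/*
    		 * ... write the batch or take a checkpoint, then
    		 * push the response back to the tx cord ...
    		 */
    	}
    	return 0;
    }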

This patch enables putting all garbage collection state into one gc
object living in the tx cord and asking gc to free space from wal in
case of a no-space error.
---
 src/box/wal.c | 187 +++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 146 insertions(+), 41 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 58a58e5b5..5d8dcc4f7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -92,6 +92,10 @@ struct wal_writer
 	/** A memory pool for messages. */
 	struct mempool msg_pool;
 	/* ----------------- wal ------------------- */
+	/** A write queue. */
+	struct stailq write_queue;
+	/** A write queue condition. */
+	struct fiber_cond write_cond;
 	/** A setting from instance configuration - rows_per_wal */
 	int64_t wal_max_rows;
 	/** A setting from instance configuration - wal_max_size */
@@ -158,19 +162,40 @@ struct wal_writer
 	struct rlist watchers;
 };
 
+enum wal_msg_type {
+	WAL_MSG_WRITE = 0,
+	WAL_MSG_CHECKPOINT = 1
+};
+
 struct wal_msg {
 	struct cmsg base;
-	/** Approximate size of this request when encoded. */
-	size_t approx_len;
-	/** Input queue, on output contains all committed requests. */
-	struct stailq commit;
-	/**
-	 * In case of rollback, contains the requests which must
-	 * be rolled back.
-	 */
-	struct stailq rollback;
-	/** vclock after the batch processed. */
-	struct vclock vclock;
+	/** A link in the wal writer's write queue. */
+	struct stailq_entry in_queue;
+	/** Wal message type. */
+	enum wal_msg_type type;
+	union {
+		struct {
+			/** Approximate size of this request when encoded. */
+			size_t approx_len;
+			/** Input queue, on output contains all committed requests. */
+			struct stailq commit;
+			/**
+			 * In case of rollback, contains the requests which must
+			 * be rolled back.
+			 */
+			struct stailq rollback;
+			/** vclock after the batch processed. */
+			struct vclock vclock;
+		};
+		struct {
+			/** A checkpoint structure. */
+			struct wal_checkpoint *checkpoint;
+			/** Fiber that issued the batch. */
+			struct fiber *fiber;
+			/** Return code. */
+			int *rc;
+		};
+	};
 };
 
 /**
@@ -197,7 +222,10 @@ static void
 tx_schedule_commit(struct cmsg *msg);
 
 static struct cmsg_hop wal_request_route[] = {
-	{wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe},
+	{wal_write_to_disk, NULL}
+};
+
+static struct cmsg_hop wal_response_route[] = {
 	{tx_schedule_commit, NULL},
 };
 
@@ -214,7 +242,9 @@ wal_msg_create(struct wal_msg *batch)
 static struct wal_msg *
 wal_msg(struct cmsg *msg)
 {
-	return msg->route == wal_request_route ? (struct wal_msg *) msg : NULL;
+	return msg->route == wal_request_route &&
+	       ((struct wal_msg *)msg)->type == WAL_MSG_WRITE ?
+	       (struct wal_msg *) msg : NULL;
 }
 
 /** Write a request to a log in a single transaction. */
@@ -271,18 +301,22 @@ tx_schedule_commit(struct cmsg *msg)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_msg *batch = (struct wal_msg *) msg;
-	/*
-	 * Move the rollback list to the writer first, since
-	 * wal_msg memory disappears after the first
-	 * iteration of tx_schedule_queue loop.
-	 */
-	if (! stailq_empty(&batch->rollback)) {
-		/* Closes the input valve. */
-		stailq_concat(&writer->rollback, &batch->rollback);
+	if (batch->type == WAL_MSG_WRITE) {
+		/*
+		 * Move the rollback list to the writer first, since
+		 * wal_msg memory disappears after the first
+		 * iteration of tx_schedule_queue loop.
+		 */
+		if (! stailq_empty(&batch->rollback)) {
+			/* Closes the input valve. */
+			stailq_concat(&writer->rollback, &batch->rollback);
+		}
+		/* Update the tx vclock to the latest written by wal. */
+		vclock_copy(&replicaset.vclock, &batch->vclock);
+		tx_schedule_queue(&batch->commit);
+	} else {
+		fiber_wakeup(batch->fiber);
 	}
-	/* Update the tx vclock to the latest written by wal. */
-	vclock_copy(&replicaset.vclock, &batch->vclock);
-	tx_schedule_queue(&batch->commit);
 	mempool_free(&writer->msg_pool, container_of(msg, struct wal_msg, base));
 }
 
@@ -378,6 +412,9 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
 
 	mempool_create(&writer->msg_pool, &cord()->slabc,
 		       sizeof(struct wal_msg));
+
+	stailq_create(&writer->write_queue);
+	fiber_cond_create(&writer->write_cond);
 }
 
 /** Destroy a WAL writer structure. */
@@ -389,7 +426,7 @@ wal_writer_destroy(struct wal_writer *writer)
 
 /** WAL writer thread routine. */
 static int
-wal_writer_f(va_list ap);
+wal_cord_f(va_list ap);
 
 static int
 wal_open_f(struct cbus_call_msg *msg)
@@ -475,7 +512,7 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
 			  on_checkpoint_threshold);
 
 	/* Start WAL thread. */
-	if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0)
+	if (cord_costart(&writer->cord, "wal", wal_cord_f, NULL) != 0)
 		return -1;
 
 	/* Create a pipe to WAL thread. */
@@ -533,19 +570,19 @@ wal_sync(void)
 	cbus_flush(&writer->wal_pipe, &writer->tx_prio_pipe, NULL);
 }
 
-static int
-wal_begin_checkpoint_f(struct cbus_call_msg *data)
+static void
+wal_begin_checkpoint_f(struct wal_msg *wal_msg)
 {
-	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
+
 	if (writer->in_rollback.route != NULL) {
 		/*
 		 * We're rolling back a failed write and so
 		 * can't make a checkpoint - see the comment
 		 * in wal_begin_checkpoint() for the explanation.
 		 */
-		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
-		return -1;
+		*wal_msg->rc = -1;
+		return;
 	}
 	/*
 	 * Avoid closing the current WAL if it has no rows (empty).
@@ -559,9 +596,9 @@ wal_begin_checkpoint_f(struct cbus_call_msg *data)
 		 * The next WAL will be created on the first write.
 		 */
 	}
-	vclock_copy(&msg->vclock, &writer->vclock);
-	msg->wal_size = writer->checkpoint_wal_size;
-	return 0;
+	vclock_copy(&wal_msg->checkpoint->vclock, &writer->vclock);
+	wal_msg->checkpoint->wal_size = writer->checkpoint_wal_size;
+	*wal_msg->rc = 0;
 }
 
 int
@@ -584,13 +621,29 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
 		return -1;
 	}
+	struct wal_msg *batch = (struct wal_msg *)mempool_alloc(&writer->msg_pool);
+	if (batch == NULL) {
+		diag_set(OutOfMemory, sizeof(struct wal_msg),
+			 "region", "struct wal_msg");
+		return -1;
+	}
+	int rc = -1;
+	wal_msg_create(batch);
+	/* Send a checkpoint request through the write queue to catch the batch vclock. */
+	batch->type = WAL_MSG_CHECKPOINT;
+	batch->checkpoint = checkpoint;
+	batch->fiber = fiber();
+	batch->rc = &rc;
+
+	cpipe_push(&writer->wal_pipe, &batch->base);
+
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
-			   &checkpoint->base, wal_begin_checkpoint_f, NULL,
-			   TIMEOUT_INFINITY);
+	fiber_yield();
 	fiber_set_cancellable(cancellable);
-	if (rc != 0)
+	if (rc != 0) {
+		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
 		return -1;
+	}
 	return 0;
 }
 
@@ -922,10 +975,18 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 }
 
 static void
-wal_write_to_disk(struct cmsg *msg)
+wal_write_to_disk(struct cmsg *base)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	struct wal_msg *wal_msg = (struct wal_msg *) msg;
+	struct wal_msg *wal_msg = container_of(base, struct wal_msg, base);
+	if (stailq_empty(&writer->write_queue))
+		fiber_cond_signal(&writer->write_cond);
+	stailq_add_tail(&writer->write_queue, &wal_msg->in_queue);
+}
+
+static void
+wal_write_batch(struct wal_writer *writer, struct wal_msg *wal_msg)
+{
 	struct error *error;
 
 	/*
@@ -1068,9 +1129,43 @@ done:
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
 }
 
-/** WAL writer main loop.  */
+/*
+ * Wal write fiber function.
+ */
 static int
 wal_writer_f(va_list ap)
+{
+	struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+	while (!fiber_is_cancelled()) {
+		if (stailq_empty(&writer->write_queue)) {
+			fiber_cond_wait(&writer->write_cond);
+			continue;
+		}
+		/* Fetch and dispatch a request. */
+		struct wal_msg *batch =
+			stailq_shift_entry(&writer->write_queue,
+					   struct wal_msg, in_queue);
+
+		switch (batch->type) {
+		case WAL_MSG_WRITE: wal_write_batch(writer, batch);
+			break;
+		case WAL_MSG_CHECKPOINT: wal_begin_checkpoint_f(batch);
+			break;
+		default:
+			assert(false);
+		}
+
+		/* Push a response back to tx cord. */
+		cmsg_init(&batch->base, wal_response_route);
+		cpipe_push(&writer->tx_prio_pipe, &batch->base);
+	}
+
+	return 0;
+}
+
+/** WAL writer main fiber. */
+static int
+wal_cord_f(va_list ap)
 {
 	(void) ap;
 	struct wal_writer *writer = &wal_writer_singleton;
@@ -1078,6 +1173,12 @@ wal_writer_f(va_list ap)
 	/** Initialize eio in this thread */
 	coio_enable();
 
+	struct fiber *wal_fiber = fiber_new("wal_writer", wal_writer_f);
+	if (wal_fiber == NULL)
+		panic("Could not create a wal fiber");
+	fiber_set_joinable(wal_fiber, true);
+	fiber_start(wal_fiber, writer);
+
 	struct cbus_endpoint endpoint;
 	cbus_endpoint_create(&endpoint, "wal", fiber_schedule_cb, fiber());
 	/*
@@ -1089,6 +1190,9 @@ wal_writer_f(va_list ap)
 
 	cbus_loop(&endpoint);
 
+	fiber_cancel(wal_fiber);
+	fiber_join(wal_fiber);
+
 	/*
 	 * Create a new empty WAL on shutdown so that we don't
 	 * have to rescan the last WAL to find the instance vclock.
@@ -1157,6 +1261,7 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 			goto fail;
 		}
 		wal_msg_create(batch);
+		batch->type = WAL_MSG_WRITE;
 		/*
 		 * Sic: first add a request, then push the batch,
 		 * since cpipe_push() may pass the batch to WAL
-- 
2.22.0