[tarantool-patches] [PATCH v3 09/14] wal: a dedicated wal scheduling fiber
Georgy Kirichenko
georgy at tarantool.org
Sun Jun 9 23:44:38 MSK 2019
In order to implement asynchronous transactions, we need to run a
transaction completion handler, but tx_prio is not able to yield, so
journal entry completions are now scheduled from a dedicated fiber.
Prerequisites: #1254
---
src/box/wal.c | 48 +++++++++++++++++++++++++++++++++++++++++-------
1 file changed, 41 insertions(+), 7 deletions(-)
diff --git a/src/box/wal.c b/src/box/wal.c
index 86d021896..e868a8e71 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -100,6 +100,12 @@ struct wal_writer
struct cpipe wal_pipe;
/** A memory pool for messages. */
struct mempool msg_pool;
+ /** A queue to schedule journal entry completions. */
+ struct stailq schedule_queue;
+ /** True if writer is in rollback state. */
+ bool is_in_rollback;
+ /** A condition to signal about new schedule queue entries. */
+ struct fiber_cond schedule_cond;
/* ----------------- wal ------------------- */
/** A setting from instance configuration - rows_per_wal */
int64_t wal_max_rows;
@@ -254,17 +260,36 @@ xlog_write_entry(struct xlog *l, struct journal_entry *entry)
return xlog_tx_commit(l);
}
+/*
+ * Tx schedule fiber: completes entries from schedule_queue, then waits.
+ */
+static int
+tx_schedule_f(va_list ap)
+{
+ struct wal_writer *writer = va_arg(ap, struct wal_writer *);
+ while (!fiber_is_cancelled()) {
+ while (!stailq_empty(&writer->schedule_queue)) {
+ struct journal_entry *req =
+ stailq_shift_entry(&writer->schedule_queue,
+ struct journal_entry, fifo);
+ req->done = true;
+ fiber_cond_broadcast(&req->done_cond);
+ }
+ writer->is_in_rollback = false; /* The queue is drained. */
+ fiber_cond_wait(&writer->schedule_cond); /* Signalled on new entries. */
+ }
+ return 0;
+}
+
/**
- * Signal done condition.
+ * Append requests to the schedule queue and signal the schedule fiber.
*/
static void
tx_schedule_queue(struct stailq *queue)
{
- struct journal_entry *req;
- stailq_foreach_entry(req, queue, fifo) {
- req->done = true;
- fiber_cond_broadcast(&req->done_cond);
- }
+ struct wal_writer *writer = &wal_writer_singleton;
+ stailq_concat(&writer->schedule_queue, queue);
+ fiber_cond_signal(&writer->schedule_cond);
}
/**
@@ -309,6 +334,8 @@ tx_schedule_rollback(struct cmsg *msg)
/* Must not yield. */
tx_schedule_queue(&writer->rollback);
stailq_create(&writer->rollback);
+ writer->is_in_rollback = true;
+
if (msg != &writer->in_rollback)
mempool_free(&writer->msg_pool,
container_of(msg, struct wal_msg, base));
@@ -359,6 +386,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->wal_mode = wal_mode;
writer->wal_max_rows = wal_max_rows;
writer->wal_max_size = wal_max_size;
+ writer->is_in_rollback = false;
journal_create(&writer->base, wal_mode == WAL_NONE ?
wal_write_in_wal_mode_none : wal_write,
wal_mode == WAL_NONE ?
@@ -374,6 +402,12 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
stailq_create(&writer->rollback);
cmsg_init(&writer->in_rollback, NULL);
+ stailq_create(&writer->schedule_queue);
+ fiber_cond_create(&writer->schedule_cond);
+ struct fiber *schedule_fiber = fiber_new("tx_schedule", tx_schedule_f);
+ if (schedule_fiber == NULL)
+ panic("Could not create schedule fiber");
+ fiber_start(schedule_fiber, writer);
writer->checkpoint_wal_size = 0;
writer->checkpoint_threshold = INT64_MAX;
@@ -1139,7 +1173,7 @@ wal_async_write(struct journal *journal, struct journal_entry *entry)
ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
- if (! stailq_empty(&writer->rollback)) {
+ if (writer->is_in_rollback) {
/*
* The writer rollback queue is not empty,
* roll back this transaction immediately.
--
2.21.0
More information about the Tarantool-patches
mailing list