[Tarantool-patches] [PATCH v4 06/11] wal: extract log write batch into a separate routine
Georgy Kirichenko
georgy at tarantool.org
Wed Feb 12 12:39:15 MSK 2020
Introduce a routine which transfers journal entries from an input
queue to an output queue, writing them to an xlog file. Once the xlog
is actually written out, the routine breaks the transfer loop and
returns the write result code.
After this, the output queue contains the entries which were written
to the xlog (regardless of the disk write status), whereas the input
queue contains the untouched entries. If the input queue is processed
without an actual xlog write, then the xlog file is flushed explicitly.
This refactoring helps to implement a WAL memory buffer.
Part of #980, #3794
---
src/box/wal.c | 87 ++++++++++++++++++++++++++++++++-------------------
1 file changed, 54 insertions(+), 33 deletions(-)
diff --git a/src/box/wal.c b/src/box/wal.c
index 0ae66ff32..ce15cb459 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -958,6 +958,36 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
}
}
+/*
+ * Shift entries from the (non-empty) input queue to the output
+ * queue, assigning LSNs and writing each entry to the current log
+ * file, until a write flushes the log, a write error happens, or
+ * the input is exhausted (then the log is flushed explicitly).
+ * Returns the count of written bytes or -1 in case of error.
+ */
+static ssize_t
+wal_write_xlog_batch(struct wal_writer *writer, struct stailq *input,
+ struct stailq *output, struct vclock *vclock_diff)
+{
+ struct xlog *l = &writer->current_wal;
+ ssize_t rc;
+ do {
+ struct journal_entry *entry =
+ stailq_shift_entry(input, struct journal_entry, fifo);
+ stailq_add_tail(output, &entry->fifo);
+
+ wal_assign_lsn(vclock_diff, &writer->vclock,
+ entry->rows, entry->rows + entry->n_rows);
+ entry->res = vclock_sum(vclock_diff) +
+ vclock_sum(&writer->vclock);
+ rc = xlog_write_entry(l, entry);
+ } while (rc == 0 && !stailq_empty(input));
+ /* If the last write was only buffered, flush the log explicitly. */
+ if (rc == 0)
+ rc = xlog_flush(l);
+ return rc;
+}
+
static void
wal_write_to_disk(struct cmsg *msg)
{
@@ -1017,36 +1047,31 @@ wal_write_to_disk(struct cmsg *msg)
* of request in xlog file is stored inside `struct journal_entry`.
*/
- struct xlog *l = &writer->current_wal;
-
- /*
- * Iterate over requests (transactions)
- */
- int rc;
- struct journal_entry *entry;
- struct stailq_entry *last_committed = NULL;
- stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
- wal_assign_lsn(&vclock_diff, &writer->vclock,
- entry->rows, entry->rows + entry->n_rows);
- entry->res = vclock_sum(&vclock_diff) +
- vclock_sum(&writer->vclock);
- rc = xlog_write_entry(l, entry);
- if (rc < 0)
- goto done;
- if (rc > 0) {
+ struct stailq input;
+ stailq_create(&input);
+ stailq_concat(&input, &wal_msg->commit);
+ struct stailq output;
+ stailq_create(&output);
+ while (!stailq_empty(&input)) {
+ ssize_t rc = wal_write_xlog_batch(writer, &input, &output,
+ &vclock_diff);
+ if (rc < 0) {
+ /*
+ * Put processed entries and tail of write
+ * queue to a rollback list.
+ */
+ stailq_concat(&wal_msg->rollback, &output);
+ stailq_concat(&wal_msg->rollback, &input);
+ } else {
+ /*
+ * Schedule processed entries to commit
+ * and update the wal vclock.
+ */
+ stailq_concat(&wal_msg->commit, &output);
writer->checkpoint_wal_size += rc;
- last_committed = &entry->fifo;
vclock_merge(&writer->vclock, &vclock_diff);
}
- /* rc == 0: the write is buffered in xlog_tx */
}
- rc = xlog_flush(l);
- if (rc < 0)
- goto done;
-
- writer->checkpoint_wal_size += rc;
- last_committed = stailq_last(&wal_msg->commit);
- vclock_merge(&writer->vclock, &vclock_diff);
/*
* Notify TX if the checkpoint threshold has been exceeded.
@@ -1070,7 +1095,6 @@ wal_write_to_disk(struct cmsg *msg)
}
}
-done:
error = diag_last_error(diag_get());
if (error) {
/* Until we can pass the error to tx, log it and clear. */
@@ -1090,15 +1114,12 @@ done:
* nothing, and need to start rollback from the first
* request. Otherwise we rollback from the first request.
*/
- struct stailq rollback;
- stailq_cut_tail(&wal_msg->commit, last_committed, &rollback);
-
- if (!stailq_empty(&rollback)) {
+ if (!stailq_empty(&wal_msg->rollback)) {
+ struct journal_entry *entry;
/* Update status of the successfully committed requests. */
- stailq_foreach_entry(entry, &rollback, fifo)
+ stailq_foreach_entry(entry, &wal_msg->rollback, fifo)
entry->res = -1;
/* Rollback unprocessed requests */
- stailq_concat(&wal_msg->rollback, &rollback);
wal_writer_begin_rollback(writer);
}
fiber_gc();
--
2.25.0
More information about the Tarantool-patches
mailing list