[PATCH 5/6] wal: separate checkpoint and flush paths
Vladimir Davydov
vdavydov.dev at gmail.com
Sun Nov 25 16:48:12 MSK 2018
Currently, wal_checkpoint() is used for two purposes. First, to make a
checkpoint (rotate = true). Second, to flush all pending WAL requests
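(rotate = false). Since checkpointing has to fail if a cascading rollback
is in progress, flushing fails in that case too. This is confusing. Let's
separate the two paths: wal_checkpoint() is now used only for making a
checkpoint, and a new function, wal_flush(), only waits for pending WAL
requests to be flushed.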
---
src/box/box.cc | 5 ++--
src/box/vinyl.c | 5 +---
src/box/wal.c | 93 ++++++++++++++++++++++++++-------------------------------
src/box/wal.h | 15 +++++++---
4 files changed, 56 insertions(+), 62 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 8d6e966e..5ea2f014 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2190,10 +2190,9 @@ box_checkpoint()
goto end;
struct vclock vclock;
- if ((rc = wal_checkpoint(&vclock, true))) {
- tnt_error(ClientError, ER_CHECKPOINT_ROLLBACK);
+ if ((rc = wal_checkpoint(&vclock)))
goto end;
- }
+
rc = engine_commit_checkpoint(&vclock);
end:
if (rc)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1794489d..05df1329 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1014,10 +1014,7 @@ vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm)
* Wait for prepared transactions to complete
* (we can't abort them as they reached WAL).
*/
- struct vclock unused;
- if (wal_checkpoint(&unused, false) != 0)
- return -1;
-
+ wal_flush();
return 0;
}
diff --git a/src/box/wal.c b/src/box/wal.c
index 11aae5fc..3e6c1e7f 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -461,29 +461,35 @@ wal_thread_stop()
wal_writer_destroy(&wal_writer_singleton);
}
-struct wal_checkpoint
+void
+wal_flush(void)
{
- struct cmsg base;
- struct vclock *vclock;
- struct fiber *fiber;
- bool rotate;
- int res;
+ cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
+}
+
+struct wal_checkpoint_msg {
+ struct cbus_call_msg base;
+ struct vclock vclock;
};
-void
-wal_checkpoint_f(struct cmsg *data)
+static int
+wal_checkpoint_f(struct cbus_call_msg *data)
{
- struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
+ struct wal_checkpoint_msg *msg = (struct wal_checkpoint_msg *)data;
struct wal_writer *writer = &wal_writer_singleton;
if (writer->in_rollback.route != NULL) {
- /* We're rolling back a failed write. */
- msg->res = -1;
- return;
+ /*
+ * We're rolling back a failed write and so
+ * can't make a checkpoint - see the comment
+ * in wal_checkpoint() for the explanation.
+ */
+ diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
+ return -1;
}
/*
* Avoid closing the current WAL if it has no rows (empty).
*/
- if (msg->rotate && xlog_is_open(&writer->current_wal) &&
+ if (xlog_is_open(&writer->current_wal) &&
vclock_sum(&writer->current_wal.meta.vclock) !=
vclock_sum(&writer->vclock)) {
@@ -492,53 +498,38 @@ wal_checkpoint_f(struct cmsg *data)
* The next WAL will be created on the first write.
*/
}
- vclock_copy(msg->vclock, &writer->vclock);
-}
-
-void
-wal_checkpoint_done_f(struct cmsg *data)
-{
- struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
- fiber_wakeup(msg->fiber);
+ vclock_copy(&msg->vclock, &writer->vclock);
+ return 0;
}
int
-wal_checkpoint(struct vclock *vclock, bool rotate)
+wal_checkpoint(struct vclock *vclock)
{
struct wal_writer *writer = &wal_writer_singleton;
- if (! stailq_empty(&writer->rollback)) {
- /*
- * The writer rollback queue is not empty,
- * roll back this transaction immediately.
- * This is to ensure we do not accidentally
- * commit a transaction which has seen changes
- * that will be rolled back.
- */
- say_error("Aborting transaction %llu during "
- "cascading rollback",
- vclock_sum(&writer->vclock));
- return -1;
- }
if (writer->wal_mode == WAL_NONE) {
vclock_copy(vclock, &writer->vclock);
return 0;
}
- static struct cmsg_hop wal_checkpoint_route[] = {
- {wal_checkpoint_f, &wal_thread.tx_prio_pipe},
- {wal_checkpoint_done_f, NULL},
- };
- vclock_create(vclock);
- struct wal_checkpoint msg;
- cmsg_init(&msg.base, wal_checkpoint_route);
- msg.vclock = vclock;
- msg.fiber = fiber();
- msg.rotate = rotate;
- msg.res = 0;
- cpipe_push(&wal_thread.wal_pipe, &msg.base);
- fiber_set_cancellable(false);
- fiber_yield();
- fiber_set_cancellable(true);
- return msg.res;
+ if (!stailq_empty(&writer->rollback)) {
+ /*
+ * If cascading rollback is in progress, in-memory
+ * indexes can contain changes scheduled for rollback.
+ * If we made a checkpoint, we could write them to
+ * the snapshot. So we abort checkpointing in this
+ * case.
+ */
+ diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
+ return -1;
+ }
+ struct wal_checkpoint_msg msg;
+ bool cancellable = fiber_set_cancellable(false);
+ int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+ &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY);
+ fiber_set_cancellable(cancellable);
+ if (rc != 0)
+ return -1;
+ vclock_copy(vclock, &msg.vclock);
+ return 0;
}
struct wal_gc_msg
diff --git a/src/box/wal.h b/src/box/wal.h
index e4094b1e..7ca27f1a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -166,13 +166,20 @@ wal_mode();
/**
* Wait till all pending changes to the WAL are flushed.
- * Rotates the WAL.
- *
- * @param[out] vclock WAL vclock
+ */
+void
+wal_flush(void);
+
+/**
+ * Prepare WAL for checkpointing.
*
+ * This function flushes all pending changes and rotates the
+ * current WAL. The vclock of the last record written to the
+ * rotated WAL is returned in @vclock. This is the vclock that
+ * is supposed to be used to identify the new checkpoint.
*/
int
-wal_checkpoint(struct vclock *vclock, bool rotate);
+wal_checkpoint(struct vclock *vclock);
/**
* Remove WAL files that are not needed by consumers reading
--
2.11.0
More information about the Tarantool-patches
mailing list