[PATCH 5/6] wal: separate checkpoint and flush paths

Vladimir Davydov vdavydov.dev at gmail.com
Sun Nov 25 16:48:12 MSK 2018


Currently, wal_checkpoint() is used for two purposes. First, to make a
checkpoint (rotate = true). Second, to flush all pending WAL requests
(rotate = false). Since checkpointing has to fail if cascading rollback
is in progress, so does flushing. This is confusing. Let's separate the
two paths.
---
 src/box/box.cc  |  5 ++--
 src/box/vinyl.c |  5 +---
 src/box/wal.c   | 93 ++++++++++++++++++++++++++-------------------------------
 src/box/wal.h   | 15 +++++++---
 4 files changed, 56 insertions(+), 62 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 8d6e966e..5ea2f014 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -2190,10 +2190,9 @@ box_checkpoint()
 		goto end;
 
 	struct vclock vclock;
-	if ((rc = wal_checkpoint(&vclock, true))) {
-		tnt_error(ClientError, ER_CHECKPOINT_ROLLBACK);
+	if ((rc = wal_checkpoint(&vclock)))
 		goto end;
-	}
+
 	rc = engine_commit_checkpoint(&vclock);
 end:
 	if (rc)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1794489d..05df1329 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1014,10 +1014,7 @@ vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm)
 	 * Wait for prepared transactions to complete
 	 * (we can't abort them as they reached WAL).
 	 */
-	struct vclock unused;
-	if (wal_checkpoint(&unused, false) != 0)
-		return -1;
-
+	wal_flush();
 	return 0;
 }
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 11aae5fc..3e6c1e7f 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -461,29 +461,35 @@ wal_thread_stop()
 		wal_writer_destroy(&wal_writer_singleton);
 }
 
-struct wal_checkpoint
+void
+wal_flush(void)
 {
-	struct cmsg base;
-	struct vclock *vclock;
-	struct fiber *fiber;
-	bool rotate;
-	int res;
+	cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
+}
+
+struct wal_checkpoint_msg {
+	struct cbus_call_msg base;
+	struct vclock vclock;
 };
 
-void
-wal_checkpoint_f(struct cmsg *data)
+static int
+wal_checkpoint_f(struct cbus_call_msg *data)
 {
-	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
+	struct wal_checkpoint_msg *msg = (struct wal_checkpoint_msg *)data;
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->in_rollback.route != NULL) {
-		/* We're rolling back a failed write. */
-		msg->res = -1;
-		return;
+		/*
+		 * We're rolling back a failed write and so
+		 * can't make a checkpoint - see the comment
+		 * in wal_checkpoint() for the explanation.
+		 */
+		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
+		return -1;
 	}
 	/*
 	 * Avoid closing the current WAL if it has no rows (empty).
 	 */
-	if (msg->rotate && xlog_is_open(&writer->current_wal) &&
+	if (xlog_is_open(&writer->current_wal) &&
 	    vclock_sum(&writer->current_wal.meta.vclock) !=
 	    vclock_sum(&writer->vclock)) {
 
@@ -492,53 +498,38 @@ wal_checkpoint_f(struct cmsg *data)
 		 * The next WAL will be created on the first write.
 		 */
 	}
-	vclock_copy(msg->vclock, &writer->vclock);
-}
-
-void
-wal_checkpoint_done_f(struct cmsg *data)
-{
-	struct wal_checkpoint *msg = (struct wal_checkpoint *) data;
-	fiber_wakeup(msg->fiber);
+	vclock_copy(&msg->vclock, &writer->vclock);
+	return 0;
 }
 
 int
-wal_checkpoint(struct vclock *vclock, bool rotate)
+wal_checkpoint(struct vclock *vclock)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
-	if (! stailq_empty(&writer->rollback)) {
-		/*
-		 * The writer rollback queue is not empty,
-		 * roll back this transaction immediately.
-		 * This is to ensure we do not accidentally
-		 * commit a transaction which has seen changes
-		 * that will be rolled back.
-		 */
-		say_error("Aborting transaction %llu during "
-			  "cascading rollback",
-			  vclock_sum(&writer->vclock));
-		return -1;
-	}
 	if (writer->wal_mode == WAL_NONE) {
 		vclock_copy(vclock, &writer->vclock);
 		return 0;
 	}
-	static struct cmsg_hop wal_checkpoint_route[] = {
-		{wal_checkpoint_f, &wal_thread.tx_prio_pipe},
-		{wal_checkpoint_done_f, NULL},
-	};
-	vclock_create(vclock);
-	struct wal_checkpoint msg;
-	cmsg_init(&msg.base, wal_checkpoint_route);
-	msg.vclock = vclock;
-	msg.fiber = fiber();
-	msg.rotate = rotate;
-	msg.res = 0;
-	cpipe_push(&wal_thread.wal_pipe, &msg.base);
-	fiber_set_cancellable(false);
-	fiber_yield();
-	fiber_set_cancellable(true);
-	return msg.res;
+	if (!stailq_empty(&writer->rollback)) {
+		/*
+		 * If cascading rollback is in progress, in-memory
+		 * indexes can contain changes scheduled for rollback.
+		 * If we made a checkpoint, we could write them to
+		 * the snapshot. So we abort checkpointing in this
+		 * case.
+		 */
+		diag_set(ClientError, ER_CHECKPOINT_ROLLBACK);
+		return -1;
+	}
+	struct wal_checkpoint_msg msg;
+	bool cancellable = fiber_set_cancellable(false);
+	int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+			   &msg.base, wal_checkpoint_f, NULL, TIMEOUT_INFINITY);
+	fiber_set_cancellable(cancellable);
+	if (rc != 0)
+		return -1;
+	vclock_copy(vclock, &msg.vclock);
+	return 0;
 }
 
 struct wal_gc_msg
diff --git a/src/box/wal.h b/src/box/wal.h
index e4094b1e..7ca27f1a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -166,13 +166,20 @@ wal_mode();
 
 /**
  * Wait till all pending changes to the WAL are flushed.
- * Rotates the WAL.
- *
- * @param[out] vclock WAL vclock
+ */
+void
+wal_flush(void);
+
+/**
+ * Prepare WAL for checkpointing.
  *
+ * This function flushes all pending changes and rotates the
+ * current WAL. The vclock of the last record written to the
+ * rotated WAL is returned in @vclock. This is the vclock that
+ * is supposed to be used to identify the new checkpoint.
  */
 int
-wal_checkpoint(struct vclock *vclock, bool rotate);
+wal_checkpoint(struct vclock *vclock);
 
 /**
  * Remove WAL files that are not needed by consumers reading
-- 
2.11.0




More information about the Tarantool-patches mailing list