[PATCH 04/10] vinyl: rework vylog transaction backlog implementation

Vladimir Davydov vdavydov.dev at gmail.com
Fri May 17 17:52:38 MSK 2019


Some vylog transactions don't tolerate failures. For example, it's too
late to fail in an on_commit trigger, after an index creation record was
written to WAL. So we keep an in-memory backlog for such transactions
and try to flush them to disk whenever we write anything else to the log.
This doesn't guarantee that the transaction will reach the vylog file
though, as the instance may be restarted before we manage to flush the
backlog. It's okay - we know how to handle this on recovery.

Currently, the backlog is implemented as follows: we have a list in the
tx thread to which we append all pending vylog records; if we fail to
flush a transaction to disk on commit, we delete records written by
the transaction from the list only if the transaction is discardable,
otherwise we leave the records in the list. The list is protected by
a global latch, which means that any vylog write may block. This impedes
the implementation of transactional DDL.

Actually, it isn't necessary to store the backlog in the list of pending
records. We can instead leave non-discardable records right in the xlog
buffer we failed to flush. Since the xlog is managed exclusively by the
vylog thread, this doesn't require any synchronization. Besides, this
allows us to hand record encoding over to the vylog thread, which is good.

Note that this patch doesn't remove the latch yet. We still use it to
synchronize access to the transaction buffer in tx. This will be reworked
by the following patches.
---
 src/box/vy_log.c | 185 ++++++++++++++++++++++++-------------------------------
 src/box/xlog.c   |  12 +++-
 src/box/xlog.h   |  31 ++++++++++
 3 files changed, 119 insertions(+), 109 deletions(-)

diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index cf967595..1bc35e82 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -52,7 +52,6 @@
 #include "errinj.h"
 #include "fiber.h"
 #include "iproto_constants.h" /* IPROTO_INSERT */
-#include "journal.h"
 #include "key_def.h"
 #include "latch.h"
 #include "replication.h" /* INSTANCE_UUID */
@@ -156,13 +155,6 @@ struct vy_log {
 	 * Linked by vy_log_record::in_tx;
 	 */
 	struct stailq tx;
-	/** Start of the current transaction in the pool, for rollback */
-	size_t tx_svp;
-	/**
-	 * Last record in the queue at the time when the current
-	 * transaction was started. Used for rollback.
-	 */
-	struct stailq_entry *tx_begin;
 	/**
 	 * Flag set if vy_log_write() failed.
 	 *
@@ -202,9 +194,6 @@ vy_log_open(void);
 static int
 vy_log_create(const struct vclock *vclock, struct vy_recovery *recovery);
 
-static int
-vy_log_flush(void);
-
 /**
  * Return the name of the vylog file that has the given signature.
  */
@@ -781,35 +770,83 @@ vy_log_init(const char *dir)
 
 struct vy_log_flush_msg {
 	struct cbus_call_msg base;
-	struct journal_entry *entry;
+	bool no_discard;
 };
 
 static int
 vy_log_flush_f(struct cbus_call_msg *base)
 {
 	struct vy_log_flush_msg *msg = (struct vy_log_flush_msg *)base;
-	struct journal_entry *entry = msg->entry;
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
 	struct xlog *xlog = &vy_log.xlog;
 
 	if (!xlog_is_open(xlog)) {
 		if (vy_log_open() < 0)
-			return -1;
+			goto fail;
 	}
 
+	/* Encode the transaction and write it to the xlog buffer. */
 	xlog_tx_begin(xlog);
-	for (int i = 0; i < entry->n_rows; ++i) {
-		entry->rows[i]->tm = ev_now(loop());
-		if (xlog_write_row(xlog, entry->rows[i]) < 0) {
+	struct vy_log_record *record;
+	stailq_foreach_entry(record, &vy_log.tx, in_tx) {
+		struct xrow_header row;
+		if (vy_log_record_encode(record, &row) < 0) {
 			xlog_tx_rollback(xlog);
-			return -1;
+			goto fail;
+		}
+		row.tm = ev_now(loop());
+		if (xlog_write_row(xlog, &row) < 0) {
+			xlog_tx_rollback(xlog);
+			goto fail;
 		}
 	}
 
-	if (xlog_tx_commit(xlog) < 0 ||
-	    xlog_flush(xlog) < 0)
-		return -1;
+	/* Flush the xlog buffer to disk. */
+	if (msg->no_discard)
+		xlog_tx_set_rollback_svp(xlog);
+
+	int rc = 0;
+	ERROR_INJECT(ERRINJ_VY_LOG_FLUSH, {
+		diag_set(ClientError, ER_INJECTION, "vinyl log flush");
+		xlog_tx_rollback(xlog);
+		rc = -1;
+	});
+	if (rc >= 0)
+		rc = xlog_tx_commit(xlog);
+	if (rc >= 0)
+		rc = xlog_flush(xlog);
 
-	return 0;
+	if (rc < 0 && msg->no_discard) {
+		/*
+		 * The message got appended to the xlog buffer so
+		 * it will get flushed along with the next message.
+		 * If it doesn't get flushed until restart, the
+		 * caller should be prepared for that so just warn
+		 * and go on.
+		 */
+		struct error *e = diag_last_error(diag_get());
+		say_warn("failed to flush vylog: %s", e->errmsg);
+		rc = 0;
+	}
+	region_truncate(region, region_svp);
+	return rc < 0 ? -1 : 0;
+fail:
+	if (msg->no_discard) {
+		/*
+		 * The caller doesn't tolerate failures so we have
+		 * no choice but panic. Good news is this shouldn't
+		 * normally happen. For example, we don't panic if
+		 * we failed to write the transaction to disk - we
+		 * leave it in the xlog buffer then (see above).
+		 * We get here on OOM, in which case there's no point
+		 * in continuing anyway.
+		 */
+		diag_log();
+		panic("non-discardable vylog transaction failed");
+	}
+	region_truncate(region, region_svp);
+	return -1;
 }
 
 /**
@@ -820,73 +857,31 @@ vy_log_flush_f(struct cbus_call_msg *base)
  * buffered transactions, and want to avoid a partial write.
  */
 static int
-vy_log_flush(void)
+vy_log_flush(bool no_discard)
 {
 	if (stailq_empty(&vy_log.tx))
 		return 0; /* nothing to do */
 
-	ERROR_INJECT(ERRINJ_VY_LOG_FLUSH, {
-		diag_set(ClientError, ER_INJECTION, "vinyl log flush");
-		return -1;
-	});
 	struct errinj *delay = errinj(ERRINJ_VY_LOG_FLUSH_DELAY, ERRINJ_BOOL);
 	if (delay != NULL && delay->bparam) {
 		while (delay->bparam)
 			fiber_sleep(0.001);
 	}
 
-	int tx_size = 0;
-	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx)
-		tx_size++;
-
-	size_t used = region_used(&fiber()->gc);
-	struct journal_entry *entry = journal_entry_new(tx_size, &fiber()->gc);
-	if (entry == NULL)
-		goto err;
-
-	struct xrow_header *rows;
-	rows = region_aligned_alloc(&fiber()->gc,
-				    tx_size * sizeof(struct xrow_header),
-				    alignof(struct xrow_header));
-	if (rows == NULL)
-		goto err;
-
-	/*
-	 * Encode buffered records.
-	 */
-	int i = 0;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx) {
-		assert(i < tx_size);
-		struct xrow_header *row = &rows[i];
-		if (vy_log_record_encode(record, row) < 0)
-			goto err;
-		entry->rows[i] = row;
-		i++;
-	}
-	assert(i == tx_size);
-
 	/*
 	 * Do actual disk writes on behalf of the vylog thread
 	 * so as not to block the tx thread.
 	 */
 	struct vy_log_flush_msg msg;
-	msg.entry = entry;
+	msg.no_discard = no_discard;
 	bool cancellable = fiber_set_cancellable(false);
 	int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
 			   vy_log_flush_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
-	if (rc != 0)
-		goto err;
 
-	/* Success. Free flushed records. */
-	region_reset(&vy_log.pool);
+	region_free(&vy_log.pool);
 	stailq_create(&vy_log.tx);
-	region_truncate(&fiber()->gc, used);
-	return 0;
-err:
-	region_truncate(&fiber()->gc, used);
-	return -1;
+	return rc;
 }
 
 void
@@ -1085,7 +1080,7 @@ vy_log_end_recovery(void)
 		vy_recovery_process_record(vy_log.recovery, record);
 
 	/* Flush all pending records. */
-	if (vy_log_flush() < 0) {
+	if (vy_log_flush(false) != 0) {
 		diag_log();
 		say_error("failed to flush vylog after recovery");
 		return -1;
@@ -1157,12 +1152,6 @@ vy_log_rotate(const struct vclock *vclock)
 	 */
 	latch_lock(&vy_log.latch);
 
-	if (vy_log_flush() != 0) {
-		diag_log();
-		say_error("failed to flush vylog for checkpoint");
-		goto fail;
-	}
-
 	/* Do actual work from coio so as not to stall tx thread. */
 	struct vy_log_rotate_msg msg;
 	vclock_copy(&msg.vclock, vclock);
@@ -1227,8 +1216,6 @@ void
 vy_log_tx_begin(void)
 {
 	latch_lock(&vy_log.latch);
-	vy_log.tx_begin = stailq_last(&vy_log.tx);
-	vy_log.tx_svp = region_used(&vy_log.pool);
 	vy_log.tx_failed = false;
 	say_verbose("begin vylog transaction");
 }
@@ -1243,8 +1230,6 @@ vy_log_tx_begin(void)
 static int
 vy_log_tx_do_commit(bool no_discard)
 {
-	struct stailq rollback;
-
 	assert(latch_owner(&vy_log.latch) == fiber());
 
 	if (vy_log.tx_failed) {
@@ -1257,7 +1242,7 @@ vy_log_tx_do_commit(bool no_discard)
 			diag_log();
 			panic("non-discardable vylog transaction failed");
 		}
-		goto rollback;
+		goto fail;
 	}
 
 	/*
@@ -1269,26 +1254,13 @@ vy_log_tx_do_commit(bool no_discard)
 	if (vy_log.recovery != NULL)
 		goto done;
 
-	if (vy_log_flush() != 0) {
-		if (!no_discard)
-			goto rollback;
-		/*
-		 * We were told not to discard the transaction on
-		 * failure so just warn and leave it in the buffer.
-		 */
-		struct error *e = diag_last_error(diag_get());
-		say_warn("failed to flush vylog: %s", e->errmsg);
-	}
-
+	if (vy_log_flush(no_discard) != 0)
+		goto fail;
 done:
 	say_verbose("commit vylog transaction");
 	latch_unlock(&vy_log.latch);
 	return 0;
-
-rollback:
-	stailq_cut_tail(&vy_log.tx, vy_log.tx_begin, &rollback);
-	region_truncate(&vy_log.pool, vy_log.tx_svp);
-	vy_log.tx_svp = 0;
+fail:
 	say_verbose("rollback vylog transaction");
 	latch_unlock(&vy_log.latch);
 	return -1;
@@ -2295,6 +2267,17 @@ vy_recovery_build_index_id_hash(struct vy_recovery *recovery)
 static struct vy_recovery *
 vy_recovery_load(int64_t signature, int flags)
 {
+	/*
+	 * Before proceeding to log recovery, make sure that all
+	 * pending records have been flushed out.
+	 */
+	if (xlog_is_open(&vy_log.xlog) &&
+	    xlog_flush(&vy_log.xlog) < 0) {
+		diag_log();
+		say_error("failed to flush vylog for recovery");
+		return NULL;
+	}
+
 	say_verbose("loading vylog %lld", (long long)signature);
 
 	struct vy_recovery *recovery = malloc(sizeof(*recovery));
@@ -2413,16 +2396,6 @@ vy_recovery_new(int64_t signature, int flags)
 	/* Lock out concurrent writers while we are loading the log. */
 	latch_lock(&vy_log.latch);
 
-	/*
-	 * Before proceeding to log recovery, make sure that all
-	 * pending records have been flushed out.
-	 */
-	if (vy_log_flush() != 0) {
-		diag_log();
-		say_error("failed to flush vylog for recovery");
-		goto fail;
-	}
-
 	struct vy_recovery_msg msg;
 	msg.signature = signature;
 	msg.flags = flags;
diff --git a/src/box/xlog.c b/src/box/xlog.c
index 82588682..8bddac38 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -770,6 +770,7 @@ xlog_init(struct xlog *xlog, const struct xlog_opts *opts)
 	xlog->is_autocommit = true;
 	obuf_create(&xlog->obuf, &cord()->slabc, XLOG_TX_AUTOCOMMIT_THRESHOLD);
 	obuf_create(&xlog->zbuf, &cord()->slabc, XLOG_TX_AUTOCOMMIT_THRESHOLD);
+	xlog->rollback_svp.obuf_svp = obuf_create_svp(&xlog->obuf);
 	if (!opts->no_compression) {
 		xlog->zctx = ZSTD_createCCtx();
 		if (xlog->zctx == NULL) {
@@ -1201,6 +1202,7 @@ xlog_tx_write(struct xlog *log)
 {
 	if (obuf_size(&log->obuf) == XLOG_FIXHEADER_SIZE)
 		return 0;
+
 	ssize_t written;
 
 	if (!log->opts.no_compression &&
@@ -1214,19 +1216,23 @@ xlog_tx_write(struct xlog *log)
 		written = -1;
 	});
 
-	obuf_reset(&log->obuf);
 	/*
 	 * Simplify recovery after a temporary write failure:
 	 * truncate the file to the best known good write
 	 * position.
 	 */
 	if (written < 0) {
+		log->tx_rows = log->rollback_svp.rows;
+		obuf_rollback_to_svp(&log->obuf, &log->rollback_svp.obuf_svp);
 		if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
 		    ftruncate(log->fd, log->offset) != 0)
 			panic_syserror("failed to truncate xlog after write error");
 		log->allocated = 0;
 		return -1;
 	}
+	obuf_reset(&log->obuf);
+	log->rollback_svp.rows = 0;
+	log->rollback_svp.obuf_svp = obuf_create_svp(&log->obuf);
 	if (log->allocated > (size_t)written)
 		log->allocated -= written;
 	else
@@ -1373,8 +1379,8 @@ void
 xlog_tx_rollback(struct xlog *log)
 {
 	log->is_autocommit = true;
-	log->tx_rows = 0;
-	obuf_reset(&log->obuf);
+	log->tx_rows = log->rollback_svp.rows;
+	obuf_rollback_to_svp(&log->obuf, &log->rollback_svp.obuf_svp);
 }
 
 /**
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 964d303e..5dba4264 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -381,6 +381,16 @@ struct xlog {
 	 * compression.
 	 */
 	struct obuf obuf;
+	/**
+	 * Marks the beginning of the first row that may be discarded
+	 * on write error. See xlog_tx_set_rollback_svp().
+	 */
+	struct {
+		/** Number of non-discardable rows in the buffer. */
+		int rows;
+		/** Position of the first discardable row in the buffer. */
+		struct obuf_svp obuf_svp;
+	} rollback_svp;
 	/** The context of zstd compression */
 	ZSTD_CCtx *zctx;
 	/**
@@ -525,6 +535,27 @@ void
 xlog_tx_rollback(struct xlog *log);
 
 /**
+ * Sets the position in the write buffer to rollback to in case
+ * of error or voluntary rollback. Useful for transactions that
+ * cannot be discarded even on write error, e.g. certain types
+ * of vylog transactions. Everything written before this function
+ * is called will stay in the buffer until it eventually gets flushed
+ * along with another transaction or by an explicit invocation of
+ * xlog_flush(). Note, however, this doesn't guarantee that the
+ * transaction will reach the disk - the instance may be restarted
+ * before anything is written to the xlog file, in which case the
+ * data written to the buffer will be lost. The caller must be
+ * prepared for this.
+ */
+static inline void
+xlog_tx_set_rollback_svp(struct xlog *log)
+{
+	assert(!log->is_autocommit);
+	log->rollback_svp.rows = log->tx_rows;
+	log->rollback_svp.obuf_svp = obuf_create_svp(&log->obuf);
+}
+
+/**
  * Flush buffered rows and sync file
  */
 ssize_t
-- 
2.11.0




More information about the Tarantool-patches mailing list