[PATCH] vinyl: don't yield in on_commit and on_rollback triggers

Vladimir Davydov vdavydov.dev at gmail.com
Sat Jun 15 21:00:31 MSK 2019


To apply replicated rows in parallel, we need to be able to complete
transactions asynchronously, from the tx_prio callback. We can't yield
there, so we must ensure that on_commit/on_rollback triggers don't yield.
The only place where a completion trigger may yield is vinyl DDL,
which submits vylog records and waits for them to be written.

Actually, there's no reason to wait for a vylog write to complete, as we
can handle missing records on recovery. So this patch reworks vylog so
that vy_log_tx_try_commit(), and hence the on_commit/on_rollback triggers
that use it, never yield.

To achieve that, we need to:

 - Use vy_log.latch only to synchronize log rotation with writes; don't
   protect the vylog buffer with it. This makes vy_log_tx_begin() non-yielding.

 - Use a separate list and buffer for storing vylog records of each
   transaction. We used to share them among transactions, but without
   vy_log.latch we can't sync access to them anymore. Since vylog
   transactions are rare events, this should be fine.

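 - Make vy_log_tx_try_commit() append the transaction to the list of
   pending transactions and wake up a background fiber to flush all
   pending transactions. This way it doesn't need to yield.

 - Since vy_log.latch no longer covers a whole vylog transaction,
   compaction completion may now race with vylog rotation. Introduce
   VY_LOG_GC_LSN_CURRENT, a special gc_lsn value that is replaced with
   the current vylog signature when the record is flushed, so that we
   never write the previous vylog signature (which would result in
   premature run file collection).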

Closes #4218
---
https://github.com/tarantool/tarantool/issues/4218
https://github.com/tarantool/tarantool/commits/dv/gh-4218-vy-remove-yields-from-ddl-commit

 src/box/vy_log.c       | 289 +++++++++++++++++++++++++++++++++----------------
 src/box/vy_log.h       |  14 ++-
 src/box/vy_scheduler.c |   5 +-
 3 files changed, 211 insertions(+), 97 deletions(-)

diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa3..ae1d6234 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -51,6 +51,7 @@
 #include "errcode.h"
 #include "errinj.h"
 #include "fiber.h"
+#include "fiber_cond.h"
 #include "iproto_constants.h" /* IPROTO_INSERT */
 #include "key_def.h"
 #include "latch.h"
@@ -130,6 +131,16 @@ static const char *vy_log_type_name[] = {
 	[VY_LOG_ABORT_REBOOTSTRAP]	= "abort_rebootstrap",
 };
 
+/** Batch of vylog records that must be written in one go. */
+struct vy_log_tx {
+	/** Link in vy_log::pending_tx, set once the tx is committed. */
+	struct stailq_entry in_pending;
+	/** Region the records are allocated from, destroyed with the tx. */
+	struct region region;
+	/** List of records, linked by vy_log_record::in_tx. */
+	struct stailq records;
+};
+
 /** Metadata log object. */
 struct vy_log {
 	/**
@@ -142,27 +153,30 @@ struct vy_log {
 	struct vclock last_checkpoint;
 	/** Recovery context. */
 	struct vy_recovery *recovery;
-	/** Latch protecting the log buffer. */
+	/**
+	 * Latch that syncs log writers against readers.
+	 * Needed so that we don't miss any records during
+	 * log rotation.
+	 */
 	struct latch latch;
+	/** Background fiber flushing pending transactions. */
+	struct fiber *flusher;
+	/** Condition variable used for signalling the flusher. */
+	struct fiber_cond flusher_cond;
 	/**
 	 * Next ID to use for a vinyl object.
 	 * Used by vy_log_next_id().
 	 */
 	int64_t next_id;
-	/** A region of struct vy_log_record entries. */
-	struct region pool;
+	/** Pool of vy_log_tx objects. */
+	struct mempool tx_pool;
+	/** Current transaction or NULL. */
+	struct vy_log_tx *tx;
 	/**
-	 * Records awaiting to be written to disk.
-	 * Linked by vy_log_record::in_tx;
+	 * List of transactions awaiting to be flushed to disk,
+	 * linked by vy_log_tx::in_pending.
 	 */
-	struct stailq tx;
-	/** Start of the current transaction in the pool, for rollback */
-	size_t tx_svp;
-	/**
-	 * Last record in the queue at the time when the current
-	 * transaction was started. Used for rollback.
-	 */
-	struct stailq_entry *tx_begin;
+	struct stailq pending_tx;
 	/**
 	 * Flag set if vy_log_write() failed.
 	 *
@@ -181,6 +195,9 @@ struct vy_log {
 };
 static struct vy_log vy_log;
 
+static int
+vy_log_flusher_func(va_list va);
+
 static struct vy_recovery *
 vy_recovery_new_locked(int64_t signature, int flags);
 
@@ -737,23 +754,47 @@ vy_log_init(const char *dir)
 	xdir_create(&vy_log.dir, dir, VYLOG, &INSTANCE_UUID,
 		    &xlog_opts_default);
 	latch_create(&vy_log.latch);
-	region_create(&vy_log.pool, cord_slab_cache());
-	stailq_create(&vy_log.tx);
+	mempool_create(&vy_log.tx_pool, cord_slab_cache(),
+		       sizeof(struct vy_log_tx));
+	stailq_create(&vy_log.pending_tx);
 	diag_create(&vy_log.tx_diag);
 	wal_init_vy_log();
+	fiber_cond_create(&vy_log.flusher_cond);
+	vy_log.flusher = fiber_new("vinyl.vylog_flusher",
+				   vy_log_flusher_func);
+	if (vy_log.flusher == NULL)
+		panic("failed to allocate vylog flusher fiber");
+	fiber_wakeup(vy_log.flusher);
+}
+
+static struct vy_log_tx *
+vy_log_tx_new(void)
+{
+	struct vy_log_tx *tx = mempool_alloc(&vy_log.tx_pool);
+	if (tx == NULL) {
+		diag_set(OutOfMemory, sizeof(*tx), "mempool", "vy log tx");
+		return NULL;
+	}
+	region_create(&tx->region, cord_slab_cache());
+	stailq_create(&tx->records);
+	tx->in_pending.next = NULL;
+	return tx;
+}
+
+static void
+vy_log_tx_delete(struct vy_log_tx *tx)
+{
+	region_destroy(&tx->region);
+	mempool_free(&vy_log.tx_pool, tx);
 }
 
 /**
- * Try to flush the log buffer to disk.
- *
- * We always flush the entire vy_log buffer as a single xlog
- * transaction, since we do not track boundaries of @no_discard
- * buffered transactions, and want to avoid a partial write.
+ * Write a given transaction to disk.
  */
 static int
-vy_log_flush(void)
+vy_log_tx_flush(struct vy_log_tx *tx)
 {
-	if (stailq_empty(&vy_log.tx))
+	if (stailq_empty(&tx->records))
 		return 0; /* nothing to do */
 
 	ERROR_INJECT(ERRINJ_VY_LOG_FLUSH, {
@@ -768,7 +809,7 @@ vy_log_flush(void)
 
 	int tx_size = 0;
 	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx)
+	stailq_foreach_entry(record, &tx->records, in_tx)
 		tx_size++;
 
 	size_t used = region_used(&fiber()->gc);
@@ -787,7 +828,9 @@ vy_log_flush(void)
 	 * Encode buffered records.
 	 */
 	int i = 0;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx) {
+	stailq_foreach_entry(record, &tx->records, in_tx) {
+		if (record->gc_lsn == VY_LOG_GC_LSN_CURRENT)
+			record->gc_lsn = vy_log_signature();
 		assert(i < tx_size);
 		struct xrow_header *row = &rows[i];
 		if (vy_log_record_encode(record, row) < 0)
@@ -804,9 +847,6 @@ vy_log_flush(void)
 	if (wal_write_vy_log(entry) != 0)
 		goto err;
 
-	/* Success. Free flushed records. */
-	region_reset(&vy_log.pool);
-	stailq_create(&vy_log.tx);
 	region_truncate(&fiber()->gc, used);
 	return 0;
 err:
@@ -814,11 +854,78 @@ err:
 	return -1;
 }
 
+/**
+ * Write all pending transactions to disk.
+ */
+static int
+vy_log_flush(void)
+{
+	/*
+	 * vy_log_tx_try_commit() can add a new transaction to
+	 * the list while we are writing to disk. This is okay -
+	 * we'll flush it next time. If we fail, we put remaining
+	 * transactions back to the head of the list to preserve
+	 * the commit order.
+	 */
+	struct stailq pending;
+	stailq_create(&pending);
+	stailq_concat(&pending, &vy_log.pending_tx);
+
+	int rc = 0;
+	while (!stailq_empty(&pending)) {
+		struct vy_log_tx *tx = stailq_first_entry(&pending,
+					struct vy_log_tx, in_pending);
+		rc = vy_log_tx_flush(tx);
+		if (rc != 0)
+			break;
+		stailq_shift(&pending);
+		vy_log_tx_delete(tx);
+	}
+	stailq_concat(&pending, &vy_log.pending_tx);
+	stailq_concat(&vy_log.pending_tx, &pending);
+	return rc;
+}
+
+static int
+vy_log_flusher_func(va_list va)
+{
+	(void)va;
+	while (!fiber_is_cancelled()) {
+		/*
+		 * Sleep if there's nothing to flush or we are in local
+		 * recovery (writes are disabled, see vy_log_tx_commit()).
+		 */
+		if (vy_log.recovery != NULL ||
+		    stailq_empty(&vy_log.pending_tx)) {
+			fiber_cond_wait(&vy_log.flusher_cond);
+			continue;
+		}
+		latch_lock(&vy_log.latch);
+		int rc = vy_log_flush();
+		latch_unlock(&vy_log.latch);
+		if (rc != 0) {
+			diag_log();
+			say_error("failed to flush vylog");
+			/*
+			 * Don't retry immediately after a failure
+			 * since the next write is likely to fail
+			 * as well. Instead wait for the next signal.
+			 */
+			fiber_cond_wait(&vy_log.flusher_cond);
+		}
+	}
+	return 0;
+}
+
 void
 vy_log_free(void)
 {
+	struct vy_log_tx *tx, *next_tx;
+	stailq_foreach_entry_safe(tx, next_tx, &vy_log.pending_tx, in_pending)
+		vy_log_tx_delete(tx);
+	stailq_create(&vy_log.pending_tx);
+	mempool_destroy(&vy_log.tx_pool);
 	xdir_destroy(&vy_log.dir);
-	region_destroy(&vy_log.pool);
 	diag_destroy(&vy_log.tx_diag);
 }
 
@@ -998,9 +1105,12 @@ vy_log_end_recovery(void)
 	 * Update the recovery context with records written during
 	 * recovery - we will need them for garbage collection.
 	 */
-	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx)
-		vy_recovery_process_record(vy_log.recovery, record);
+	struct vy_log_tx *tx;
+	stailq_foreach_entry(tx, &vy_log.pending_tx, in_pending) {
+		struct vy_log_record *record;
+		stailq_foreach_entry(record, &tx->records, in_tx)
+			vy_recovery_process_record(vy_log.recovery, record);
+	}
 
 	/* Flush all pending records. */
 	if (vy_log_flush() < 0) {
@@ -1123,102 +1233,95 @@ vy_log_backup_path(const struct vclock *vclock)
 void
 vy_log_tx_begin(void)
 {
-	latch_lock(&vy_log.latch);
-	vy_log.tx_begin = stailq_last(&vy_log.tx);
-	vy_log.tx_svp = region_used(&vy_log.pool);
-	vy_log.tx_failed = false;
+	assert(!vy_log.tx_failed);
+	assert(vy_log.tx == NULL);
+	vy_log.tx = vy_log_tx_new();
+	if (vy_log.tx == NULL) {
+		diag_move(diag_get(), &vy_log.tx_diag);
+		vy_log.tx_failed = true;
+	}
 	say_verbose("begin vylog transaction");
 }

-/**
- * Commit a transaction started with vy_log_tx_begin().
- *
- * If @no_discard is set, pending records won't be expunged from the
- * buffer on failure, so that the next transaction will retry to write
- * them to disk.
- */
-static int
-vy_log_tx_do_commit(bool no_discard)
+int
+vy_log_tx_commit(void)
 {
-	struct stailq rollback;
-
-	assert(latch_owner(&vy_log.latch) == fiber());
-
-	if (vy_log.tx_failed) {
-		/*
-		 * vy_log_write() failed to append a record to tx.
-		 * @no_discard transactions can't handle this.
-		 */
-		diag_move(&vy_log.tx_diag, diag_get());
-		if (no_discard) {
-			diag_log();
-			panic("non-discardable vylog transaction failed");
-		}
-		goto rollback;
-	}

 	/*
 	 * During recovery, we may replay records we failed to commit
 	 * before restart (e.g. drop LSM tree). Since the log isn't open
 	 * yet, simply leave them in the tx buffer to be flushed upon
 	 * recovery completion.
 	 */
-	if (vy_log.recovery != NULL)
-		goto done;
+	if (vy_log.recovery != NULL && !vy_log.tx_failed) {
+		vy_log_tx_try_commit();
+		return 0;
+	}
+	/* A failed transaction is rolled back even during recovery. */
+	struct vy_log_tx *tx = vy_log.tx;
+	vy_log.tx = NULL;

-	if (vy_log_flush() != 0) {
-		if (!no_discard)
-			goto rollback;
-		/*
-		 * We were told not to discard the transaction on
-		 * failure so just warn and leave it in the buffer.
-		 */
-		struct error *e = diag_last_error(diag_get());
-		say_warn("failed to flush vylog: %s", e->errmsg);
+	if (vy_log.tx_failed) {
+		diag_move(&vy_log.tx_diag, diag_get());
+		vy_log.tx_failed = false;
+		if (tx != NULL)
+			vy_log_tx_delete(tx);
+		goto err;
 	}

-done:
-	say_verbose("commit vylog transaction");
+	assert(tx != NULL);
+	/*
+	 * Before writing this transaction, flush all pending ones
+	 * if any, because they were committed first.
+	 */
+	latch_lock(&vy_log.latch);
+	int rc = vy_log_flush();
+	if (rc == 0)
+		rc = vy_log_tx_flush(tx);
 	latch_unlock(&vy_log.latch);
+
+	vy_log_tx_delete(tx);
+	if (rc != 0)
+		goto err;
+
+	say_verbose("commit vylog transaction");
 	return 0;
-
-rollback:
-	stailq_cut_tail(&vy_log.tx, vy_log.tx_begin, &rollback);
-	region_truncate(&vy_log.pool, vy_log.tx_svp);
-	vy_log.tx_svp = 0;
+err:
 	say_verbose("rollback vylog transaction");
-	latch_unlock(&vy_log.latch);
 	return -1;
 }

-int
-vy_log_tx_commit(void)
-{
-	return vy_log_tx_do_commit(false);
-}
-
 void
 vy_log_tx_try_commit(void)
 {
-	if (vy_log_tx_do_commit(true) != 0)
-		unreachable();
+	if (vy_log.tx_failed) {
+		diag_move(&vy_log.tx_diag, diag_get());
+		diag_log();
+		panic("non-discardable vylog transaction failed");
+	}
+	assert(vy_log.tx != NULL);
+	stailq_add_tail_entry(&vy_log.pending_tx, vy_log.tx, in_pending);
+	fiber_cond_signal(&vy_log.flusher_cond);
+	vy_log.tx = NULL;
+	say_verbose("commit vylog transaction");
 }

 void
 vy_log_write(const struct vy_log_record *record)
 {
-	assert(latch_owner(&vy_log.latch) == fiber());
+	say_verbose("write vylog record: %s", vy_log_record_str(record));

-	struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.pool,
+	if (vy_log.tx_failed)
+		return;
+
+	assert(vy_log.tx != NULL);
+	struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.tx->region,
 							    record);
 	if (tx_record == NULL) {
 		diag_move(diag_get(), &vy_log.tx_diag);
 		vy_log.tx_failed = true;
 		return;
 	}
-
-	say_verbose("write vylog record: %s", vy_log_record_str(tx_record));
-	stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx);
+	stailq_add_tail_entry(&vy_log.tx->records, tx_record, in_tx);
 }
 
 /**
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index ee38c193..298a8ed4 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -221,6 +221,16 @@ enum vy_log_record_type {
 	vy_log_record_type_MAX
 };
 
+/**
+ * Special value of vy_log_record::gc_lsn that is replaced with the
+ * signature of the vylog file the record is written to at flush time.
+ * This makes sure we write the current vylog signature (not the
+ * previous one) when compaction completion races with vylog rotation.
+ * Writing the previous vylog signature would result in premature run
+ * file collection.
+ */
+enum { VY_LOG_GC_LSN_CURRENT = -1 };
+
 /** Record in the metadata log. */
 struct vy_log_record {
 	/** Type of the record. */
@@ -273,7 +283,7 @@ struct vy_log_record {
 	int64_t gc_lsn;
 	/** For runs: number of dumps it took to create the run. */
 	uint32_t dump_count;
-	/** Link in vy_log::tx. */
+	/** Link in vy_log_tx::records. */
 	struct stailq_entry in_tx;
 };
 
@@ -510,6 +520,8 @@ vy_log_tx_commit(void);
  * buffered records to disk, but in case of failure pending records
  * are not expunged from the buffer, so that the next transaction
  * will retry to flush them.
+ *
+ * In contrast to vy_log_tx_commit(), this function doesn't yield.
  */
 void
 vy_log_tx_try_commit(void);
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 0180331e..85c1659b 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -1503,9 +1503,8 @@ vy_task_compaction_complete(struct vy_task *task)
 		if (slice == last_slice)
 			break;
 	}
-	int64_t gc_lsn = vy_log_signature();
 	rlist_foreach_entry(run, &unused_runs, in_unused)
-		vy_log_drop_run(run->id, gc_lsn);
+		vy_log_drop_run(run->id, VY_LOG_GC_LSN_CURRENT);
 	if (new_slice != NULL) {
 		vy_log_create_run(lsm->id, new_run->id, new_run->dump_lsn,
 				  new_run->dump_count);
@@ -1530,7 +1529,7 @@ vy_task_compaction_complete(struct vy_task *task)
 	 * next checkpoint.
 	 */
 	rlist_foreach_entry(run, &unused_runs, in_unused) {
-		if (run->dump_lsn > gc_lsn)
+		if (run->dump_lsn > vy_log_signature())
 			vy_run_remove_files(lsm->env->path, lsm->space_id,
 					    lsm->index_id, run->id);
 	}
-- 
2.11.0




More information about the Tarantool-patches mailing list