[PATCH 10/10] vinyl: get rid of the latch protecting vylog buffer

Vladimir Davydov vdavydov.dev at gmail.com
Fri May 17 17:52:44 MSK 2019


The latch is needed solely to synchronize access to the transaction
write buffer, which is shared by all transactions. We don't need it to
sync vylog readers vs writers as everything is done from the same
thread. So to get rid of the latch, which is a prerequisite for
transactional DDL, as it makes even simply Vinyl DDL operations
yielding, we need to rework the buffer management.

To achieve that, this patch introduces a separate object for each vylog
transaction, which stores the list of records to be committed. Once a
transaction is ready to be committed, it is sent to the vylog thread,
which writes it to vylog and then wakes up the awaiting fiber. For
non-discardable transactions, it doesn't wakeup any fiber, the fiber
returns immediately and the transaction is freed automatically.

To allow that, transactions are now allocated on lsregion, which acts
as a queue: new transactions are appended to the tail, committed
transactions are popped from the head.
---
 src/box/vy_log.c | 261 ++++++++++++++++++++++++++++++++++---------------------
 src/box/vy_log.h |   8 +-
 2 files changed, 169 insertions(+), 100 deletions(-)

diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index 7157bbf3..ca0f48c3 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -41,6 +41,7 @@
 #include <unistd.h>
 
 #include <msgpuck/msgpuck.h>
+#include <small/lsregion.h>
 #include <small/mempool.h>
 #include <small/region.h>
 #include <small/rlist.h>
@@ -53,7 +54,7 @@
 #include "fiber.h"
 #include "iproto_constants.h" /* IPROTO_INSERT */
 #include "key_def.h"
-#include "latch.h"
+#include "memory.h"
 #include "replication.h" /* INSTANCE_UUID */
 #include "salad/stailq.h"
 #include "say.h"
@@ -129,6 +130,31 @@ static const char *vy_log_type_name[] = {
 	[VY_LOG_ABORT_REBOOTSTRAP]	= "abort_rebootstrap",
 };
 
+/** Metadata transaction. */
+struct vy_log_tx {
+	/** Message sent to vylog thread via cbus. */
+	struct cmsg cmsg;
+	/**
+	 * Unique, monotonically growing identifier.
+	 * Initialized with vy_log::last_tx_id.
+	 */
+	int64_t id;
+	/** List of records, linked by vy_log_record::in_tx. */
+	struct stailq records;
+	/**
+	 * Fiber waiting for transaction completion or NULL
+	 * if the transaction is non-discardable and hence
+	 * there's no need to wait for it - it will stay in
+	 * the xlog buffer on failure and gets flushed along
+	 * on the next write.
+	 */
+	struct fiber *fiber;
+	/** Set to true if write failed. */
+	bool failed;
+	/** Write error information. */
+	struct diag diag;
+};
+
 /** Metadata log object. */
 struct vy_log {
 	/**
@@ -141,35 +167,20 @@ struct vy_log {
 	struct vclock last_checkpoint;
 	/** Recovery context. */
 	struct vy_recovery *recovery;
-	/** Latch protecting the log buffer. */
-	struct latch latch;
 	/**
 	 * Next ID to use for a vinyl object.
 	 * Used by vy_log_next_id().
 	 */
 	int64_t next_id;
-	/** A region of struct vy_log_record entries. */
-	struct region pool;
 	/**
-	 * Records awaiting to be written to disk.
-	 * Linked by vy_log_record::in_tx;
+	 * Memory pool for allocating transaction records.
+	 * Committed transactions are collected by id.
 	 */
-	struct stailq tx;
-	/**
-	 * Flag set if vy_log_write() failed.
-	 *
-	 * It indicates that that the current transaction must be
-	 * aborted on vy_log_commit(). Thanks to this flag, we don't
-	 * need to add error handling code after each invocation of
-	 * vy_log_write(), instead we only check vy_log_commit()
-	 * return code.
-	 */
-	bool tx_failed;
-	/**
-	 * Diagnostic area where vy_log_write() error is stored,
-	 * only relevant if @tx_failed is set.
-	 */
-	struct diag tx_diag;
+	struct lsregion tx_pool;
+	/** Current transaction or NULL. */
+	struct vy_log_tx *tx;
+	/** Identifier of the last started transaction. */
+	int64_t last_tx_id;
 	/** Vylog file. */
 	struct xlog xlog;
 	/** Vylog IO thread. */
@@ -754,10 +765,7 @@ vy_log_init(const char *dir)
 {
 	xdir_create(&vy_log.dir, dir, VYLOG, &INSTANCE_UUID,
 		    &xlog_opts_default);
-	latch_create(&vy_log.latch);
-	region_create(&vy_log.pool, cord_slab_cache());
-	stailq_create(&vy_log.tx);
-	diag_create(&vy_log.tx_diag);
+	lsregion_create(&vy_log.tx_pool, &runtime);
 	xlog_clear(&vy_log.xlog);
 
 	if (cord_costart(&vy_log.cord, "vinyl.log", vy_log_thread_f, NULL) != 0)
@@ -766,18 +774,14 @@ vy_log_init(const char *dir)
 	cpipe_create(&vy_log.pipe, "vylog");
 }
 
-struct vy_log_flush_msg {
-	struct cbus_call_msg base;
-	bool no_discard;
-};
-
-static int
-vy_log_flush_f(struct cbus_call_msg *base)
+static void
+vy_log_tx_write_f(struct cmsg *msg)
 {
-	struct vy_log_flush_msg *msg = (struct vy_log_flush_msg *)base;
+	struct vy_log_tx *tx = (struct vy_log_tx *)msg;
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	struct xlog *xlog = &vy_log.xlog;
+	bool no_discard = (tx->fiber == NULL);
 
 	if (!xlog_is_open(xlog)) {
 		if (vy_log_open() < 0)
@@ -787,7 +791,7 @@ vy_log_flush_f(struct cbus_call_msg *base)
 	/* Encode the transaction and write it to the xlog buffer. */
 	xlog_tx_begin(xlog);
 	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx) {
+	stailq_foreach_entry(record, &tx->records, in_tx) {
 		struct xrow_header row;
 		if (vy_log_record_encode(record, &row) < 0) {
 			xlog_tx_rollback(xlog);
@@ -801,7 +805,7 @@ vy_log_flush_f(struct cbus_call_msg *base)
 	}
 
 	/* Flush the xlog buffer to disk. */
-	if (msg->no_discard)
+	if (no_discard)
 		xlog_tx_set_rollback_svp(xlog);
 
 	int rc = 0;
@@ -815,7 +819,7 @@ vy_log_flush_f(struct cbus_call_msg *base)
 	if (rc >= 0)
 		rc = xlog_flush(xlog);
 
-	if (rc < 0 && msg->no_discard) {
+	if (rc < 0 && no_discard) {
 		/*
 		 * The message got appended to the xlog buffer so
 		 * it will get flushed along with the next message.
@@ -827,10 +831,13 @@ vy_log_flush_f(struct cbus_call_msg *base)
 		say_warn("failed to flush vylog: %s", e->errmsg);
 		rc = 0;
 	}
+	if (rc < 0)
+		goto fail;
+out:
 	region_truncate(region, region_svp);
-	return rc < 0 ? -1 : 0;
+	return;
 fail:
-	if (msg->no_discard) {
+	if (no_discard) {
 		/*
 		 * The caller doesn't tolerate failures so we have
 		 * no choice but panic. Good news is this shouldn't
@@ -843,36 +850,69 @@ fail:
 		diag_log();
 		panic("non-discardable vylog transaction failed");
 	}
-	region_truncate(region, region_svp);
-	return -1;
+	tx->failed = true;
+	diag_move(diag_get(), &tx->diag);
+	goto out;
+}
+
+static void
+vy_log_tx_free(struct vy_log_tx *tx)
+{
+	diag_destroy(&tx->diag);
+	lsregion_gc(&vy_log.tx_pool, tx->id);
+}
+
+static void
+vy_log_tx_write_complete_f(struct cmsg *cmsg)
+{
+	struct vy_log_tx *tx = container_of(cmsg, struct vy_log_tx, cmsg);
+	if (tx->fiber == NULL) {
+		/*
+		 * Nobody's waiting for the transaction.
+		 * It's our responsibility to free memory.
+		 */
+		vy_log_tx_free(tx);
+	} else {
+		fiber_wakeup(tx->fiber);
+	}
 }
 
-/**
- * Try to flush the log buffer to disk.
- *
- * We always flush the entire vy_log buffer as a single xlog
- * transaction, since we do not track boundaries of @no_discard
- * buffered transactions, and want to avoid a partial write.
- */
 static int
-vy_log_flush(bool no_discard)
+vy_log_tx_write(bool no_discard)
 {
-	if (stailq_empty(&vy_log.tx))
-		return 0; /* nothing to do */
+	struct vy_log_tx *tx = vy_log.tx;
+	assert(tx != NULL);
+	vy_log.tx = NULL;
 
 	/*
 	 * Do actual disk writes on behalf of the vylog thread
 	 * so as not to block the tx thread.
 	 */
-	struct vy_log_flush_msg msg;
-	msg.no_discard = no_discard;
+	static const struct cmsg_hop route[2] = {
+		{ vy_log_tx_write_f, &vy_log.tx_pipe },
+		{ vy_log_tx_write_complete_f, NULL },
+	};
+	cmsg_init(&tx->cmsg, route);
+	tx->fiber = no_discard ? NULL : fiber();
+	cpipe_push(&vy_log.pipe, &tx->cmsg);
+
+	/*
+	 * Non-discardable transactions can't fail so no need to wait
+	 * for it to complete. Memory will be freed on completion.
+	 */
+	if (no_discard)
+		return 0;
+
 	bool cancellable = fiber_set_cancellable(false);
-	int rc = cbus_call(&vy_log.pipe, &vy_log.tx_pipe, &msg.base,
-			   vy_log_flush_f, NULL, TIMEOUT_INFINITY);
+	fiber_yield();
 	fiber_set_cancellable(cancellable);
 
-	region_free(&vy_log.pool);
-	stailq_create(&vy_log.tx);
+	int rc = 0;
+	if (tx->failed) {
+		diag_move(&tx->diag, diag_get());
+		rc = -1;
+	}
+	vy_log_tx_free(tx);
 	return rc;
 }
 
@@ -883,8 +923,7 @@ vy_log_free(void)
 	if (cord_join(&vy_log.cord) != 0)
 		panic_syserror("failed to join vinyl log thread");
 	xdir_destroy(&vy_log.dir);
-	region_destroy(&vy_log.pool);
-	diag_destroy(&vy_log.tx_diag);
+	lsregion_destroy(&vy_log.tx_pool);
 }
 
 /**
@@ -1058,26 +1097,34 @@ vy_log_begin_recovery(const struct vclock *vclock)
 	return recovery;
 }
 
-int
-vy_log_end_recovery(void)
+static int
+vy_log_flush_recovered_records(void)
 {
-	assert(vy_log.recovery != NULL);
+	if (vy_log.tx == NULL)
+		return 0;
 
 	/*
 	 * Update the recovery context with records written during
 	 * recovery - we will need them for garbage collection.
 	 */
 	struct vy_log_record *record;
-	stailq_foreach_entry(record, &vy_log.tx, in_tx)
+	stailq_foreach_entry(record, &vy_log.tx->records, in_tx)
 		vy_recovery_process_record(vy_log.recovery, record);
 
 	/* Flush all pending records. */
-	if (vy_log_flush(false) != 0) {
+	if (vy_log_tx_write(false) != 0) {
 		diag_log();
 		say_error("failed to flush vylog after recovery");
 		return -1;
 	}
+	return 0;
+}
 
+int
+vy_log_end_recovery(void)
+{
+	assert(vy_log.recovery != NULL);
+	vy_log_flush_recovered_records();
 	xdir_collect_inprogress(&vy_log.dir);
 	vy_log.recovery = NULL;
 	return 0;
@@ -1110,6 +1157,8 @@ vy_log_rotate_f(struct cbus_call_msg *base)
 	 */
 	if (xlog_is_open(&vy_log.xlog))
 		xlog_close(&vy_log.xlog, false);
+
+	vclock_copy(&vy_log.last_checkpoint, &msg->vclock);
 	return 0;
 }
 
@@ -1134,17 +1183,16 @@ vy_log_rotate(const struct vclock *vclock)
 		    (long long)prev_signature, (long long)signature);
 
 	/*
-	 * Lock out all concurrent log writers while we are rotating it.
-	 * This effectively stalls the vinyl scheduler for a while, but
+	 * Do actual work from vylog thread so as not to stall tx thread.
+	 * Note, both rotation and writes are done by the same thread so
+	 * this effectively stalls the vinyl scheduler for a while, but
 	 * this is acceptable, because (1) the log file is small and
 	 * hence can be rotated fairly quickly so the stall isn't going
 	 * to take too long and (2) dumps/compactions, which are scheduled
 	 * by the scheduler, are rare events so there shouldn't be too
 	 * many of them piling up due to log rotation.
 	 */
-	latch_lock(&vy_log.latch);
 
-	/* Do actual work from coio so as not to stall tx thread. */
 	struct vy_log_rotate_msg msg;
 	vclock_copy(&msg.vclock, vclock);
 
@@ -1154,19 +1202,13 @@ vy_log_rotate(const struct vclock *vclock)
 	fiber_set_cancellable(cancellable);
 
 	if (rc != 0)
-		goto fail;
-
-	vclock_copy(&vy_log.last_checkpoint, vclock);
+		return -1;
 
 	/* Add the new vclock to the xdir so that we can track it. */
 	xdir_add_vclock(&vy_log.dir, vclock);
 
-	latch_unlock(&vy_log.latch);
 	say_verbose("done rotating vylog");
 	return 0;
-fail:
-	latch_unlock(&vy_log.latch);
-	return -1;
 }
 
 void
@@ -1201,8 +1243,26 @@ vy_log_backup_path(const struct vclock *vclock)
 void
 vy_log_tx_begin(void)
 {
-	latch_lock(&vy_log.latch);
-	vy_log.tx_failed = false;
+	if (vy_log.tx != NULL) {
+		/*
+		 * We don't commit vylog transactions until recovery
+		 * is complete, see vy_log_tx_do_commit().
+		 */
+		assert(vy_log.recovery != NULL);
+		goto done;
+	}
+	struct vy_log_tx *tx = lsregion_alloc(&vy_log.tx_pool, sizeof(*tx),
+					      ++vy_log.last_tx_id);
+	if (tx == NULL)
+		panic("failed to allocate memory for vylog transaction");
+
+	tx->id = vy_log.last_tx_id;
+	stailq_create(&tx->records);
+	diag_create(&tx->diag);
+	tx->failed = false;
+	tx->fiber = NULL;
+	vy_log.tx = tx;
+done:
 	say_verbose("begin vylog transaction");
 }
 
@@ -1216,18 +1276,21 @@ vy_log_tx_begin(void)
 static int
 vy_log_tx_do_commit(bool no_discard)
 {
-	assert(latch_owner(&vy_log.latch) == fiber());
+	struct vy_log_tx *tx = vy_log.tx;
+	assert(tx != NULL);
 
-	if (vy_log.tx_failed) {
+	if (tx->failed) {
 		/*
 		 * vy_log_write() failed to append a record to tx.
 		 * @no_discard transactions can't handle this.
 		 */
-		diag_move(&vy_log.tx_diag, diag_get());
+		diag_move(&tx->diag, diag_get());
 		if (no_discard) {
 			diag_log();
 			panic("non-discardable vylog transaction failed");
 		}
+		vy_log_tx_free(tx);
+		vy_log.tx = NULL;
 		goto fail;
 	}
 
@@ -1240,15 +1303,13 @@ vy_log_tx_do_commit(bool no_discard)
 	if (vy_log.recovery != NULL)
 		goto done;
 
-	if (vy_log_flush(no_discard) != 0)
+	if (vy_log_tx_write(no_discard) != 0)
 		goto fail;
 done:
 	say_verbose("commit vylog transaction");
-	latch_unlock(&vy_log.latch);
 	return 0;
 fail:
 	say_verbose("rollback vylog transaction");
-	latch_unlock(&vy_log.latch);
 	return -1;
 }
 
@@ -1265,21 +1326,29 @@ vy_log_tx_try_commit(void)
 		unreachable();
 }
 
+static void *
+vy_log_tx_alloc_cb(void *ctx, size_t size)
+{
+	(void)ctx;
+	return lsregion_alloc(&vy_log.tx_pool, size, vy_log.last_tx_id);
+}
+
 void
 vy_log_write(const struct vy_log_record *record)
 {
-	assert(latch_owner(&vy_log.latch) == fiber());
+	struct vy_log_tx *tx = vy_log.tx;
+	assert(tx != NULL);
 
 	struct vy_log_record *tx_record = vy_log_record_dup(record,
-					region_alloc_cb, &vy_log.pool);
+					vy_log_tx_alloc_cb, NULL);
 	if (tx_record == NULL) {
-		diag_move(diag_get(), &vy_log.tx_diag);
-		vy_log.tx_failed = true;
+		diag_move(diag_get(), &tx->diag);
+		tx->failed = true;
 		return;
 	}
 
 	say_verbose("write vylog record: %s", vy_log_record_str(tx_record));
-	stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx);
+	stailq_add_tail_entry(&tx->records, tx_record, in_tx);
 }
 
 /**
@@ -2382,9 +2451,11 @@ vy_recovery_new_f(struct cbus_call_msg *base)
 struct vy_recovery *
 vy_recovery_new(int64_t signature, int flags)
 {
-	/* Lock out concurrent writers while we are loading the log. */
-	latch_lock(&vy_log.latch);
-
+	/*
+	 * No need to lock out concurrent writers as loading is
+	 * done by vylog thread, the same thread that processes
+	 * writes.
+	 */
 	struct vy_recovery_msg msg;
 	msg.signature = signature;
 	msg.flags = flags;
@@ -2396,13 +2467,9 @@ vy_recovery_new(int64_t signature, int flags)
 	fiber_set_cancellable(cancellable);
 
 	if (rc != 0)
-		goto fail;
+		return NULL;
 
-	latch_unlock(&vy_log.latch);
 	return msg.recovery;
-fail:
-	latch_unlock(&vy_log.latch);
-	return NULL;
 }
 
 void
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index bd6a4a0d..4fb6da33 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -273,7 +273,7 @@ struct vy_log_record {
 	int64_t gc_lsn;
 	/** For runs: number of dumps it took to create the run. */
 	uint32_t dump_count;
-	/** Link in vy_log::tx. */
+	/** Link in vy_log_tx::records. */
 	struct stailq_entry in_tx;
 };
 
@@ -482,7 +482,8 @@ vy_log_tx_begin(void);
  * Commit a transaction started with vy_log_tx_begin().
  *
  * This function flushes all buffered records to disk. If it fails,
- * all records of the current transaction are discarded.
+ * all records of the current transaction are discarded. Note, it
+ * waits for the transaction to be written, i.e. yields.
  *
  * See also vy_log_tx_try_commit().
  *
@@ -497,7 +498,8 @@ vy_log_tx_commit(void);
  * Similarly to vy_log_tx_commit(), this function tries to write all
  * buffered records to disk, but in case of failure pending records
  * are not expunged from the buffer, so that the next transaction
- * will retry to flush them.
+ * will retry to flush them. In contrast to vy_log_tx_commit(),
+ * this function doesn't yield.
  */
 void
 vy_log_tx_try_commit(void);
-- 
2.11.0




More information about the Tarantool-patches mailing list