* [PATCH] vinyl: don't yield in on_commit and on_rollback triggers
From: Vladimir Davydov @ 2019-06-15 18:00 UTC
To: tarantool-patches
To apply replicated rows in parallel, we need to be able to complete
transactions asynchronously, from the tx_prio callback. We can't yield
there, so we must ensure that on_commit/on_rollback triggers don't yield.
The only place where we may yield in a completion trigger is vinyl DDL,
which submits vylog records and waits for them to complete.
Actually, there's no reason to wait for a vylog write to complete, as we
can handle missing records on recovery. So this patch reworks vylog so
that vy_log_tx_try_commit(), and hence the on_commit/on_rollback
triggers that use it, never yield.
To achieve that, we need to:
- Use vy_log.latch only to synchronize log rotation against writes.
  Don't protect the vylog buffer with it. This makes vy_log_tx_begin()
  non-yielding.
- Use a separate list and buffer for storing the vylog records of each
  transaction. We used to share them among all transactions, but without
  vy_log.latch we can no longer synchronize access to them. Since vylog
  transactions are rare events, this should be fine.
- Make vy_log_tx_try_commit() append the transaction to the list of
  pending transactions and wake up a background fiber to flush all
  pending transactions. This way it doesn't need to yield. (A standalone
  sketch of this commit protocol follows the branch links below.)
Closes #4218
---
https://github.com/tarantool/tarantool/issues/4218
https://github.com/tarantool/tarantool/commits/dv/gh-4218-vy-remove-yields-from-ddl-commit
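Below is a standalone sketch of the commit protocol described above. It
uses plain C and pthreads instead of Tarantool's fiber/fiber_cond API,
and all names are illustrative rather than the ones from vy_log.c:
committers only append a finished transaction to a pending queue and
signal a background flusher, so commit itself never blocks on disk I/O.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

struct pending_tx {
    int id;
    struct pending_tx *next;
};

static struct pending_tx *queue_head, *queue_tail;
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;

/* Analog of vy_log_tx_try_commit(): enqueue and wake the flusher. */
static void
tx_try_commit(struct pending_tx *tx)
{
    pthread_mutex_lock(&queue_lock);
    tx->next = NULL;
    if (queue_tail != NULL)
        queue_tail->next = tx;
    else
        queue_head = tx;
    queue_tail = tx;
    pthread_cond_signal(&queue_cond);
    pthread_mutex_unlock(&queue_lock);
}

/* Analog of the flusher fiber: drain the queue in commit order. */
static void *
flusher_main(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&queue_lock);
        while (queue_head == NULL)
            pthread_cond_wait(&queue_cond, &queue_lock);
        struct pending_tx *tx = queue_head;
        queue_head = tx->next;
        if (queue_head == NULL)
            queue_tail = NULL;
        pthread_mutex_unlock(&queue_lock);
        /* Stands in for the xlog write done by vy_log_tx_flush(). */
        printf("flushed tx %d\n", tx->id);
        free(tx);
    }
    return NULL;
}

int
main(void)
{
    pthread_t flusher;
    pthread_create(&flusher, NULL, flusher_main, NULL);
    for (int i = 0; i < 3; i++) {
        struct pending_tx *tx = malloc(sizeof(*tx));
        tx->id = i;
        tx_try_commit(tx);
    }
    sleep(1); /* crude: give the flusher time to drain the queue */
    return 0;
}

Note that the real code needs no mutex: fibers are cooperative, so the
pending list is only ever touched between yield points.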
src/box/vy_log.c | 289 +++++++++++++++++++++++++++++++++----------------
src/box/vy_log.h | 14 ++-
src/box/vy_scheduler.c | 5 +-
3 files changed, 211 insertions(+), 97 deletions(-)
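One more note, on the VY_LOG_GC_LSN_CURRENT sentinel added to vy_log.h
below: a drop-run record may now sit in the pending queue across a
vylog rotation, so the writer stores a placeholder and the flush path
substitutes the signature of the vylog file the record actually lands
in. A minimal illustration of the idea (simplified, illustrative names):

enum { GC_LSN_CURRENT = -1 };

struct record {
    long gc_lsn; /* -1 means "signature of the vylog being written" */
};

/* Resolved at flush time, when the target vylog file is known. */
static void
resolve_gc_lsn(struct record *rec, long current_vylog_signature)
{
    if (rec->gc_lsn == GC_LSN_CURRENT)
        rec->gc_lsn = current_vylog_signature;
}

In the patch itself this substitution happens in vy_log_tx_flush(),
right before each record is encoded.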
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index bdc1cfa3..ae1d6234 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -51,6 +51,7 @@
#include "errcode.h"
#include "errinj.h"
#include "fiber.h"
+#include "fiber_cond.h"
#include "iproto_constants.h" /* IPROTO_INSERT */
#include "key_def.h"
#include "latch.h"
@@ -130,6 +131,16 @@ static const char *vy_log_type_name[] = {
[VY_LOG_ABORT_REBOOTSTRAP] = "abort_rebootstrap",
};
+/** Batch of vylog records that must be written in one go. */
+struct vy_log_tx {
+ /** Link in vy_log::pending_tx. */
+ struct stailq_entry in_pending;
+ /** Region used for allocating records. */
+ struct region region;
+ /** List of records, linked by vy_log_record::in_tx. */
+ struct stailq records;
+};
+
/** Metadata log object. */
struct vy_log {
/**
@@ -142,27 +153,30 @@ struct vy_log {
struct vclock last_checkpoint;
/** Recovery context. */
struct vy_recovery *recovery;
- /** Latch protecting the log buffer. */
+ /**
+ * Latch that syncs log writers against readers.
+ * Needed so that we don't miss any records during
+ * log rotation.
+ */
struct latch latch;
+ /** Background fiber flushing pending transactions. */
+ struct fiber *flusher;
+ /** Condition variable used for signalling the flusher. */
+ struct fiber_cond flusher_cond;
/**
* Next ID to use for a vinyl object.
* Used by vy_log_next_id().
*/
int64_t next_id;
- /** A region of struct vy_log_record entries. */
- struct region pool;
+ /** Pool of vy_log_tx objects. */
+ struct mempool tx_pool;
+ /** Current transaction or NULL. */
+ struct vy_log_tx *tx;
/**
- * Records awaiting to be written to disk.
- * Linked by vy_log_record::in_tx;
+ * List of transactions awaiting to be flushed to disk,
+ * linked by vy_log_tx::in_pending.
*/
- struct stailq tx;
- /** Start of the current transaction in the pool, for rollback */
- size_t tx_svp;
- /**
- * Last record in the queue at the time when the current
- * transaction was started. Used for rollback.
- */
- struct stailq_entry *tx_begin;
+ struct stailq pending_tx;
/**
* Flag set if vy_log_write() failed.
*
@@ -181,6 +195,9 @@ struct vy_log {
};
static struct vy_log vy_log;
+static int
+vy_log_flusher_func(va_list va);
+
static struct vy_recovery *
vy_recovery_new_locked(int64_t signature, int flags);
@@ -737,23 +754,47 @@ vy_log_init(const char *dir)
xdir_create(&vy_log.dir, dir, VYLOG, &INSTANCE_UUID,
&xlog_opts_default);
latch_create(&vy_log.latch);
- region_create(&vy_log.pool, cord_slab_cache());
- stailq_create(&vy_log.tx);
+ mempool_create(&vy_log.tx_pool, cord_slab_cache(),
+ sizeof(struct vy_log_tx));
+ stailq_create(&vy_log.pending_tx);
diag_create(&vy_log.tx_diag);
wal_init_vy_log();
+ fiber_cond_create(&vy_log.flusher_cond);
+ vy_log.flusher = fiber_new("vinyl.vylog_flusher",
+ vy_log_flusher_func);
+ if (vy_log.flusher == NULL)
+ panic("failed to allocate vylog flusher fiber");
+ fiber_wakeup(vy_log.flusher);
+}
+
+static struct vy_log_tx *
+vy_log_tx_new(void)
+{
+ struct vy_log_tx *tx = mempool_alloc(&vy_log.tx_pool);
+ if (tx == NULL) {
+ diag_set(OutOfMemory, sizeof(*tx), "mempool", "vy log tx");
+ return NULL;
+ }
+ region_create(&tx->region, cord_slab_cache());
+ stailq_create(&tx->records);
+ tx->in_pending.next = NULL;
+ return tx;
+}
+
+static void
+vy_log_tx_delete(struct vy_log_tx *tx)
+{
+ region_destroy(&tx->region);
+ mempool_free(&vy_log.tx_pool, tx);
}
/**
- * Try to flush the log buffer to disk.
- *
- * We always flush the entire vy_log buffer as a single xlog
- * transaction, since we do not track boundaries of @no_discard
- * buffered transactions, and want to avoid a partial write.
+ * Write a given transaction to disk.
*/
static int
-vy_log_flush(void)
+vy_log_tx_flush(struct vy_log_tx *tx)
{
- if (stailq_empty(&vy_log.tx))
+ if (stailq_empty(&tx->records))
return 0; /* nothing to do */
ERROR_INJECT(ERRINJ_VY_LOG_FLUSH, {
@@ -768,7 +809,7 @@ vy_log_flush(void)
int tx_size = 0;
struct vy_log_record *record;
- stailq_foreach_entry(record, &vy_log.tx, in_tx)
+ stailq_foreach_entry(record, &tx->records, in_tx)
tx_size++;
size_t used = region_used(&fiber()->gc);
@@ -787,7 +828,9 @@ vy_log_flush(void)
* Encode buffered records.
*/
int i = 0;
- stailq_foreach_entry(record, &vy_log.tx, in_tx) {
+ stailq_foreach_entry(record, &tx->records, in_tx) {
+ if (record->gc_lsn == VY_LOG_GC_LSN_CURRENT)
+ record->gc_lsn = vy_log_signature();
assert(i < tx_size);
struct xrow_header *row = &rows[i];
if (vy_log_record_encode(record, row) < 0)
@@ -804,9 +847,6 @@ vy_log_flush(void)
if (wal_write_vy_log(entry) != 0)
goto err;
- /* Success. Free flushed records. */
- region_reset(&vy_log.pool);
- stailq_create(&vy_log.tx);
region_truncate(&fiber()->gc, used);
return 0;
err:
@@ -814,11 +854,78 @@ err:
return -1;
}
+/**
+ * Write all pending transactions to disk.
+ */
+static int
+vy_log_flush(void)
+{
+ /*
+ * vy_log_tx_try_commit() can add a new transaction to
+ * the list while we are writing to disk. This is okay -
+ * we'll flush it next time. If we fail, we put remaining
+ * transactions back to the head of the list to preserve
+ * the commit order.
+ */
+ struct stailq pending;
+ stailq_create(&pending);
+ stailq_concat(&pending, &vy_log.pending_tx);
+
+ int rc = 0;
+ while (!stailq_empty(&pending)) {
+ struct vy_log_tx *tx = stailq_first_entry(&pending,
+ struct vy_log_tx, in_pending);
+ rc = vy_log_tx_flush(tx);
+ if (rc != 0)
+ break;
+ stailq_shift(&pending);
+ vy_log_tx_delete(tx);
+ }
+ stailq_concat(&pending, &vy_log.pending_tx);
+ stailq_concat(&vy_log.pending_tx, &pending);
+ return rc;
+}
+
+static int
+vy_log_flusher_func(va_list va)
+{
+ (void)va;
+ while (!fiber_is_cancelled()) {
+ /*
+ * Disable writes during local recovery.
+ * See vy_log_tx_commit().
+ */
+ if (vy_log.recovery != NULL ||
+ stailq_empty(&vy_log.pending_tx)) {
+ fiber_cond_wait(&vy_log.flusher_cond);
+ continue;
+ }
+ latch_lock(&vy_log.latch);
+ int rc = vy_log_flush();
+ latch_unlock(&vy_log.latch);
+ if (rc != 0) {
+ diag_log();
+ say_error("failed to flush vylog");
+ /*
+ * Don't retry immediately after a failure
+ * since the next write is likely to fail
+ * as well. Instead wait for the next signal.
+ */
+ fiber_cond_wait(&vy_log.flusher_cond);
+ }
+ }
+ return 0;
+}
+
void
vy_log_free(void)
{
+ struct vy_log_tx *tx, *next_tx;
+ stailq_foreach_entry_safe(tx, next_tx, &vy_log.pending_tx, in_pending)
+ vy_log_tx_delete(tx);
+ stailq_create(&vy_log.pending_tx);
+ mempool_destroy(&vy_log.tx_pool);
xdir_destroy(&vy_log.dir);
- region_destroy(&vy_log.pool);
diag_destroy(&vy_log.tx_diag);
}
@@ -998,9 +1105,12 @@ vy_log_end_recovery(void)
* Update the recovery context with records written during
* recovery - we will need them for garbage collection.
*/
- struct vy_log_record *record;
- stailq_foreach_entry(record, &vy_log.tx, in_tx)
- vy_recovery_process_record(vy_log.recovery, record);
+ struct vy_log_tx *tx;
+ stailq_foreach_entry(tx, &vy_log.pending_tx, in_pending) {
+ struct vy_log_record *record;
+ stailq_foreach_entry(record, &tx->records, in_tx)
+ vy_recovery_process_record(vy_log.recovery, record);
+ }
/* Flush all pending records. */
if (vy_log_flush() < 0) {
@@ -1123,102 +1233,95 @@ vy_log_backup_path(const struct vclock *vclock)
void
vy_log_tx_begin(void)
{
- latch_lock(&vy_log.latch);
- vy_log.tx_begin = stailq_last(&vy_log.tx);
- vy_log.tx_svp = region_used(&vy_log.pool);
- vy_log.tx_failed = false;
+ assert(!vy_log.tx_failed);
+ assert(vy_log.tx == NULL);
+ vy_log.tx = vy_log_tx_new();
+ if (vy_log.tx == NULL) {
+ diag_move(diag_get(), &vy_log.tx_diag);
+ vy_log.tx_failed = true;
+ }
say_verbose("begin vylog transaction");
}
-/**
- * Commit a transaction started with vy_log_tx_begin().
- *
- * If @no_discard is set, pending records won't be expunged from the
- * buffer on failure, so that the next transaction will retry to write
- * them to disk.
- */
-static int
-vy_log_tx_do_commit(bool no_discard)
+int
+vy_log_tx_commit(void)
{
- struct stailq rollback;
-
- assert(latch_owner(&vy_log.latch) == fiber());
-
- if (vy_log.tx_failed) {
- /*
- * vy_log_write() failed to append a record to tx.
- * @no_discard transactions can't handle this.
- */
- diag_move(&vy_log.tx_diag, diag_get());
- if (no_discard) {
- diag_log();
- panic("non-discardable vylog transaction failed");
- }
- goto rollback;
- }
-
/*
* During recovery, we may replay records we failed to commit
* before restart (e.g. drop LSM tree). Since the log isn't open
* yet, simply leave them in the tx buffer to be flushed upon
* recovery completion.
*/
- if (vy_log.recovery != NULL)
- goto done;
+ if (vy_log.recovery != NULL) {
+ vy_log_tx_try_commit();
+ return 0;
+ }
+
+ struct vy_log_tx *tx = vy_log.tx;
+ vy_log.tx = NULL;
- if (vy_log_flush() != 0) {
- if (!no_discard)
- goto rollback;
- /*
- * We were told not to discard the transaction on
- * failure so just warn and leave it in the buffer.
- */
- struct error *e = diag_last_error(diag_get());
- say_warn("failed to flush vylog: %s", e->errmsg);
+ if (vy_log.tx_failed) {
+ diag_move(&vy_log.tx_diag, diag_get());
+ vy_log.tx_failed = false;
+ if (tx != NULL)
+ vy_log_tx_delete(tx);
+ goto err;
}
-done:
- say_verbose("commit vylog transaction");
+ assert(tx != NULL);
+ /*
+ * Before writing this transaction, flush all pending ones
+ * if any, because they were committed first.
+ */
+ latch_lock(&vy_log.latch);
+ int rc = vy_log_flush();
+ if (rc == 0)
+ rc = vy_log_tx_flush(tx);
latch_unlock(&vy_log.latch);
+
+ vy_log_tx_delete(tx);
+ if (rc != 0)
+ goto err;
+
+ say_verbose("commit vylog transaction");
return 0;
-
-rollback:
- stailq_cut_tail(&vy_log.tx, vy_log.tx_begin, &rollback);
- region_truncate(&vy_log.pool, vy_log.tx_svp);
- vy_log.tx_svp = 0;
+err:
say_verbose("rollback vylog transaction");
- latch_unlock(&vy_log.latch);
return -1;
}
-int
-vy_log_tx_commit(void)
-{
- return vy_log_tx_do_commit(false);
-}
-
void
vy_log_tx_try_commit(void)
{
- if (vy_log_tx_do_commit(true) != 0)
- unreachable();
+ if (vy_log.tx_failed) {
+ diag_move(&vy_log.tx_diag, diag_get());
+ diag_log();
+ panic("non-discardable vylog transaction failed");
+ }
+ assert(vy_log.tx != NULL);
+ stailq_add_tail_entry(&vy_log.pending_tx, vy_log.tx, in_pending);
+ fiber_cond_signal(&vy_log.flusher_cond);
+ vy_log.tx = NULL;
+ say_verbose("commit vylog transaction");
}
void
vy_log_write(const struct vy_log_record *record)
{
- assert(latch_owner(&vy_log.latch) == fiber());
+ say_verbose("write vylog record: %s", vy_log_record_str(record));
- struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.pool,
+ if (vy_log.tx_failed)
+ return;
+
+ assert(vy_log.tx != NULL);
+ struct vy_log_record *tx_record = vy_log_record_dup(&vy_log.tx->region,
record);
if (tx_record == NULL) {
diag_move(diag_get(), &vy_log.tx_diag);
vy_log.tx_failed = true;
return;
}
-
- say_verbose("write vylog record: %s", vy_log_record_str(tx_record));
- stailq_add_tail_entry(&vy_log.tx, tx_record, in_tx);
+ stailq_add_tail_entry(&vy_log.tx->records, tx_record, in_tx);
}
/**
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index ee38c193..298a8ed4 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -221,6 +221,16 @@ enum vy_log_record_type {
vy_log_record_type_MAX
};
+/**
+ * Special value of vy_log_record::gc_lsn replaced with the signature
+ * of the vylog file the record will be written to. We need it so as
+ * to make sure we write the current vylog signature (not the previous
+ * one) when compaction completion races with vylog rotation. Writing
+ * the previous vylog signature would result in premature run file
+ * collection.
+ */
+enum { VY_LOG_GC_LSN_CURRENT = -1 };
+
/** Record in the metadata log. */
struct vy_log_record {
/** Type of the record. */
@@ -273,7 +283,7 @@ struct vy_log_record {
int64_t gc_lsn;
/** For runs: number of dumps it took to create the run. */
uint32_t dump_count;
- /** Link in vy_log::tx. */
+ /** Link in vy_log_tx::records. */
struct stailq_entry in_tx;
};
@@ -510,6 +520,8 @@ vy_log_tx_commit(void);
* buffered records to disk, but in case of failure pending records
* are not expunged from the buffer, so that the next transaction
* will retry to flush them.
+ *
+ * In contrast to vy_log_tx_commit(), this function doesn't yield.
*/
void
vy_log_tx_try_commit(void);
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 0180331e..85c1659b 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -1503,9 +1503,8 @@ vy_task_compaction_complete(struct vy_task *task)
if (slice == last_slice)
break;
}
- int64_t gc_lsn = vy_log_signature();
rlist_foreach_entry(run, &unused_runs, in_unused)
- vy_log_drop_run(run->id, gc_lsn);
+ vy_log_drop_run(run->id, VY_LOG_GC_LSN_CURRENT);
if (new_slice != NULL) {
vy_log_create_run(lsm->id, new_run->id, new_run->dump_lsn,
new_run->dump_count);
@@ -1530,7 +1529,7 @@ vy_task_compaction_complete(struct vy_task *task)
* next checkpoint.
*/
rlist_foreach_entry(run, &unused_runs, in_unused) {
- if (run->dump_lsn > gc_lsn)
+ if (run->dump_lsn > vy_log_signature())
vy_run_remove_files(lsm->env->path, lsm->space_id,
lsm->index_id, run->id);
}
--
2.11.0
* [tarantool-patches] Re: [PATCH] vinyl: don't yield in on_commit and on_rollback triggers
From: Konstantin Osipov @ 2019-06-19 18:23 UTC
To: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/06/16 16:11]:
Basically the patch is LGTM, but I would not use a mempool for
vy_log_tx and, even more strongly, a region for vy_log records - these
are rare events (less than a handful a second per instance), so using
plain malloc for them is entirely justified.
On the other hand, I don't see any harm from using these (memory is
not pinned for a long period of time, for example), so please feel
free not to bother with this comment.
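For illustration, a sketch of that malloc-based alternative - not code
from the patch, just vy_log_tx_new() from the diff with the mempool
swapped out (vy_log_tx_delete() would use free() accordingly):

static struct vy_log_tx *
vy_log_tx_new(void)
{
    struct vy_log_tx *tx = malloc(sizeof(*tx));
    if (tx == NULL) {
        diag_set(OutOfMemory, sizeof(*tx), "malloc", "vy log tx");
        return NULL;
    }
    region_create(&tx->region, cord_slab_cache());
    stailq_create(&tx->records);
    tx->in_pending.next = NULL;
    return tx;
}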
> +vy_log_flusher_func(va_list va)
The convention is that fiber function names simply end with _f.
I also did not find the place where you cancel or kill the fiber -
you assume it runs forever - please document it, since otherwise
the infinite fiber loop looks confusing.
--
Konstantin Osipov, Moscow, Russia
* Re: [tarantool-patches] Re: [PATCH] vinyl: don't yield in on_commit and on_rollback triggers
From: Vladimir Davydov @ 2019-06-20 11:18 UTC
To: Konstantin Osipov; +Cc: tarantool-patches
On Wed, Jun 19, 2019 at 09:23:29PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [19/06/16 16:11]:
>
> Basically the patch is LGTM, but I would not use a mempool for
> vy_log_tx and, even more strongly, a region for vy_log records - these
> are rare events (less than a handful a second per instance), so using
> plain malloc for them is entirely justified.
>
> On the other hand, I don't see any harm from using these (memory is
> not pinned for a long period of time, for example), so please feel
> free not to bother with this comment.
>
> > +vy_log_flusher_func(va_list va)
>
> The convention is that fiber function names simply end with _f.
>
> I also did not find the place where you cancel or kill the fiber -
> you assume it runs forever - please document it, since otherwise
> the infinite fiber loop looks confusing.
Pushed to master with the following changes:
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index ae1d6234..098a0141 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -159,7 +159,13 @@ struct vy_log {
* log rotation.
*/
struct latch latch;
- /** Background fiber flushing pending transactions. */
+ /**
+ * Background fiber flushing pending transactions.
+ * Lives throughout the vinyl engine lifetime. Note,
+ * we don't stop it in destructor, because the event
+ * loop is dead at that time so we can't properly
+ * join it.
+ */
struct fiber *flusher;
/** Condition variable used for signalling the flusher. */
struct fiber_cond flusher_cond;
@@ -196,7 +202,7 @@ struct vy_log {
static struct vy_log vy_log;
static int
-vy_log_flusher_func(va_list va);
+vy_log_flusher_f(va_list va);
static struct vy_recovery *
vy_recovery_new_locked(int64_t signature, int flags);
@@ -761,7 +767,7 @@ vy_log_init(const char *dir)
wal_init_vy_log();
fiber_cond_create(&vy_log.flusher_cond);
vy_log.flusher = fiber_new("vinyl.vylog_flusher",
- vy_log_flusher_func);
+ vy_log_flusher_f);
if (vy_log.flusher == NULL)
panic("failed to allocate vylog flusher fiber");
fiber_wakeup(vy_log.flusher);
@@ -887,7 +893,7 @@ vy_log_flush(void)
}
static int
-vy_log_flusher_func(va_list va)
+vy_log_flusher_f(va_list va)
{
(void)va;
while (!fiber_is_cancelled()) {