* [Tarantool-patches] [PATCH 1/3] box: make txn reference the limbo entry
2021-05-20 9:02 [Tarantool-patches] [PATCH 0/3] fix assertion failure in box.ctl.promote() Serge Petrenko via Tarantool-patches
@ 2021-05-20 9:02 ` Serge Petrenko via Tarantool-patches
2021-05-20 9:02 ` [Tarantool-patches] [PATCH 2/3] txn_limbo: move lsn assignment to journal completion callback Serge Petrenko via Tarantool-patches
2021-05-20 9:02 ` [Tarantool-patches] [PATCH 3/3] box: fix an assertion failure in box.ctl.promote() Serge Petrenko via Tarantool-patches
2 siblings, 0 replies; 6+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-05-20 9:02 UTC (permalink / raw)
To: v.shpilevoy, gorcunov; +Cc: tarantool-patches
When a transaction is synchronous, it has the same lifespan as the
corresponding txn limbo entry. Moreover, the txn limbo entry is already
used (and will be used more extensively) in some triggers where
struct txn is passed by default.
Avoid passing the limbo entry to triggers explicitly by referencing it
in struct txn.
Prerequisite #6032
---
src/box/txn.c | 25 +++++++++++--------------
src/box/txn.h | 2 ++
2 files changed, 13 insertions(+), 14 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 1d42c9113..1fa3e4367 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -765,12 +765,12 @@ static int
txn_limbo_on_rollback(struct trigger *trig, void *event)
{
(void) event;
+ (void) trig;
struct txn *txn = (struct txn *) event;
/* Check whether limbo has performed the cleanup. */
if (txn->signature != TXN_SIGNATURE_ROLLBACK)
return 0;
- struct txn_limbo_entry *entry = (struct txn_limbo_entry *) trig->data;
- txn_limbo_abort(&txn_limbo, entry);
+ txn_limbo_abort(&txn_limbo, txn->limbo_entry);
return 0;
}
@@ -801,7 +801,6 @@ txn_commit_try_async(struct txn *txn)
goto rollback;
bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
- struct txn_limbo_entry *limbo_entry;
if (is_sync) {
/*
* We'll need this trigger for sync transactions later,
@@ -819,8 +818,8 @@ txn_commit_try_async(struct txn *txn)
/* See txn_commit(). */
uint32_t origin_id = req->rows[0]->replica_id;
- limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
- if (limbo_entry == NULL)
+ txn->limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+ if (txn->limbo_entry == NULL)
goto rollback;
if (txn_has_flag(txn, TXN_WAIT_ACK)) {
@@ -832,15 +831,14 @@ txn_commit_try_async(struct txn *txn)
* assignment to let the limbo rule this
* out.
*/
- txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
+ txn_limbo_assign_lsn(&txn_limbo, txn->limbo_entry, lsn);
}
/*
* Set a trigger to abort waiting for confirm on
* WAL write failure.
*/
- trigger_create(trig, txn_limbo_on_rollback,
- limbo_entry, NULL);
+ trigger_create(trig, txn_limbo_on_rollback, NULL, NULL);
txn_on_rollback(txn, trig);
}
@@ -864,7 +862,6 @@ int
txn_commit(struct txn *txn)
{
struct journal_entry *req;
- struct txn_limbo_entry *limbo_entry = NULL;
txn->fiber = fiber();
@@ -893,15 +890,15 @@ txn_commit(struct txn *txn)
* After WAL write nothing should fail, even OOM
* wouldn't be acceptable.
*/
- limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
- if (limbo_entry == NULL)
+ txn->limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+ if (txn->limbo_entry == NULL)
goto rollback;
}
fiber_set_txn(fiber(), NULL);
if (journal_write(req) != 0 || req->res < 0) {
if (is_sync)
- txn_limbo_abort(&txn_limbo, limbo_entry);
+ txn_limbo_abort(&txn_limbo, txn->limbo_entry);
diag_set(ClientError, ER_WAL_IO);
diag_log();
goto rollback;
@@ -914,12 +911,12 @@ txn_commit(struct txn *txn)
* blocking commit is used by local
* transactions only.
*/
- txn_limbo_assign_local_lsn(&txn_limbo, limbo_entry,
+ txn_limbo_assign_local_lsn(&txn_limbo, txn->limbo_entry,
lsn);
/* Local WAL write is a first 'ACK'. */
txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, lsn);
}
- if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0)
+ if (txn_limbo_wait_complete(&txn_limbo, txn->limbo_entry) < 0)
goto rollback;
}
assert(txn_has_flag(txn, TXN_IS_DONE));
diff --git a/src/box/txn.h b/src/box/txn.h
index a06aaea23..6b16372f4 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -349,6 +349,8 @@ struct txn {
void *engine_tx;
/* A fiber to wake up when transaction is finished. */
struct fiber *fiber;
+ /** A limbo entry associated with this transaction, if any. */
+ struct txn_limbo_entry *limbo_entry;
/** Timestampt of entry write start. */
double start_tm;
/**
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 6+ messages in thread
* [Tarantool-patches] [PATCH 2/3] txn_limbo: move lsn assignment to journal completion callback
2021-05-20 9:02 [Tarantool-patches] [PATCH 0/3] fix assertion failure in box.ctl.promote() Serge Petrenko via Tarantool-patches
2021-05-20 9:02 ` [Tarantool-patches] [PATCH 1/3] box: make txn reference the limbo entry Serge Petrenko via Tarantool-patches
@ 2021-05-20 9:02 ` Serge Petrenko via Tarantool-patches
2021-05-20 9:02 ` [Tarantool-patches] [PATCH 3/3] box: fix an assertion failure in box.ctl.promote() Serge Petrenko via Tarantool-patches
2 siblings, 0 replies; 6+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-05-20 9:02 UTC (permalink / raw)
To: v.shpilevoy, gorcunov; +Cc: tarantool-patches
Previously, local lsn assignment for txn_limbo entries was done right in
the txn_commit() body after the WAL write.
In order to make the entry's lsn available to on_wal_write triggers, which
are called from the journal completion callback, assign it right in the
callback before the triggers are called.
Note that ACKing the entry is not moved to the journal completion callback,
since it may trigger writing CONFIRM, which cannot be done from the sched
fiber (which executes the journal callback).
Prerequisite #6032
---
src/box/txn.c | 17 ++++++++++-------
1 file changed, 10 insertions(+), 7 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 1fa3e4367..dea824d78 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -545,6 +545,16 @@ txn_on_journal_write(struct journal_entry *entry)
txn_complete_fail(txn);
goto finish;
}
+
+ if (txn_has_flag(txn, TXN_WAIT_ACK) && txn->limbo_entry->lsn == -1) {
+ int64_t lsn = entry->rows[entry->n_rows - 1]->lsn;
+ /*
+ * Must be a local entry since its lsn wasn't known prior to
+ * the WAL write.
+ */
+ txn_limbo_assign_local_lsn(&txn_limbo, txn->limbo_entry, lsn);
+ }
+
double stop_tm = ev_monotonic_now(loop());
double delta = stop_tm - txn->start_tm;
if (delta > too_long_threshold) {
@@ -906,13 +916,6 @@ txn_commit(struct txn *txn)
if (is_sync) {
if (txn_has_flag(txn, TXN_WAIT_ACK)) {
int64_t lsn = req->rows[req->n_rows - 1]->lsn;
- /*
- * Use local LSN assignment. Because
- * blocking commit is used by local
- * transactions only.
- */
- txn_limbo_assign_local_lsn(&txn_limbo, txn->limbo_entry,
- lsn);
/* Local WAL write is a first 'ACK'. */
txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, lsn);
}
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 6+ messages in thread
* [Tarantool-patches] [PATCH 3/3] box: fix an assertion failure in box.ctl.promote()
2021-05-20 9:02 [Tarantool-patches] [PATCH 0/3] fix assertion failure in box.ctl.promote() Serge Petrenko via Tarantool-patches
2021-05-20 9:02 ` [Tarantool-patches] [PATCH 1/3] box: make txn reference the limbo entry Serge Petrenko via Tarantool-patches
2021-05-20 9:02 ` [Tarantool-patches] [PATCH 2/3] txn_limbo: move lsn assignment to journal completion callback Serge Petrenko via Tarantool-patches
@ 2021-05-20 9:02 ` Serge Petrenko via Tarantool-patches
2021-05-23 12:18 ` Vladislav Shpilevoy via Tarantool-patches
2 siblings, 1 reply; 6+ messages in thread
From: Serge Petrenko via Tarantool-patches @ 2021-05-20 9:02 UTC (permalink / raw)
To: v.shpilevoy, gorcunov; +Cc: tarantool-patches
box.ctl.promote() used to assume that the last synchronous entry is
already written to WAL by the time it's called. This is not the case
when promote is executed on the limbo owner. The last synchronous entry
might still be en route to WAL.
In order to fix the issue, introduce a new method:
txn_limbo_wait_lsn_assigned(), which waits until the last synchronous
entry receives its lsn. After this happens, it's safe to proceed to
gathering quorum in promote.
Closes #6032
---
src/box/box.cc | 26 ++---
src/box/txn_limbo.c | 42 +++++++
src/box/txn_limbo.h | 8 ++
.../gh-6032-promote-wal-write.result | 108 ++++++++++++++++++
.../gh-6032-promote-wal-write.test.lua | 41 +++++++
test/replication/suite.cfg | 1 +
6 files changed, 213 insertions(+), 13 deletions(-)
create mode 100644 test/replication/gh-6032-promote-wal-write.result
create mode 100644 test/replication/gh-6032-promote-wal-write.test.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index c10e0d8bf..1b1e7eec0 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1442,17 +1442,22 @@ box_quorum_on_ack_f(struct trigger *trigger, void *event)
}
/**
- * Wait until at least @a quorum of nodes confirm @a target_lsn from the node
- * with id @a lead_id.
+ * Wait until at least @a quorum of nodes confirm the last available synchronous
+ * entry from the node with id @a lead_id.
*/
static int
-box_wait_quorum(uint32_t lead_id, int64_t target_lsn, int quorum,
+box_wait_quorum(uint32_t lead_id, struct txn_limbo_entry **entry, int quorum,
double timeout)
{
struct box_quorum_trigger t;
memset(&t, 0, sizeof(t));
vclock_create(&t.vclock);
+ *entry = txn_limbo_wait_lsn_assigned(&txn_limbo);
+ if (*entry == NULL)
+ return -1;
+ int64_t target_lsn = (*entry)->lsn;
+
/* Take this node into account immediately. */
int ack_count = vclock_get(box_vclock, lead_id) >= target_lsn;
replicaset_foreach(replica) {
@@ -1622,22 +1627,17 @@ box_promote(void)
}
}
- /*
- * promote() is a no-op on the limbo owner, so all the rows
- * in the limbo must've come through the applier meaning they already
- * have an lsn assigned, even if their WAL write hasn't finished yet.
- */
- wait_lsn = txn_limbo_last_synchro_entry(&txn_limbo)->lsn;
- assert(wait_lsn > 0);
-
- rc = box_wait_quorum(former_leader_id, wait_lsn, quorum,
+ struct txn_limbo_entry *last_entry;
+ rc = box_wait_quorum(former_leader_id,&last_entry, quorum,
replication_synchro_timeout);
if (rc == 0) {
+ wait_lsn = last_entry->lsn;
if (quorum < replication_synchro_quorum) {
diag_set(ClientError, ER_QUORUM_WAIT, quorum,
"quorum was increased while waiting");
rc = -1;
- } else if (wait_lsn < txn_limbo_last_synchro_entry(&txn_limbo)->lsn) {
+ } else if (last_entry !=
+ txn_limbo_last_synchro_entry(&txn_limbo)) {
diag_set(ClientError, ER_QUORUM_WAIT, quorum,
"new synchronous transactions appeared");
rc = -1;
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index f287369a2..406f2de89 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -69,6 +69,48 @@ txn_limbo_last_synchro_entry(struct txn_limbo *limbo)
return NULL;
}
+static int
+txn_limbo_wait_lsn_assigned_f(struct trigger *trig, void *event)
+{
+ (void)event;
+ struct fiber *fiber = trig->data;
+ fiber_wakeup(fiber);
+ return 0;
+}
+
+struct txn_limbo_entry *
+txn_limbo_wait_lsn_assigned(struct txn_limbo *limbo)
+{
+ assert(!txn_limbo_is_empty(limbo));
+ struct txn_limbo_entry *entry = txn_limbo_last_synchro_entry(limbo);
+ if (entry->lsn >= 0)
+ return entry;
+
+ struct trigger write_trigger, rollback_trigger;
+ trigger_create(&write_trigger, txn_limbo_wait_lsn_assigned_f, fiber(),
+ NULL);
+ trigger_create(&rollback_trigger, txn_limbo_wait_lsn_assigned_f,
+ fiber(), NULL);
+ txn_on_wal_write(entry->txn, &write_trigger);
+ txn_on_rollback(entry->txn, &rollback_trigger);
+ do {
+ fiber_yield();
+ if (fiber_is_cancelled()) {
+ diag_set(FiberIsCancelled);
+ entry = NULL;
+ break;
+ }
+ if (entry->txn->signature < 0) {
+ diag_set(ClientError, ER_SYNC_ROLLBACK);
+ entry = NULL;
+ break;
+ }
+ } while (entry->lsn == -1);
+ trigger_clear(&write_trigger);
+ trigger_clear(&rollback_trigger);
+ return entry;
+}
+
struct txn_limbo_entry *
txn_limbo_append(struct txn_limbo *limbo, uint32_t id, struct txn *txn)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index e409ac657..24aa68114 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -240,6 +240,14 @@ txn_limbo_is_replica_outdated(const struct txn_limbo *limbo,
struct txn_limbo_entry *
txn_limbo_last_synchro_entry(struct txn_limbo *limbo);
+/**
+ * Wait until the last synchronous transaction in the limbo gets its LSN
+ * (i.e. is written to WAL). Return the entry or NULL in case of error.
+ */
+struct txn_limbo_entry *
+txn_limbo_wait_lsn_assigned(struct txn_limbo *limbo);
+
+
/**
* Allocate, create, and append a new transaction to the limbo.
* The limbo entry is allocated on the transaction's region.
diff --git a/test/replication/gh-6032-promote-wal-write.result b/test/replication/gh-6032-promote-wal-write.result
new file mode 100644
index 000000000..461d6416e
--- /dev/null
+++ b/test/replication/gh-6032-promote-wal-write.result
@@ -0,0 +1,108 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+
+replication_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+box.cfg{\
+ replication_synchro_timeout = 0.001,\
+}
+ | ---
+ | ...
+
+_ = box.schema.create_space('sync', {is_sync = true}):create_index('pk')
+ | ---
+ | ...
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+ | ---
+ | - ok
+ | ...
+_ = fiber.create(function() box.space.sync:replace{1} end)
+ | ---
+ | ...
+ok, err = nil, nil
+ | ---
+ | ...
+
+-- Test that the fiber actually waits for a WAL write to happen.
+f = fiber.create(function() ok, err = pcall(box.ctl.promote) end)
+ | ---
+ | ...
+fiber.sleep(0.1)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+ | ---
+ | - ok
+ | ...
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ | ---
+ | - true
+ | ...
+ok
+ | ---
+ | - true
+ | ...
+err
+ | ---
+ | - null
+ | ...
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+ | ---
+ | - ok
+ | ...
+_ = fiber.create(function() box.space.sync:replace{2} end)
+ | ---
+ | ...
+
+-- Test that the fiber is cancellable.
+f = fiber.create(function() ok, err = pcall(box.ctl.promote) end)
+ | ---
+ | ...
+fiber.sleep(0.1)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+f:cancel()
+ | ---
+ | ...
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ | ---
+ | - true
+ | ...
+ok
+ | ---
+ | - false
+ | ...
+err
+ | ---
+ | - fiber is cancelled
+ | ...
+
+-- Cleanup.
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+ | ---
+ | - ok
+ | ...
+box.cfg{\
+ replication_synchro_timeout = replication_synchro_timeout,\
+}
+ | ---
+ | ...
+box.space.sync:drop()
+ | ---
+ | ...
diff --git a/test/replication/gh-6032-promote-wal-write.test.lua b/test/replication/gh-6032-promote-wal-write.test.lua
new file mode 100644
index 000000000..7725330f5
--- /dev/null
+++ b/test/replication/gh-6032-promote-wal-write.test.lua
@@ -0,0 +1,41 @@
+test_run = require('test_run').new()
+fiber = require('fiber')
+
+replication_synchro_timeout = box.cfg.replication_synchro_timeout
+box.cfg{\
+ replication_synchro_timeout = 0.001,\
+}
+
+_ = box.schema.create_space('sync', {is_sync = true}):create_index('pk')
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+_ = fiber.create(function() box.space.sync:replace{1} end)
+ok, err = nil, nil
+
+-- Test that the fiber actually waits for a WAL write to happen.
+f = fiber.create(function() ok, err = pcall(box.ctl.promote) end)
+fiber.sleep(0.1)
+f:status()
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ok
+err
+
+box.error.injection.set('ERRINJ_WAL_DELAY', true)
+_ = fiber.create(function() box.space.sync:replace{2} end)
+
+-- Test that the fiber is cancellable.
+f = fiber.create(function() ok, err = pcall(box.ctl.promote) end)
+fiber.sleep(0.1)
+f:status()
+f:cancel()
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ok
+err
+
+-- Cleanup.
+box.error.injection.set('ERRINJ_WAL_DELAY', false)
+box.cfg{\
+ replication_synchro_timeout = replication_synchro_timeout,\
+}
+box.space.sync:drop()
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index dc39e2f74..dfe4be9ae 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -45,6 +45,7 @@
"gh-5435-qsync-clear-synchro-queue-commit-all.test.lua": {},
"gh-5536-wal-limit.test.lua": {},
"gh-5566-final-join-synchro.test.lua": {},
+ "gh-6032-promote-wal-write.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.30.1 (Apple Git-130)
^ permalink raw reply [flat|nested] 6+ messages in thread