[Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Jul 7 01:43:16 MSK 2020
The synchronous replication options - replication_synchro_quorum and
replication_synchro_timeout - were not applied to the already existing
transactions when they were changed. As a result, there could be weird
inconsistencies: a new transaction could require a smaller quorum
than a previous transaction and could implicitly confirm it. The
same could be said about rollback on timeout - new transactions
could wake up earlier than older transactions.
This patch makes the configuration dynamic. If the mentioned
options are updated, the new values are applied to the existing
transactions too.
This opens up broad administrative capabilities. For example, when
the replica count becomes less than the quorum, an administrator can
lower the quorum dynamically, and it will be applied to all the
existing transactions.
Closes #5119
---
Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication
Issue 1: https://github.com/tarantool/tarantool/issues/4842
Issue 2: https://github.com/tarantool/tarantool/issues/5119
src/box/box.cc | 2 +
src/box/txn_limbo.c | 162 +++++++++++++++++---------
src/box/txn_limbo.h | 10 ++
test/replication/qsync_basic.result | 90 ++++++++++++++
test/replication/qsync_basic.test.lua | 32 +++++
5 files changed, 242 insertions(+), 54 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 5e28276f0..e15ae0e44 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void)
if (value < 0)
return -1;
replication_synchro_quorum = value;
+ txn_limbo_on_parameters_change(&txn_limbo);
return 0;
}
@@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void)
if (value < 0)
return -1;
replication_synchro_timeout = value;
+ txn_limbo_on_parameters_change(&txn_limbo);
return 0;
}
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index e28e2016f..2575f4c25 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo)
{
rlist_create(&limbo->queue);
limbo->instance_id = REPLICA_ID_NIL;
+ fiber_cond_create(&limbo->wait_cond);
vclock_create(&limbo->vclock);
limbo->rollback_count = 0;
}
@@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
assert(!txn_has_flag(txn, TXN_IS_DONE));
assert(txn_has_flag(txn, TXN_WAIT_SYNC));
- bool cancellable = fiber_set_cancellable(false);
- bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
- fiber_set_cancellable(cancellable);
- if (timed_out) {
- assert(!txn_limbo_is_empty(limbo));
- if (txn_limbo_first_entry(limbo) != entry) {
- /*
- * If this is not a first entry in the
- * limbo, it is definitely not a first
- * timed out entry. And since it managed
- * to time out too, it means there is
- * currently another fiber writing
- * rollback. Wait when it will finish and
- * wake us up.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield();
- fiber_set_cancellable(cancellable);
- assert(txn_limbo_entry_is_complete(entry));
+ double start_time = fiber_clock();
+ while (true) {
+ double deadline = start_time + txn_limbo_confirm_timeout(limbo);
+ bool cancellable = fiber_set_cancellable(false);
+ double timeout = deadline - fiber_clock();
+ bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond,
+ timeout);
+ fiber_set_cancellable(cancellable);
+ if (txn_limbo_entry_is_complete(entry))
goto complete;
- }
+ if (timed_out)
+ goto do_rollback;
+ }
- txn_limbo_write_rollback(limbo, entry->lsn);
- struct txn_limbo_entry *e, *tmp;
- rlist_foreach_entry_safe_reverse(e, &limbo->queue,
- in_queue, tmp) {
- e->is_rollback = true;
- e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
- txn_limbo_pop(limbo, e);
- txn_clear_flag(e->txn, TXN_WAIT_SYNC);
- txn_clear_flag(e->txn, TXN_WAIT_ACK);
- txn_complete(e->txn);
- if (e == entry)
- break;
- fiber_wakeup(e->txn->fiber);
- }
- diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
- return -1;
+do_rollback:
+ assert(!txn_limbo_is_empty(limbo));
+ if (txn_limbo_first_entry(limbo) != entry) {
+ /*
+ * If this is not a first entry in the limbo, it
+ * is definitely not a first timed out entry. And
+ * since it managed to time out too, it means
+ * there is currently another fiber writing
+ * rollback. Wait when it will finish and wake us
+ * up.
+ */
+ bool cancellable = fiber_set_cancellable(false);
+ do {
+ fiber_yield();
+ } while (!txn_limbo_entry_is_complete(entry));
+ fiber_set_cancellable(cancellable);
+ goto complete;
}
+
+ txn_limbo_write_rollback(limbo, entry->lsn);
+ struct txn_limbo_entry *e, *tmp;
+ rlist_foreach_entry_safe_reverse(e, &limbo->queue,
+ in_queue, tmp) {
+ e->is_rollback = true;
+ e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
+ txn_limbo_pop(limbo, e);
+ txn_clear_flag(e->txn, TXN_WAIT_SYNC);
+ txn_clear_flag(e->txn, TXN_WAIT_ACK);
+ txn_complete(e->txn);
+ if (e == entry)
+ break;
+ fiber_wakeup(e->txn->fiber);
+ }
+ diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
+ return -1;
+
complete:
assert(txn_limbo_entry_is_complete(entry));
/*
@@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo)
* or array instead of the boolean.
*/
struct confirm_waitpoint {
- /**
- * Variable for wake up the fiber that is waiting for
- * the end of confirmation.
- */
- struct fiber_cond confirm_cond;
+ /** Fiber that is waiting for the end of confirmation. */
+ struct fiber *caller;
/**
* Result flag.
*/
bool is_confirm;
+ bool is_rollback;
};
static int
@@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event)
struct confirm_waitpoint *cwp =
(struct confirm_waitpoint *)trigger->data;
cwp->is_confirm = true;
- fiber_cond_signal(&cwp->confirm_cond);
+ fiber_wakeup(cwp->caller);
return 0;
}
@@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event)
(void)event;
struct confirm_waitpoint *cwp =
(struct confirm_waitpoint *)trigger->data;
- fiber_cond_signal(&cwp->confirm_cond);
+ cwp->is_rollback = true;
+ fiber_wakeup(cwp->caller);
return 0;
}
@@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
/* initialization of a waitpoint. */
struct confirm_waitpoint cwp;
- fiber_cond_create(&cwp.confirm_cond);
+ cwp.caller = fiber();
cwp.is_confirm = false;
+ cwp.is_rollback = false;
/* Set triggers for the last limbo transaction. */
struct trigger on_complete;
@@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
txn_on_commit(tle->txn, &on_complete);
txn_on_rollback(tle->txn, &on_rollback);
-
- int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
- txn_limbo_confirm_timeout(limbo));
- fiber_cond_destroy(&cwp.confirm_cond);
- if (rc != 0) {
- /* Clear the triggers if the timeout has been reached. */
- trigger_clear(&on_complete);
- trigger_clear(&on_rollback);
- diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
- return -1;
+ double start_time = fiber_clock();
+ while (true) {
+ double deadline = start_time + txn_limbo_confirm_timeout(limbo);
+ bool cancellable = fiber_set_cancellable(false);
+ double timeout = deadline - fiber_clock();
+ int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout);
+ fiber_set_cancellable(cancellable);
+ if (cwp.is_confirm || cwp.is_rollback)
+ goto complete;
+ if (rc != 0)
+ goto timed_out;
}
+timed_out:
+ /* Clear the triggers if the timeout has been reached. */
+ trigger_clear(&on_complete);
+ trigger_clear(&on_rollback);
+ diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
+ return -1;
+
+complete:
if (!cwp.is_confirm) {
/* The transaction has been rolled back. */
diag_set(ClientError, ER_SYNC_ROLLBACK);
@@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
}
}
+void
+txn_limbo_on_parameters_change(struct txn_limbo *limbo)
+{
+ if (rlist_empty(&limbo->queue))
+ return;
+ struct txn_limbo_entry *e;
+ int64_t confirm_lsn = -1;
+ rlist_foreach_entry(e, &limbo->queue, in_queue) {
+ assert(e->ack_count <= VCLOCK_MAX);
+ if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+ assert(e->lsn == -1);
+ if (confirm_lsn == -1)
+ continue;
+ } else if (e->ack_count < replication_synchro_quorum) {
+ continue;
+ } else {
+ confirm_lsn = e->lsn;
+ assert(confirm_lsn > 0);
+ }
+ e->is_commit = true;
+ }
+ if (confirm_lsn > 0 &&
+ txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
+ panic("Couldn't write CONFIRM to WAL");
+ return;
+ }
+ /*
+ * Wakeup all. Confirmed will be committed. Timed out will
+ * rollback.
+ */
+ fiber_cond_broadcast(&limbo->wait_cond);
+}
+
void
txn_limbo_init(void)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 0d56d0d69..1ee416231 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -103,6 +103,13 @@ struct txn_limbo {
* LSNs in their vclock components.
*/
uint32_t instance_id;
+ /**
+ * Condition to wait for completion. It is supposed to be
+ * signaled when the synchro parameters change, allowing
+ * the sleeping fibers to reconsider their timeouts when
+ * the parameters are updated.
+ */
+ struct fiber_cond wait_cond;
/**
* All components of the vclock are versions of the limbo
* owner's LSN, how it is visible on other nodes. For
@@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo);
void
txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm);
+void
+txn_limbo_on_parameters_change(struct txn_limbo *limbo);
+
void
txn_limbo_init();
diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
index 3e28607b0..59f5d9123 100644
--- a/test/replication/qsync_basic.result
+++ b/test/replication/qsync_basic.result
@@ -475,6 +475,96 @@ box.space.sync:select{7}
| - - [7]
| ...
+--
+-- gh-5119: dynamic limbo configuration. Updated parameters should
+-- be applied even to existing transactions.
+--
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+ok, err = nil
+ | ---
+ | ...
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
+end)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+box.cfg{replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ | ---
+ | - true
+ | ...
+ok, err
+ | ---
+ | - false
+ | - Quorum collection for a synchronous transaction is timed out
+ | ...
+box.space.sync:select{11}
+ | ---
+ | - []
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{11}
+ | ---
+ | - []
+ | ...
+
+-- Test it is possible to early ACK a transaction with a new quorum.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+ok, err = nil
+ | ---
+ | ...
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
+end)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+box.cfg{replication_synchro_quorum = 2}
+ | ---
+ | ...
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ | ---
+ | - true
+ | ...
+ok, err
+ | ---
+ | - true
+ | - [12]
+ | ...
+box.space.sync:select{12}
+ | ---
+ | - - [12]
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{12}
+ | ---
+ | - - [12]
+ | ...
+
-- Cleanup.
test_run:cmd('switch default')
| ---
diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
index 860d6d6c4..1bb3ba87d 100644
--- a/test/replication/qsync_basic.test.lua
+++ b/test/replication/qsync_basic.test.lua
@@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2)
test_run:switch('replica')
box.space.sync:select{7}
+--
+-- gh-5119: dynamic limbo configuration. Updated parameters should
+-- be applied even to existing transactions.
+--
+test_run:switch('default')
+box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
+ok, err = nil
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
+end)
+f:status()
+box.cfg{replication_synchro_timeout = 0.001}
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ok, err
+box.space.sync:select{11}
+test_run:switch('replica')
+box.space.sync:select{11}
+
+-- Test it is possible to early ACK a transaction with a new quorum.
+test_run:switch('default')
+ok, err = nil
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
+end)
+f:status()
+box.cfg{replication_synchro_quorum = 2}
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ok, err
+box.space.sync:select{12}
+test_run:switch('replica')
+box.space.sync:select{12}
+
-- Cleanup.
test_run:cmd('switch default')
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list