* [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config
@ 2020-07-06 22:43 Vladislav Shpilevoy
2020-07-06 22:55 ` Vladislav Shpilevoy
2020-07-09 21:40 ` Leonid Vasiliev
0 siblings, 2 replies; 4+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-06 22:43 UTC (permalink / raw)
To: tarantool-patches
Synchronous replication options - replication_synchro_quorum and
replication_synchro_timeout - were not updated for the existing
transactions on change. As a result, there could be weird
inconsistencies: a new transaction could require a smaller quorum
than a previous transaction, and could implicitly confirm it. The
same could be said about rollback on timeout - new transactions
could wake up earlier than older transactions.
This patch makes configuration dynamic. So if the mentioned
options are updated, they are applied to the existing transactions
too.
It opens wide administrative capabilities. For example, when
replica count becomes less than the quorum, an administrator can
lower the quorum dynamically, and it will be applied to all the
existing transactions.
Closes #5119
---
Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication
Issue 1: https://github.com/tarantool/tarantool/issues/4842
Issue 2: https://github.com/tarantool/tarantool/issues/5119
src/box/box.cc | 2 +
src/box/txn_limbo.c | 162 +++++++++++++++++---------
src/box/txn_limbo.h | 10 ++
test/replication/qsync_basic.result | 90 ++++++++++++++
test/replication/qsync_basic.test.lua | 32 +++++
5 files changed, 242 insertions(+), 54 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index 5e28276f0..e15ae0e44 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void)
if (value < 0)
return -1;
replication_synchro_quorum = value;
+ txn_limbo_on_parameters_change(&txn_limbo);
return 0;
}
@@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void)
if (value < 0)
return -1;
replication_synchro_timeout = value;
+ txn_limbo_on_parameters_change(&txn_limbo);
return 0;
}
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index e28e2016f..2575f4c25 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo)
{
rlist_create(&limbo->queue);
limbo->instance_id = REPLICA_ID_NIL;
+ fiber_cond_create(&limbo->wait_cond);
vclock_create(&limbo->vclock);
limbo->rollback_count = 0;
}
@@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
assert(!txn_has_flag(txn, TXN_IS_DONE));
assert(txn_has_flag(txn, TXN_WAIT_SYNC));
- bool cancellable = fiber_set_cancellable(false);
- bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
- fiber_set_cancellable(cancellable);
- if (timed_out) {
- assert(!txn_limbo_is_empty(limbo));
- if (txn_limbo_first_entry(limbo) != entry) {
- /*
- * If this is not a first entry in the
- * limbo, it is definitely not a first
- * timed out entry. And since it managed
- * to time out too, it means there is
- * currently another fiber writing
- * rollback. Wait when it will finish and
- * wake us up.
- */
- bool cancellable = fiber_set_cancellable(false);
- fiber_yield();
- fiber_set_cancellable(cancellable);
- assert(txn_limbo_entry_is_complete(entry));
+ double start_time = fiber_clock();
+ while (true) {
+ double deadline = start_time + txn_limbo_confirm_timeout(limbo);
+ bool cancellable = fiber_set_cancellable(false);
+ double timeout = deadline - fiber_clock();
+ bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond,
+ timeout);
+ fiber_set_cancellable(cancellable);
+ if (txn_limbo_entry_is_complete(entry))
goto complete;
- }
+ if (timed_out)
+ goto do_rollback;
+ }
- txn_limbo_write_rollback(limbo, entry->lsn);
- struct txn_limbo_entry *e, *tmp;
- rlist_foreach_entry_safe_reverse(e, &limbo->queue,
- in_queue, tmp) {
- e->is_rollback = true;
- e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
- txn_limbo_pop(limbo, e);
- txn_clear_flag(e->txn, TXN_WAIT_SYNC);
- txn_clear_flag(e->txn, TXN_WAIT_ACK);
- txn_complete(e->txn);
- if (e == entry)
- break;
- fiber_wakeup(e->txn->fiber);
- }
- diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
- return -1;
+do_rollback:
+ assert(!txn_limbo_is_empty(limbo));
+ if (txn_limbo_first_entry(limbo) != entry) {
+ /*
+ * If this is not a first entry in the limbo, it
+ * is definitely not a first timed out entry. And
+ * since it managed to time out too, it means
+ * there is currently another fiber writing
+ * rollback. Wait when it will finish and wake us
+ * up.
+ */
+ bool cancellable = fiber_set_cancellable(false);
+ do {
+ fiber_yield();
+ } while (!txn_limbo_entry_is_complete(entry));
+ fiber_set_cancellable(cancellable);
+ goto complete;
}
+
+ txn_limbo_write_rollback(limbo, entry->lsn);
+ struct txn_limbo_entry *e, *tmp;
+ rlist_foreach_entry_safe_reverse(e, &limbo->queue,
+ in_queue, tmp) {
+ e->is_rollback = true;
+ e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
+ txn_limbo_pop(limbo, e);
+ txn_clear_flag(e->txn, TXN_WAIT_SYNC);
+ txn_clear_flag(e->txn, TXN_WAIT_ACK);
+ txn_complete(e->txn);
+ if (e == entry)
+ break;
+ fiber_wakeup(e->txn->fiber);
+ }
+ diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
+ return -1;
+
complete:
assert(txn_limbo_entry_is_complete(entry));
/*
@@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo)
* or array instead of the boolean.
*/
struct confirm_waitpoint {
- /**
- * Variable for wake up the fiber that is waiting for
- * the end of confirmation.
- */
- struct fiber_cond confirm_cond;
+ /** Fiber that is waiting for the end of confirmation. */
+ struct fiber *caller;
/**
* Result flag.
*/
bool is_confirm;
+ bool is_rollback;
};
static int
@@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event)
struct confirm_waitpoint *cwp =
(struct confirm_waitpoint *)trigger->data;
cwp->is_confirm = true;
- fiber_cond_signal(&cwp->confirm_cond);
+ fiber_wakeup(cwp->caller);
return 0;
}
@@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event)
(void)event;
struct confirm_waitpoint *cwp =
(struct confirm_waitpoint *)trigger->data;
- fiber_cond_signal(&cwp->confirm_cond);
+ cwp->is_rollback = true;
+ fiber_wakeup(cwp->caller);
return 0;
}
@@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
/* initialization of a waitpoint. */
struct confirm_waitpoint cwp;
- fiber_cond_create(&cwp.confirm_cond);
+ cwp.caller = fiber();
cwp.is_confirm = false;
+ cwp.is_rollback = false;
/* Set triggers for the last limbo transaction. */
struct trigger on_complete;
@@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
txn_on_commit(tle->txn, &on_complete);
txn_on_rollback(tle->txn, &on_rollback);
-
- int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
- txn_limbo_confirm_timeout(limbo));
- fiber_cond_destroy(&cwp.confirm_cond);
- if (rc != 0) {
- /* Clear the triggers if the timeout has been reached. */
- trigger_clear(&on_complete);
- trigger_clear(&on_rollback);
- diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
- return -1;
+ double start_time = fiber_clock();
+ while (true) {
+ double deadline = start_time + txn_limbo_confirm_timeout(limbo);
+ bool cancellable = fiber_set_cancellable(false);
+ double timeout = deadline - fiber_clock();
+ int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout);
+ fiber_set_cancellable(cancellable);
+ if (cwp.is_confirm || cwp.is_rollback)
+ goto complete;
+ if (rc != 0)
+ goto timed_out;
}
+timed_out:
+ /* Clear the triggers if the timeout has been reached. */
+ trigger_clear(&on_complete);
+ trigger_clear(&on_rollback);
+ diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
+ return -1;
+
+complete:
if (!cwp.is_confirm) {
/* The transaction has been rolled back. */
diag_set(ClientError, ER_SYNC_ROLLBACK);
@@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
}
}
+void
+txn_limbo_on_parameters_change(struct txn_limbo *limbo)
+{
+ if (rlist_empty(&limbo->queue))
+ return;
+ struct txn_limbo_entry *e;
+ int64_t confirm_lsn = -1;
+ rlist_foreach_entry(e, &limbo->queue, in_queue) {
+ assert(e->ack_count <= VCLOCK_MAX);
+ if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
+ assert(e->lsn == -1);
+ if (confirm_lsn == -1)
+ continue;
+ } else if (e->ack_count < replication_synchro_quorum) {
+ continue;
+ } else {
+ confirm_lsn = e->lsn;
+ assert(confirm_lsn > 0);
+ }
+ e->is_commit = true;
+ }
+ if (confirm_lsn > 0 &&
+ txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
+ panic("Couldn't write CONFIRM to WAL");
+ return;
+ }
+ /*
+ * Wakeup all. Confirmed will be committed. Timed out will
+ * rollback.
+ */
+ fiber_cond_broadcast(&limbo->wait_cond);
+}
+
void
txn_limbo_init(void)
{
diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
index 0d56d0d69..1ee416231 100644
--- a/src/box/txn_limbo.h
+++ b/src/box/txn_limbo.h
@@ -103,6 +103,13 @@ struct txn_limbo {
* LSNs in their vclock components.
*/
uint32_t instance_id;
+ /**
+ * Condition to wait for completion. It is supposed to be
+ * signaled when the synchro parameters change. Allowing
+ * the sleeping fibers to reconsider their timeouts when
+ * the parameters are updated.
+ */
+ struct fiber_cond wait_cond;
/**
* All components of the vclock are versions of the limbo
* owner's LSN, how it is visible on other nodes. For
@@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo);
void
txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm);
+void
+txn_limbo_on_parameters_change(struct txn_limbo *limbo);
+
void
txn_limbo_init();
diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
index 3e28607b0..59f5d9123 100644
--- a/test/replication/qsync_basic.result
+++ b/test/replication/qsync_basic.result
@@ -475,6 +475,96 @@ box.space.sync:select{7}
| - - [7]
| ...
+--
+-- gh-5119: dynamic limbo configuration. Updated parameters should
+-- be applied even to existing transactions.
+--
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+ok, err = nil
+ | ---
+ | ...
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
+end)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+box.cfg{replication_synchro_timeout = 0.001}
+ | ---
+ | ...
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ | ---
+ | - true
+ | ...
+ok, err
+ | ---
+ | - false
+ | - Quorum collection for a synchronous transaction is timed out
+ | ...
+box.space.sync:select{11}
+ | ---
+ | - []
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{11}
+ | ---
+ | - []
+ | ...
+
+-- Test it is possible to early ACK a transaction with a new quorum.
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+ok, err = nil
+ | ---
+ | ...
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
+end)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+box.cfg{replication_synchro_quorum = 2}
+ | ---
+ | ...
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ | ---
+ | - true
+ | ...
+ok, err
+ | ---
+ | - true
+ | - [12]
+ | ...
+box.space.sync:select{12}
+ | ---
+ | - - [12]
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{12}
+ | ---
+ | - - [12]
+ | ...
+
-- Cleanup.
test_run:cmd('switch default')
| ---
diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
index 860d6d6c4..1bb3ba87d 100644
--- a/test/replication/qsync_basic.test.lua
+++ b/test/replication/qsync_basic.test.lua
@@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2)
test_run:switch('replica')
box.space.sync:select{7}
+--
+-- gh-5119: dynamic limbo configuration. Updated parameters should
+-- be applied even to existing transactions.
+--
+test_run:switch('default')
+box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
+ok, err = nil
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
+end)
+f:status()
+box.cfg{replication_synchro_timeout = 0.001}
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ok, err
+box.space.sync:select{11}
+test_run:switch('replica')
+box.space.sync:select{11}
+
+-- Test it is possible to early ACK a transaction with a new quorum.
+test_run:switch('default')
+ok, err = nil
+f = fiber.create(function() \
+ ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
+end)
+f:status()
+box.cfg{replication_synchro_quorum = 2}
+test_run:wait_cond(function() return f:status() == 'dead' end)
+ok, err
+box.space.sync:select{12}
+test_run:switch('replica')
+box.space.sync:select{12}
+
-- Cleanup.
test_run:cmd('switch default')
--
2.21.1 (Apple Git-122.3)
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config
2020-07-06 22:43 [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config Vladislav Shpilevoy
@ 2020-07-06 22:55 ` Vladislav Shpilevoy
2020-07-09 21:40 ` Leonid Vasiliev
1 sibling, 0 replies; 4+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-06 22:55 UTC (permalink / raw)
To: tarantool-patches
I force pushed deletion of replication/gh-5119-sync_replication.test.lua.
It passed, but was not strictly related to the actual bug. An alternative
test is inside qsync_basic.test.lua.
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config
2020-07-06 22:43 [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config Vladislav Shpilevoy
2020-07-06 22:55 ` Vladislav Shpilevoy
@ 2020-07-09 21:40 ` Leonid Vasiliev
2020-07-10 0:23 ` Vladislav Shpilevoy
1 sibling, 1 reply; 4+ messages in thread
From: Leonid Vasiliev @ 2020-07-09 21:40 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
Hi! Thank you for the patch.
See some questions below.
On 07.07.2020 01:43, Vladislav Shpilevoy wrote:
> Synchronous replication options - replication_synchro_quorum and
> replication_synchro_timeout - were not updated for the existing
> transactions on change. As a result, there could be weird
> inconsistencies: a new transaction could require a smaller quorum
> than a previous transaction, and could implicitly confirm it. The
> same could be said about rollback on timeout - new transactions
> could wake up earlier than older transactions.
>
> This patch makes configuration dynamic. So if the mentioned
> options are updated, they are applied to the existing transactions
> too.
>
> It opens wide administrative capabilities. For example, when
> replica count becomes less than the quorum, an administrator can
> lower the quorum dynamically, and it will be applied to all the
> existing transactions.
>
> Closes #5119
> ---
> Branch: http://github.com/tarantool/tarantool/tree/gh-4842-sync-replication
> Issue 1: https://github.com/tarantool/tarantool/issues/4842
> Issue 2: https://github.com/tarantool/tarantool/issues/5119
>
> src/box/box.cc | 2 +
> src/box/txn_limbo.c | 162 +++++++++++++++++---------
> src/box/txn_limbo.h | 10 ++
> test/replication/qsync_basic.result | 90 ++++++++++++++
> test/replication/qsync_basic.test.lua | 32 +++++
> 5 files changed, 242 insertions(+), 54 deletions(-)
>
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 5e28276f0..e15ae0e44 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -865,6 +865,7 @@ box_set_replication_synchro_quorum(void)
> if (value < 0)
> return -1;
> replication_synchro_quorum = value;
> + txn_limbo_on_parameters_change(&txn_limbo);
> return 0;
> }
>
> @@ -875,6 +876,7 @@ box_set_replication_synchro_timeout(void)
> if (value < 0)
> return -1;
> replication_synchro_timeout = value;
> + txn_limbo_on_parameters_change(&txn_limbo);
> return 0;
> }
>
> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
> index e28e2016f..2575f4c25 100644
> --- a/src/box/txn_limbo.c
> +++ b/src/box/txn_limbo.c
> @@ -39,6 +39,7 @@ txn_limbo_create(struct txn_limbo *limbo)
> {
> rlist_create(&limbo->queue);
> limbo->instance_id = REPLICA_ID_NIL;
> + fiber_cond_create(&limbo->wait_cond);
> vclock_create(&limbo->vclock);
> limbo->rollback_count = 0;
> }
> @@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>
> assert(!txn_has_flag(txn, TXN_IS_DONE));
> assert(txn_has_flag(txn, TXN_WAIT_SYNC));
> - bool cancellable = fiber_set_cancellable(false);
> - bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
> - fiber_set_cancellable(cancellable);
> - if (timed_out) {
> - assert(!txn_limbo_is_empty(limbo));
> - if (txn_limbo_first_entry(limbo) != entry) {
> - /*
> - * If this is not a first entry in the
> - * limbo, it is definitely not a first
> - * timed out entry. And since it managed
> - * to time out too, it means there is
> - * currently another fiber writing
> - * rollback. Wait when it will finish and
> - * wake us up.
> - */
> - bool cancellable = fiber_set_cancellable(false);
> - fiber_yield();
> - fiber_set_cancellable(cancellable);
> - assert(txn_limbo_entry_is_complete(entry));
> + double start_time = fiber_clock();
> + while (true) {
> + double deadline = start_time + txn_limbo_confirm_timeout(limbo);
> + bool cancellable = fiber_set_cancellable(false);
> + double timeout = deadline - fiber_clock();
Why not just timeout = txn_limbo_confirm_timeout(limbo)?
It looks like
fiber_clock()(old) + txn_limbo_confirm_timeout(limbo) -
fiber_clock()(new) ~= txn_limbo_confirm_timeout(limbo)
> + bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond,
> + timeout);
> + fiber_set_cancellable(cancellable);
> + if (txn_limbo_entry_is_complete(entry))
> goto complete;
> - }
> + if (timed_out)
> + goto do_rollback;
> + }
>
> - txn_limbo_write_rollback(limbo, entry->lsn);
> - struct txn_limbo_entry *e, *tmp;
> - rlist_foreach_entry_safe_reverse(e, &limbo->queue,
> - in_queue, tmp) {
> - e->is_rollback = true;
> - e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> - txn_limbo_pop(limbo, e);
> - txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> - txn_clear_flag(e->txn, TXN_WAIT_ACK);
> - txn_complete(e->txn);
> - if (e == entry)
> - break;
> - fiber_wakeup(e->txn->fiber);
> - }
> - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> - return -1;
> +do_rollback:
> + assert(!txn_limbo_is_empty(limbo));
> + if (txn_limbo_first_entry(limbo) != entry) {
> + /*
> + * If this is not a first entry in the limbo, it
> + * is definitely not a first timed out entry. And
> + * since it managed to time out too, it means
> + * there is currently another fiber writing
> + * rollback. Wait when it will finish and wake us
> + * up.
> + */
> + bool cancellable = fiber_set_cancellable(false);
> + do {
> + fiber_yield();
> + } while (!txn_limbo_entry_is_complete(entry));
> + fiber_set_cancellable(cancellable);
> + goto complete;
> }
> +
> + txn_limbo_write_rollback(limbo, entry->lsn);
> + struct txn_limbo_entry *e, *tmp;
> + rlist_foreach_entry_safe_reverse(e, &limbo->queue,
> + in_queue, tmp) {
> + e->is_rollback = true;
> + e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
> + txn_limbo_pop(limbo, e);
> + txn_clear_flag(e->txn, TXN_WAIT_SYNC);
> + txn_clear_flag(e->txn, TXN_WAIT_ACK);
> + txn_complete(e->txn);
> + if (e == entry)
> + break;
> + fiber_wakeup(e->txn->fiber);
> + }
> + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> + return -1;
> +
> complete:
> assert(txn_limbo_entry_is_complete(entry));
> /*
> @@ -421,15 +433,13 @@ txn_limbo_confirm_timeout(struct txn_limbo *limbo)
> * or array instead of the boolean.
> */
> struct confirm_waitpoint {
> - /**
> - * Variable for wake up the fiber that is waiting for
> - * the end of confirmation.
> - */
> - struct fiber_cond confirm_cond;
> + /** Fiber that is waiting for the end of confirmation. */
> + struct fiber *caller;
> /**
> * Result flag.
> */
> bool is_confirm;
> + bool is_rollback;
> };
>
> static int
> @@ -439,7 +449,7 @@ txn_commit_cb(struct trigger *trigger, void *event)
> struct confirm_waitpoint *cwp =
> (struct confirm_waitpoint *)trigger->data;
> cwp->is_confirm = true;
> - fiber_cond_signal(&cwp->confirm_cond);
> + fiber_wakeup(cwp->caller);
> return 0;
> }
>
> @@ -449,7 +459,8 @@ txn_rollback_cb(struct trigger *trigger, void *event)
> (void)event;
> struct confirm_waitpoint *cwp =
> (struct confirm_waitpoint *)trigger->data;
> - fiber_cond_signal(&cwp->confirm_cond);
> + cwp->is_rollback = true;
> + fiber_wakeup(cwp->caller);
> return 0;
> }
>
> @@ -461,8 +472,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
>
> /* initialization of a waitpoint. */
> struct confirm_waitpoint cwp;
> - fiber_cond_create(&cwp.confirm_cond);
> + cwp.caller = fiber();
> cwp.is_confirm = false;
> + cwp.is_rollback = false;
>
> /* Set triggers for the last limbo transaction. */
> struct trigger on_complete;
> @@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
> struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
> txn_on_commit(tle->txn, &on_complete);
> txn_on_rollback(tle->txn, &on_rollback);
> -
> - int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
> - txn_limbo_confirm_timeout(limbo));
> - fiber_cond_destroy(&cwp.confirm_cond);
> - if (rc != 0) {
> - /* Clear the triggers if the timeout has been reached. */
> - trigger_clear(&on_complete);
> - trigger_clear(&on_rollback);
> - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> - return -1;
> + double start_time = fiber_clock();
> + while (true) {
> + double deadline = start_time + txn_limbo_confirm_timeout(limbo);
> + bool cancellable = fiber_set_cancellable(false);
> + double timeout = deadline - fiber_clock();
Maybe add a comment about the possible reasons for a wakeup
(reconfiguration, triggers, timeout).
> + int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout);
If I understand correctly, you use a trick to wake up from the triggers:
fiber_wakeup() is invoked manually, without using the fiber_cond
API. In this context, I have a few questions/comments:
- IMHO, it is not right to rely on implementation details of fiber_cond
while bypassing the fiber_cond API.
- In this case, we don't interact with the waiters list inside the
condition variable. Are you sure this is ok?
> + fiber_set_cancellable(cancellable);
> + if (cwp.is_confirm || cwp.is_rollback)
> + goto complete;
> + if (rc != 0)
> + goto timed_out;
> }
> +timed_out:
> + /* Clear the triggers if the timeout has been reached. */
> + trigger_clear(&on_complete);
> + trigger_clear(&on_rollback);
> + diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
> + return -1;
> +
> +complete:
> if (!cwp.is_confirm) {
> /* The transaction has been rolled back. */
> diag_set(ClientError, ER_SYNC_ROLLBACK);
> @@ -517,6 +538,39 @@ txn_limbo_force_empty(struct txn_limbo *limbo, int64_t confirm_lsn)
> }
> }
>
> +void
> +txn_limbo_on_parameters_change(struct txn_limbo *limbo)
> +{
> + if (rlist_empty(&limbo->queue))
> + return;
> + struct txn_limbo_entry *e;
> + int64_t confirm_lsn = -1;
> + rlist_foreach_entry(e, &limbo->queue, in_queue) {
> + assert(e->ack_count <= VCLOCK_MAX);
> + if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
> + assert(e->lsn == -1);
> + if (confirm_lsn == -1)
> + continue;
> + } else if (e->ack_count < replication_synchro_quorum) {
> + continue;
> + } else {
> + confirm_lsn = e->lsn;
> + assert(confirm_lsn > 0);
> + }
> + e->is_commit = true;
> + }
> + if (confirm_lsn > 0 &&
> + txn_limbo_write_confirm(limbo, confirm_lsn) != 0) {
> + panic("Couldn't write CONFIRM to WAL");
> + return;
> + }
> + /*
> + * Wakeup all. Confirmed will be committed. Timed out will
> + * rollback.
> + */
> + fiber_cond_broadcast(&limbo->wait_cond);
> +}
> +
> void
> txn_limbo_init(void)
> {
> diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h
> index 0d56d0d69..1ee416231 100644
> --- a/src/box/txn_limbo.h
> +++ b/src/box/txn_limbo.h
> @@ -103,6 +103,13 @@ struct txn_limbo {
> * LSNs in their vclock components.
> */
> uint32_t instance_id;
> + /**
> + * Condition to wait for completion. It is supposed to be
> + * signaled when the synchro parameters change. Allowing
> + * the sleeping fibers to reconsider their timeouts when
> + * the parameters are updated.
> + */
> + struct fiber_cond wait_cond;
> /**
> * All components of the vclock are versions of the limbo
> * owner's LSN, how it is visible on other nodes. For
> @@ -219,6 +226,9 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo);
> void
> txn_limbo_force_empty(struct txn_limbo *limbo, int64_t last_confirm);
>
> +void
> +txn_limbo_on_parameters_change(struct txn_limbo *limbo);
> +
> void
> txn_limbo_init();
>
> diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
> index 3e28607b0..59f5d9123 100644
> --- a/test/replication/qsync_basic.result
> +++ b/test/replication/qsync_basic.result
> @@ -475,6 +475,96 @@ box.space.sync:select{7}
> | - - [7]
> | ...
>
> +--
> +-- gh-5119: dynamic limbo configuration. Updated parameters should
> +-- be applied even to existing transactions.
> +--
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
> +end)
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - suspended
> + | ...
> +box.cfg{replication_synchro_timeout = 0.001}
> + | ---
> + | ...
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - false
> + | - Quorum collection for a synchronous transaction is timed out
> + | ...
> +box.space.sync:select{11}
> + | ---
> + | - []
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{11}
> + | ---
> + | - []
> + | ...
> +
> +-- Test it is possible to early ACK a transaction with a new quorum.
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +ok, err = nil
> + | ---
> + | ...
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
> +end)
> + | ---
> + | ...
> +f:status()
> + | ---
> + | - suspended
> + | ...
> +box.cfg{replication_synchro_quorum = 2}
> + | ---
> + | ...
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> + | ---
> + | - true
> + | ...
> +ok, err
> + | ---
> + | - true
> + | - [12]
> + | ...
> +box.space.sync:select{12}
> + | ---
> + | - - [12]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{12}
> + | ---
> + | - - [12]
> + | ...
> +
> -- Cleanup.
> test_run:cmd('switch default')
> | ---
> diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
> index 860d6d6c4..1bb3ba87d 100644
> --- a/test/replication/qsync_basic.test.lua
> +++ b/test/replication/qsync_basic.test.lua
> @@ -198,6 +198,38 @@ assert(newlsn >= oldlsn + 2)
> test_run:switch('replica')
> box.space.sync:select{7}
>
> +--
> +-- gh-5119: dynamic limbo configuration. Updated parameters should
> +-- be applied even to existing transactions.
> +--
> +test_run:switch('default')
> +box.cfg{replication_synchro_quorum = 3, replication_synchro_timeout = 1000}
> +ok, err = nil
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {11}) \
> +end)
> +f:status()
> +box.cfg{replication_synchro_timeout = 0.001}
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> +ok, err
> +box.space.sync:select{11}
> +test_run:switch('replica')
> +box.space.sync:select{11}
> +
> +-- Test it is possible to early ACK a transaction with a new quorum.
> +test_run:switch('default')
> +ok, err = nil
> +f = fiber.create(function() \
> + ok, err = pcall(box.space.sync.insert, box.space.sync, {12}) \
> +end)
> +f:status()
> +box.cfg{replication_synchro_quorum = 2}
> +test_run:wait_cond(function() return f:status() == 'dead' end)
> +ok, err
> +box.space.sync:select{12}
> +test_run:switch('replica')
> +box.space.sync:select{12}
> +
> -- Cleanup.
> test_run:cmd('switch default')
>
>
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config
2020-07-09 21:40 ` Leonid Vasiliev
@ 2020-07-10 0:23 ` Vladislav Shpilevoy
0 siblings, 0 replies; 4+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-10 0:23 UTC (permalink / raw)
To: Leonid Vasiliev, tarantool-patches
>> diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
>> index e28e2016f..2575f4c25 100644
>> --- a/src/box/txn_limbo.c
>> +++ b/src/box/txn_limbo.c
>> @@ -159,45 +160,56 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
>> assert(!txn_has_flag(txn, TXN_IS_DONE));
>> assert(txn_has_flag(txn, TXN_WAIT_SYNC));
>> - bool cancellable = fiber_set_cancellable(false);
>> - bool timed_out = fiber_yield_timeout(txn_limbo_confirm_timeout(limbo));
>> - fiber_set_cancellable(cancellable);
>> - if (timed_out) {
>> - assert(!txn_limbo_is_empty(limbo));
>> - if (txn_limbo_first_entry(limbo) != entry) {
>> - /*
>> - * If this is not a first entry in the
>> - * limbo, it is definitely not a first
>> - * timed out entry. And since it managed
>> - * to time out too, it means there is
>> - * currently another fiber writing
>> - * rollback. Wait when it will finish and
>> - * wake us up.
>> - */
>> - bool cancellable = fiber_set_cancellable(false);
>> - fiber_yield();
>> - fiber_set_cancellable(cancellable);
>> - assert(txn_limbo_entry_is_complete(entry));
>> + double start_time = fiber_clock();
>> + while (true) {
>> + double deadline = start_time + txn_limbo_confirm_timeout(limbo);
>> + bool cancellable = fiber_set_cancellable(false);
>> + double timeout = deadline - fiber_clock();
>
> Why not just timeout = txn_limbo_confirm_timeout(limbo)?
> It looks like
> fiber_clock()(old) + txn_limbo_confirm_timeout(limbo) - fiber_clock()(new) ~= txn_limbo_confirm_timeout(limbo)
Because otherwise it would wait indefinitely if I kept updating
the parameters: the deadline would keep moving forward. It
shouldn't.
>> + bool timed_out = fiber_cond_wait_timeout(&limbo->wait_cond,
>> + timeout);
>> + fiber_set_cancellable(cancellable);
>> + if (txn_limbo_entry_is_complete(entry))
>> goto complete;
>> - }
>> + if (timed_out)
>> + goto do_rollback;
>> + }
>> @@ -472,17 +484,26 @@ txn_limbo_wait_confirm(struct txn_limbo *limbo)
>> struct txn_limbo_entry *tle = txn_limbo_last_entry(limbo);
>> txn_on_commit(tle->txn, &on_complete);
>> txn_on_rollback(tle->txn, &on_rollback);
>> -
>> - int rc = fiber_cond_wait_timeout(&cwp.confirm_cond,
>> - txn_limbo_confirm_timeout(limbo));
>> - fiber_cond_destroy(&cwp.confirm_cond);
>> - if (rc != 0) {
>> - /* Clear the triggers if the timeout has been reached. */
>> - trigger_clear(&on_complete);
>> - trigger_clear(&on_rollback);
>> - diag_set(ClientError, ER_SYNC_QUORUM_TIMEOUT);
>> - return -1;
>> + double start_time = fiber_clock();
>> + while (true) {
>> + double deadline = start_time + txn_limbo_confirm_timeout(limbo);
>> + bool cancellable = fiber_set_cancellable(false);
>> + double timeout = deadline - fiber_clock();
>
> Maybe add a comment about the possible reasons for a wakeup
> (reconfiguration, triggers, timeout).
Wait_cond has a comment about that already.
>> + int rc = fiber_cond_wait_timeout(&limbo->wait_cond, timeout);
>
> If I understand correctly, you use a trick to wake up from the triggers:
> fiber_wakeup() is invoked manually, without using the fiber_cond
> API. In this context, I have a few questions/comments:
> - IMHO, it is not right to rely on implementation details of fiber_cond
> while bypassing the fiber_cond API.
> - In this case, we don't interact with the waiters list inside the
> condition variable. Are you sure this is ok?
I am sure that it is ok to use fiber_wakeup(). Moreover, you can't use
the fiber_cond API here, because it would wake up other fibers that are
not related to the waiter. So the direct wakeup is the only way.
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2020-07-10 0:23 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-07-06 22:43 [Tarantool-patches] [PATCH 1/1] txn_limbo: introduce dynamic synchro config Vladislav Shpilevoy
2020-07-06 22:55 ` Vladislav Shpilevoy
2020-07-09 21:40 ` Leonid Vasiliev
2020-07-10 0:23 ` Vladislav Shpilevoy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox