[Tarantool-patches] [PATCH v2 15/19] applier: send heartbeat not only on commit, but on any write
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Tue Jun 30 02:15:16 MSK 2020
With the introduction of synchronous replication, the concept of
'commit' no longer matches the WAL write event 100%.
And yet the applier relied on the commit event when sending periodic
heartbeats to tell the master the replica's new vclock.
The patch makes the applier send heartbeats on any write event, even
if it was not a commit. For example, when a sync transaction's
data has been written to WAL, the replica needs to send an ACK to
the master using the heartbeat.
Closes #5100
---
src/box/applier.cc | 25 +++++++-
.../sync_replication_sanity.result | 59 ++++++++++++++++++-
.../sync_replication_sanity.test.lua | 32 +++++++++-
3 files changed, 107 insertions(+), 9 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 635a9849c..a9baf0d69 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -755,6 +755,11 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
{
(void) trigger;
struct txn *txn = (struct txn *) event;
+ /*
+ * Let the txn module free the transaction object. It is
+ * not needed for anything else.
+ */
+ txn->fiber = NULL;
/*
* Synchronous transaction rollback due to receiving a
* ROLLBACK entry is a normal event and requires no
@@ -791,6 +796,14 @@ static int
applier_txn_commit_cb(struct trigger *trigger, void *event)
{
(void) trigger;
+ struct txn *txn = (struct txn *)event;
+ assert(txn->fiber != NULL);
+ assert(strncmp(txn->fiber->name, "applierw", 8) == 0);
+ /*
+ * Let the txn module free the transaction object. It is
+ * not needed for anything else.
+ */
+ txn->fiber = NULL;
/* Broadcast the commit event across all appliers. */
trigger_run(&replicaset.applier.on_commit, event);
return 0;
@@ -802,7 +815,7 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
* Return 0 for success or -1 in case of an error.
*/
static int
-applier_apply_tx(struct stailq *rows)
+applier_apply_tx(struct stailq *rows, struct fiber *writer)
{
struct xrow_header *first_row = &stailq_first_entry(rows,
struct applier_tx_row, next)->row;
@@ -894,7 +907,13 @@ applier_apply_tx(struct stailq *rows)
trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
txn_on_commit(txn, on_commit);
-
+ /*
+ * Wakeup the writer fiber after the transaction is
+ * completed. To send ACK to the master. In case of async
+ * transaction it is the same as commit event. In case of
+ * sync it happens after the data is written to WAL.
+ */
+ txn->fiber = writer;
if (txn_commit_async(txn) < 0)
goto fail;
@@ -1092,7 +1111,7 @@ applier_subscribe(struct applier *applier)
if (stailq_first_entry(&rows, struct applier_tx_row,
next)->row.lsn == 0)
fiber_wakeup(applier->writer);
- else if (applier_apply_tx(&rows) != 0)
+ else if (applier_apply_tx(&rows, applier->writer) != 0)
diag_raise();
if (ibuf_used(ibuf) == 0)
diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
index 4b9823d77..8b37ba6f5 100644
--- a/test/replication/sync_replication_sanity.result
+++ b/test/replication/sync_replication_sanity.result
@@ -90,10 +90,10 @@ box.schema.user.grant('guest', 'replication')
| ---
| ...
-- Set up synchronous replication options.
-quorum = box.cfg.replication_synchro_quorum
+old_synchro_quorum = box.cfg.replication_synchro_quorum
| ---
| ...
-timeout = box.cfg.replication_synchro_timeout
+old_synchro_timeout = box.cfg.replication_synchro_timeout
| ---
| ...
box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
@@ -178,13 +178,63 @@ box.space.sync:select{}
| - [3]
| ...
+--
+-- gh-5100: replica should send ACKs for sync transactions after
+-- WAL write immediately, not waiting for replication timeout or
+-- a CONFIRM.
+--
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+old_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+-- Commit something non-sync. So as applier writer fiber would
+-- flush the pending heartbeat and go to sleep with the new huge
+-- replication timeout.
+s = box.schema.create_space('test')
+ | ---
+ | ...
+pk = s:create_index('pk')
+ | ---
+ | ...
+s:replace{1}
+ | ---
+ | - [1]
+ | ...
+-- Now commit something sync. It should return immediately even
+-- though the replication timeout is huge.
+box.space.sync:replace{4}
+ | ---
+ | - [4]
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{4}
+ | ---
+ | - - [4]
+ | ...
+
-- Cleanup.
test_run:cmd('switch default')
| ---
| - true
| ...
-box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
+box.cfg{ \
+ replication_synchro_quorum = old_synchro_quorum, \
+ replication_synchro_timeout = old_synchro_timeout, \
+ replication_timeout = old_timeout, \
+}
| ---
| ...
test_run:cmd('stop server replica')
@@ -195,6 +245,9 @@ test_run:cmd('delete server replica')
| ---
| - true
| ...
+box.space.test:drop()
+ | ---
+ | ...
box.space.sync:drop()
| ---
| ...
diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
index 8715a4600..b0326fd4b 100644
--- a/test/replication/sync_replication_sanity.test.lua
+++ b/test/replication/sync_replication_sanity.test.lua
@@ -38,8 +38,8 @@ engine = test_run:get_cfg('engine')
box.schema.user.grant('guest', 'replication')
-- Set up synchronous replication options.
-quorum = box.cfg.replication_synchro_quorum
-timeout = box.cfg.replication_synchro_timeout
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
test_run:cmd('create server replica with rpl_master=default,\
@@ -71,11 +71,37 @@ box.space.sync:select{}
test_run:cmd('restart server replica')
box.space.sync:select{}
+--
+-- gh-5100: replica should send ACKs for sync transactions after
+-- WAL write immediately, not waiting for replication timeout or
+-- a CONFIRM.
+--
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+test_run:switch('default')
+old_timeout = box.cfg.replication_timeout
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+-- Commit something non-sync. So as applier writer fiber would
+-- flush the pending heartbeat and go to sleep with the new huge
+-- replication timeout.
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+-- Now commit something sync. It should return immediately even
+-- though the replication timeout is huge.
+box.space.sync:replace{4}
+test_run:switch('replica')
+box.space.sync:select{4}
+
-- Cleanup.
test_run:cmd('switch default')
-box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
+box.cfg{ \
+ replication_synchro_quorum = old_synchro_quorum, \
+ replication_synchro_timeout = old_synchro_timeout, \
+ replication_timeout = old_timeout, \
+}
test_run:cmd('stop server replica')
test_run:cmd('delete server replica')
+box.space.test:drop()
box.space.sync:drop()
box.schema.user.revoke('guest', 'replication')
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list