[Tarantool-patches] [PATCH v2 15/19] applier: send heartbeat not only on commit, but on any write
Serge Petrenko
sergepetrenko at tarantool.org
Fri Jul 3 15:23:31 MSK 2020
30.06.2020 02:15, Vladislav Shpilevoy пишет:
> The concept of 'commit' no longer matches the WAL write event
> 100% once synchronous replication is introduced.
>
> And yet the applier relied on the commit event when sending
> periodic heartbeats to tell the master the replica's new vclock.
>
> The patch makes the applier send heartbeats on any write event,
> even if it was not a commit. For example, when a sync
> transaction's data is written, the replica needs to send an ACK
> to the master using the heartbeat.
>
> Closes #5100
> ---
> src/box/applier.cc | 25 +++++++-
> .../sync_replication_sanity.result | 59 ++++++++++++++++++-
> .../sync_replication_sanity.test.lua | 32 +++++++++-
> 3 files changed, 107 insertions(+), 9 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 635a9849c..a9baf0d69 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -755,6 +755,11 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
> {
> (void) trigger;
> struct txn *txn = (struct txn *) event;
> + /*
> + * Let the txn module free the transaction object. It is
> + * not needed for anything else.
> + */
> + txn->fiber = NULL;
> /*
> * Synchronous transaction rollback due to receiving a
> * ROLLBACK entry is a normal event and requires no
> @@ -791,6 +796,14 @@ static int
> applier_txn_commit_cb(struct trigger *trigger, void *event)
> {
> (void) trigger;
> + struct txn *txn = (struct txn *)event;
> + assert(txn->fiber != NULL);
> + assert(strncmp(txn->fiber->name, "applierw", 8) == 0);
> + /*
> + * Let the txn module free the transaction object. It is
> + * not needed for anything else.
> + */
> + txn->fiber = NULL;
> /* Broadcast the commit event across all appliers. */
> trigger_run(&replicaset.applier.on_commit, event);
> return 0;
> @@ -802,7 +815,7 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
> * Return 0 for success or -1 in case of an error.
> */
> static int
> -applier_apply_tx(struct stailq *rows)
> +applier_apply_tx(struct stailq *rows, struct fiber *writer)
> {
> struct xrow_header *first_row = &stailq_first_entry(rows,
> struct applier_tx_row, next)->row;
> @@ -894,7 +907,13 @@ applier_apply_tx(struct stailq *rows)
>
> trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
> txn_on_commit(txn, on_commit);
> -
> + /*
> + * Wakeup the writer fiber after the transaction is
> + * completed. To send ACK to the master. In case of async
> + * transaction it is the same as commit event. In case of
> + * sync it happens after the data is written to WAL.
> + */
> + txn->fiber = writer;
> if (txn_commit_async(txn) < 0)
> goto fail;
>
> @@ -1092,7 +1111,7 @@ applier_subscribe(struct applier *applier)
> if (stailq_first_entry(&rows, struct applier_tx_row,
> next)->row.lsn == 0)
> fiber_wakeup(applier->writer);
> - else if (applier_apply_tx(&rows) != 0)
> + else if (applier_apply_tx(&rows, applier->writer) != 0)
> diag_raise();
>
> if (ibuf_used(ibuf) == 0)
> diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
> index 4b9823d77..8b37ba6f5 100644
> --- a/test/replication/sync_replication_sanity.result
> +++ b/test/replication/sync_replication_sanity.result
> @@ -90,10 +90,10 @@ box.schema.user.grant('guest', 'replication')
> | ---
> | ...
> -- Set up synchronous replication options.
> -quorum = box.cfg.replication_synchro_quorum
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> | ---
> | ...
> -timeout = box.cfg.replication_synchro_timeout
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> | ---
> | ...
> box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
> @@ -178,13 +178,63 @@ box.space.sync:select{}
> | - [3]
> | ...
>
> +--
> +-- gh-5100: replica should send ACKs for sync transactions after
> +-- WAL write immediately, not waiting for replication timeout or
> +-- a CONFIRM.
> +--
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +old_timeout = box.cfg.replication_timeout
> + | ---
> + | ...
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +-- Commit something non-sync. So as applier writer fiber would
> +-- flush the pending heartbeat and go to sleep with the new huge
> +-- replication timeout.
> +s = box.schema.create_space('test')
> + | ---
> + | ...
> +pk = s:create_index('pk')
> + | ---
> + | ...
> +s:replace{1}
> + | ---
> + | - [1]
> + | ...
> +-- Now commit something sync. It should return immediately even
> +-- though the replication timeout is huge.
> +box.space.sync:replace{4}
> + | ---
> + | - [4]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{4}
> + | ---
> + | - - [4]
> + | ...
> +
> -- Cleanup.
> test_run:cmd('switch default')
> | ---
> | - true
> | ...
>
> -box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
> +box.cfg{ \
> + replication_synchro_quorum = old_synchro_quorum, \
> + replication_synchro_timeout = old_synchro_timeout, \
> + replication_timeout = old_timeout, \
> +}
> | ---
> | ...
> test_run:cmd('stop server replica')
> @@ -195,6 +245,9 @@ test_run:cmd('delete server replica')
> | ---
> | - true
> | ...
> +box.space.test:drop()
> + | ---
> + | ...
> box.space.sync:drop()
> | ---
> | ...
> diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
> index 8715a4600..b0326fd4b 100644
> --- a/test/replication/sync_replication_sanity.test.lua
> +++ b/test/replication/sync_replication_sanity.test.lua
> @@ -38,8 +38,8 @@ engine = test_run:get_cfg('engine')
>
> box.schema.user.grant('guest', 'replication')
> -- Set up synchronous replication options.
> -quorum = box.cfg.replication_synchro_quorum
> -timeout = box.cfg.replication_synchro_timeout
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
> box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
>
> test_run:cmd('create server replica with rpl_master=default,\
> @@ -71,11 +71,37 @@ box.space.sync:select{}
> test_run:cmd('restart server replica')
> box.space.sync:select{}
>
> +--
> +-- gh-5100: replica should send ACKs for sync transactions after
> +-- WAL write immediately, not waiting for replication timeout or
> +-- a CONFIRM.
> +--
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> +test_run:switch('default')
> +old_timeout = box.cfg.replication_timeout
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> +-- Commit something non-sync. So as applier writer fiber would
> +-- flush the pending heartbeat and go to sleep with the new huge
> +-- replication timeout.
> +s = box.schema.create_space('test')
> +pk = s:create_index('pk')
> +s:replace{1}
> +-- Now commit something sync. It should return immediately even
> +-- though the replication timeout is huge.
> +box.space.sync:replace{4}
> +test_run:switch('replica')
> +box.space.sync:select{4}
> +
> -- Cleanup.
> test_run:cmd('switch default')
>
> -box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
> +box.cfg{ \
> + replication_synchro_quorum = old_synchro_quorum, \
> + replication_synchro_timeout = old_synchro_timeout, \
> + replication_timeout = old_timeout, \
> +}
> test_run:cmd('stop server replica')
> test_run:cmd('delete server replica')
> +box.space.test:drop()
> box.space.sync:drop()
> box.schema.user.revoke('guest', 'replication')
Thanks! LGTM.
--
Serge Petrenko
More information about the Tarantool-patches
mailing list