[Tarantool-patches] [PATCH v2 15/19] applier: send heartbeat not only on commit, but on any write

Serge Petrenko sergepetrenko at tarantool.org
Fri Jul 3 15:23:31 MSK 2020


30.06.2020 02:15, Vladislav Shpilevoy пишет:
> Concept of 'commit' becomes not 100% matching WAL write event,
> when synchro replication comes.
>
> And yet applier relied on commit event when sent periodic
> hearbeats to tell the master the replica's new vclock.
>
> The patch makes applier send heartbeats on any write event. Even
> if it was not commit. For example, when a sync transaction's
> data was written, and the replica needs to tell the master ACK
> using the heartbeat.
>
> Closes #5100
> ---
>   src/box/applier.cc                            | 25 +++++++-
>   .../sync_replication_sanity.result            | 59 ++++++++++++++++++-
>   .../sync_replication_sanity.test.lua          | 32 +++++++++-
>   3 files changed, 107 insertions(+), 9 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 635a9849c..a9baf0d69 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -755,6 +755,11 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
>   {
>   	(void) trigger;
>   	struct txn *txn = (struct txn *) event;
> +	/*
> +	 * Let the txn module free the transaction object. It is
> +	 * not needed for anything else.
> +	 */
> +	txn->fiber = NULL;
>   	/*
>   	 * Synchronous transaction rollback due to receiving a
>   	 * ROLLBACK entry is a normal event and requires no
> @@ -791,6 +796,14 @@ static int
>   applier_txn_commit_cb(struct trigger *trigger, void *event)
>   {
>   	(void) trigger;
> +	struct txn *txn = (struct txn *)event;
> +	assert(txn->fiber != NULL);
> +	assert(strncmp(txn->fiber->name, "applierw", 8) == 0);
> +	/*
> +	 * Let the txn module free the transaction object. It is
> +	 * not needed for anything else.
> +	 */
> +	txn->fiber = NULL;
>   	/* Broadcast the commit event across all appliers. */
>   	trigger_run(&replicaset.applier.on_commit, event);
>   	return 0;
> @@ -802,7 +815,7 @@ applier_txn_commit_cb(struct trigger *trigger, void *event)
>    * Return 0 for success or -1 in case of an error.
>    */
>   static int
> -applier_apply_tx(struct stailq *rows)
> +applier_apply_tx(struct stailq *rows, struct fiber *writer)
>   {
>   	struct xrow_header *first_row = &stailq_first_entry(rows,
>   					struct applier_tx_row, next)->row;
> @@ -894,7 +907,13 @@ applier_apply_tx(struct stailq *rows)
>   
>   	trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
>   	txn_on_commit(txn, on_commit);
> -
> +	/*
> +	 * Wakeup the writer fiber after the transaction is
> +	 * completed. To send ACK to the master. In case of async
> +	 * transaction it is the same as commit event. In case of
> +	 * sync it happens after the data is written to WAL.
> +	 */
> +	txn->fiber = writer;
>   	if (txn_commit_async(txn) < 0)
>   		goto fail;
>   
> @@ -1092,7 +1111,7 @@ applier_subscribe(struct applier *applier)
>   		if (stailq_first_entry(&rows, struct applier_tx_row,
>   				       next)->row.lsn == 0)
>   			fiber_wakeup(applier->writer);
> -		else if (applier_apply_tx(&rows) != 0)
> +		else if (applier_apply_tx(&rows, applier->writer) != 0)
>   			diag_raise();
>   
>   		if (ibuf_used(ibuf) == 0)
> diff --git a/test/replication/sync_replication_sanity.result b/test/replication/sync_replication_sanity.result
> index 4b9823d77..8b37ba6f5 100644
> --- a/test/replication/sync_replication_sanity.result
> +++ b/test/replication/sync_replication_sanity.result
> @@ -90,10 +90,10 @@ box.schema.user.grant('guest', 'replication')
>    | ---
>    | ...
>   -- Set up synchronous replication options.
> -quorum = box.cfg.replication_synchro_quorum
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
>    | ---
>    | ...
> -timeout = box.cfg.replication_synchro_timeout
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
>    | ---
>    | ...
>   box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
> @@ -178,13 +178,63 @@ box.space.sync:select{}
>    |   - [3]
>    | ...
>   
> +--
> +-- gh-5100: replica should send ACKs for sync transactions after
> +-- WAL write immediately, not waiting for replication timeout or
> +-- a CONFIRM.
> +--
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +test_run:switch('default')
> + | ---
> + | - true
> + | ...
> +old_timeout = box.cfg.replication_timeout
> + | ---
> + | ...
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> + | ---
> + | ...
> +-- Commit something non-sync. So as applier writer fiber would
> +-- flush the pending heartbeat and go to sleep with the new huge
> +-- replication timeout.
> +s = box.schema.create_space('test')
> + | ---
> + | ...
> +pk = s:create_index('pk')
> + | ---
> + | ...
> +s:replace{1}
> + | ---
> + | - [1]
> + | ...
> +-- Now commit something sync. It should return immediately even
> +-- though the replication timeout is huge.
> +box.space.sync:replace{4}
> + | ---
> + | - [4]
> + | ...
> +test_run:switch('replica')
> + | ---
> + | - true
> + | ...
> +box.space.sync:select{4}
> + | ---
> + | - - [4]
> + | ...
> +
>   -- Cleanup.
>   test_run:cmd('switch default')
>    | ---
>    | - true
>    | ...
>   
> -box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
> +box.cfg{                                                                        \
> +    replication_synchro_quorum = old_synchro_quorum,                            \
> +    replication_synchro_timeout = old_synchro_timeout,                          \
> +    replication_timeout = old_timeout,                                          \
> +}
>    | ---
>    | ...
>   test_run:cmd('stop server replica')
> @@ -195,6 +245,9 @@ test_run:cmd('delete server replica')
>    | ---
>    | - true
>    | ...
> +box.space.test:drop()
> + | ---
> + | ...
>   box.space.sync:drop()
>    | ---
>    | ...
> diff --git a/test/replication/sync_replication_sanity.test.lua b/test/replication/sync_replication_sanity.test.lua
> index 8715a4600..b0326fd4b 100644
> --- a/test/replication/sync_replication_sanity.test.lua
> +++ b/test/replication/sync_replication_sanity.test.lua
> @@ -38,8 +38,8 @@ engine = test_run:get_cfg('engine')
>   
>   box.schema.user.grant('guest', 'replication')
>   -- Set up synchronous replication options.
> -quorum = box.cfg.replication_synchro_quorum
> -timeout = box.cfg.replication_synchro_timeout
> +old_synchro_quorum = box.cfg.replication_synchro_quorum
> +old_synchro_timeout = box.cfg.replication_synchro_timeout
>   box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
>   
>   test_run:cmd('create server replica with rpl_master=default,\
> @@ -71,11 +71,37 @@ box.space.sync:select{}
>   test_run:cmd('restart server replica')
>   box.space.sync:select{}
>   
> +--
> +-- gh-5100: replica should send ACKs for sync transactions after
> +-- WAL write immediately, not waiting for replication timeout or
> +-- a CONFIRM.
> +--
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> +test_run:switch('default')
> +old_timeout = box.cfg.replication_timeout
> +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
> +-- Commit something non-sync. So as applier writer fiber would
> +-- flush the pending heartbeat and go to sleep with the new huge
> +-- replication timeout.
> +s = box.schema.create_space('test')
> +pk = s:create_index('pk')
> +s:replace{1}
> +-- Now commit something sync. It should return immediately even
> +-- though the replication timeout is huge.
> +box.space.sync:replace{4}
> +test_run:switch('replica')
> +box.space.sync:select{4}
> +
>   -- Cleanup.
>   test_run:cmd('switch default')
>   
> -box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
> +box.cfg{                                                                        \
> +    replication_synchro_quorum = old_synchro_quorum,                            \
> +    replication_synchro_timeout = old_synchro_timeout,                          \
> +    replication_timeout = old_timeout,                                          \
> +}
>   test_run:cmd('stop server replica')
>   test_run:cmd('delete server replica')
> +box.space.test:drop()
>   box.space.sync:drop()
>   box.schema.user.revoke('guest', 'replication')

Thanks! LGTM.

-- 
Serge Petrenko



More information about the Tarantool-patches mailing list