From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> To: tarantool-patches@dev.tarantool.org, sergepetrenko@tarantool.org Subject: [Tarantool-patches] [PATCH 3/3] applier: use WAL write event instead of commit for ACK Date: Sun, 5 Jul 2020 17:20:16 +0200 [thread overview] Message-ID: <6b83ac7e6c7e2606bc3350efda5e165df366e5b5.1593962115.git.v.shpilevoy@tarantool.org> (raw) In-Reply-To: <cover.1593962115.git.v.shpilevoy@tarantool.org> Applier used to send ACKs to master when commit happens. But for sync transactions this is not enough - their ACK should be sent after WAL write. Master doesn't really care whether a commit will happen after WAL write on the replica. The only thing which matters is whether the replica managed to persist the sync transaction. Now applier uses WAL write event instead of commit to send ACKs. Nothing changed for async transactions (for them WAL write == commit). But sync transactions now send ACKs immediately, without waiting for heartbeat timeout. Closes #5100 Closes #5127 --- src/box/applier.cc | 36 ++++---- src/box/replication.cc | 2 +- src/box/replication.h | 4 +- src/lib/core/errinj.h | 1 + test/replication/qsync_basic.result | 59 ++++++++++++- test/replication/qsync_basic.test.lua | 32 ++++++- test/replication/qsync_errinj.result | 114 +++++++++++++++++++++++++ test/replication/qsync_errinj.test.lua | 46 ++++++++++ 8 files changed, 269 insertions(+), 25 deletions(-) create mode 100644 test/replication/qsync_errinj.result create mode 100644 test/replication/qsync_errinj.test.lua diff --git a/src/box/applier.cc b/src/box/applier.cc index fccde877d..d82613b56 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -45,6 +45,7 @@ #include "trigger.h" #include "xrow_io.h" #include "error.h" +#include "errinj.h" #include "session.h" #include "cfg.h" #include "schema.h" @@ -179,6 +180,9 @@ applier_writer_f(va_list ap) struct xrow_header xrow; xrow_encode_vclock(&xrow, &replicaset.vclock); coio_write_xrow(&io, &xrow); + 
ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, { + fiber_sleep(0.01); + }); /* * Even if new ACK is requested during the * write, don't send it again right away. @@ -807,11 +811,11 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event) } static int -applier_txn_commit_cb(struct trigger *trigger, void *event) +applier_txn_wal_write_cb(struct trigger *trigger, void *event) { (void) trigger; /* Broadcast the commit event across all appliers. */ - trigger_run(&replicaset.applier.on_commit, event); + trigger_run(&replicaset.applier.on_wal_write, event); return 0; } @@ -896,23 +900,23 @@ applier_apply_tx(struct stailq *rows) } /* We are ready to submit txn to wal. */ - struct trigger *on_rollback, *on_commit; + struct trigger *on_rollback, *on_wal_write; size_t size; on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback), &size); - on_commit = region_alloc_object(&txn->region, typeof(*on_commit), - &size); - if (on_rollback == NULL || on_commit == NULL) { + on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write), + &size); + if (on_rollback == NULL || on_wal_write == NULL) { diag_set(OutOfMemory, size, "region_alloc_object", - "on_rollback/on_commit"); + "on_rollback/on_wal_write"); goto rollback; } trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL); txn_on_rollback(txn, on_rollback); - trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL); - txn_on_commit(txn, on_commit); + trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL); + txn_on_wal_write(txn, on_wal_write); if (txn_commit_async(txn) < 0) goto fail; @@ -949,10 +953,10 @@ applier_signal_ack(struct applier *applier) } /* - * A trigger to update an applier state after a replication commit. + * A trigger to update an applier state after WAL write. 
*/ static int -applier_on_commit(struct trigger *trigger, void *event) +applier_on_wal_write(struct trigger *trigger, void *event) { (void) event; struct applier *applier = (struct applier *)trigger->data; @@ -1085,17 +1089,17 @@ applier_subscribe(struct applier *applier) applier->lag = TIMEOUT_INFINITY; - /* Register triggers to handle replication commits and rollbacks. */ - struct trigger on_commit; - trigger_create(&on_commit, applier_on_commit, applier, NULL); - trigger_add(&replicaset.applier.on_commit, &on_commit); + /* Register triggers to handle WAL writes and rollbacks. */ + struct trigger on_wal_write; + trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL); + trigger_add(&replicaset.applier.on_wal_write, &on_wal_write); struct trigger on_rollback; trigger_create(&on_rollback, applier_on_rollback, applier, NULL); trigger_add(&replicaset.applier.on_rollback, &on_rollback); auto trigger_guard = make_scoped_guard([&] { - trigger_clear(&on_commit); + trigger_clear(&on_wal_write); trigger_clear(&on_rollback); }); diff --git a/src/box/replication.cc b/src/box/replication.cc index 01e9e876a..ef0e2411d 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -96,7 +96,7 @@ replication_init(void) vclock_create(&replicaset.applier.vclock); vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); rlist_create(&replicaset.applier.on_rollback); - rlist_create(&replicaset.applier.on_commit); + rlist_create(&replicaset.applier.on_wal_write); diag_create(&replicaset.applier.diag); } diff --git a/src/box/replication.h b/src/box/replication.h index a081870f9..ddc2bddf4 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -259,8 +259,8 @@ struct replicaset { struct vclock vclock; /* Trigger to fire when a replication request failed to apply. */ struct rlist on_rollback; - /* Trigget to fire a replication request commited to a wal. */ - struct rlist on_commit; + /* Trigget to fire a replication request when WAL write happens. 
*/ + struct rlist on_wal_write; /* Shared applier diagnostic area. */ struct diag diag; } applier; diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h index 261ab22a8..76b453003 100644 --- a/src/lib/core/errinj.h +++ b/src/lib/core/errinj.h @@ -146,6 +146,7 @@ struct errinj { _(ERRINJ_VY_RUN_OPEN, ERRINJ_INT, {.iparam = -1})\ _(ERRINJ_AUTO_UPGRADE, ERRINJ_BOOL, {.bparam = false})\ _(ERRINJ_COIO_WRITE_CHUNK, ERRINJ_BOOL, {.bparam = false}) \ + _(ERRINJ_APPLIER_SLOW_ACK, ERRINJ_BOOL, {.bparam = false}) \ ENUM0(errinj_id, ERRINJ_LIST); extern struct errinj errinjs[]; diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result index 4b9823d77..8b37ba6f5 100644 --- a/test/replication/qsync_basic.result +++ b/test/replication/qsync_basic.result @@ -90,10 +90,10 @@ box.schema.user.grant('guest', 'replication') | --- | ... -- Set up synchronous replication options. -quorum = box.cfg.replication_synchro_quorum +old_synchro_quorum = box.cfg.replication_synchro_quorum | --- | ... -timeout = box.cfg.replication_synchro_timeout +old_synchro_timeout = box.cfg.replication_synchro_timeout | --- | ... box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1} @@ -178,13 +178,63 @@ box.space.sync:select{} | - [3] | ... +-- +-- gh-5100: replica should send ACKs for sync transactions after +-- WAL write immediately, not waiting for replication timeout or +-- a CONFIRM. +-- +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000} + | --- + | ... +test_run:switch('default') + | --- + | - true + | ... +old_timeout = box.cfg.replication_timeout + | --- + | ... +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000} + | --- + | ... +-- Commit something non-sync. So as applier writer fiber would +-- flush the pending heartbeat and go to sleep with the new huge +-- replication timeout. +s = box.schema.create_space('test') + | --- + | ... +pk = s:create_index('pk') + | --- + | ... 
+s:replace{1} + | --- + | - [1] + | ... +-- Now commit something sync. It should return immediately even +-- though the replication timeout is huge. +box.space.sync:replace{4} + | --- + | - [4] + | ... +test_run:switch('replica') + | --- + | - true + | ... +box.space.sync:select{4} + | --- + | - - [4] + | ... + -- Cleanup. test_run:cmd('switch default') | --- | - true | ... -box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout} +box.cfg{ \ + replication_synchro_quorum = old_synchro_quorum, \ + replication_synchro_timeout = old_synchro_timeout, \ + replication_timeout = old_timeout, \ +} | --- | ... test_run:cmd('stop server replica') @@ -195,6 +245,9 @@ test_run:cmd('delete server replica') | --- | - true | ... +box.space.test:drop() + | --- + | ... box.space.sync:drop() | --- | ... diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua index 8715a4600..b0326fd4b 100644 --- a/test/replication/qsync_basic.test.lua +++ b/test/replication/qsync_basic.test.lua @@ -38,8 +38,8 @@ engine = test_run:get_cfg('engine') box.schema.user.grant('guest', 'replication') -- Set up synchronous replication options. -quorum = box.cfg.replication_synchro_quorum -timeout = box.cfg.replication_synchro_timeout +old_synchro_quorum = box.cfg.replication_synchro_quorum +old_synchro_timeout = box.cfg.replication_synchro_timeout box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1} test_run:cmd('create server replica with rpl_master=default,\ @@ -71,11 +71,37 @@ box.space.sync:select{} test_run:cmd('restart server replica') box.space.sync:select{} +-- +-- gh-5100: replica should send ACKs for sync transactions after +-- WAL write immediately, not waiting for replication timeout or +-- a CONFIRM. 
+-- +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000} +test_run:switch('default') +old_timeout = box.cfg.replication_timeout +box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000} +-- Commit something non-sync. So as applier writer fiber would +-- flush the pending heartbeat and go to sleep with the new huge +-- replication timeout. +s = box.schema.create_space('test') +pk = s:create_index('pk') +s:replace{1} +-- Now commit something sync. It should return immediately even +-- though the replication timeout is huge. +box.space.sync:replace{4} +test_run:switch('replica') +box.space.sync:select{4} + -- Cleanup. test_run:cmd('switch default') -box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout} +box.cfg{ \ + replication_synchro_quorum = old_synchro_quorum, \ + replication_synchro_timeout = old_synchro_timeout, \ + replication_timeout = old_timeout, \ +} test_run:cmd('stop server replica') test_run:cmd('delete server replica') +box.space.test:drop() box.space.sync:drop() box.schema.user.revoke('guest', 'replication') diff --git a/test/replication/qsync_errinj.result b/test/replication/qsync_errinj.result new file mode 100644 index 000000000..cd7f2bc8d --- /dev/null +++ b/test/replication/qsync_errinj.result @@ -0,0 +1,114 @@ +-- test-run result file version 2 +test_run = require('test_run').new() + | --- + | ... +engine = test_run:get_cfg('engine') + | --- + | ... + +old_synchro_quorum = box.cfg.replication_synchro_quorum + | --- + | ... +old_synchro_timeout = box.cfg.replication_synchro_timeout + | --- + | ... +old_timeout = box.cfg.replication_timeout + | --- + | ... +box.schema.user.grant('guest', 'super') + | --- + | ... + +test_run:cmd('create server replica with rpl_master=default,\ + script="replication/replica.lua"') + | --- + | - true + | ... +test_run:cmd('start server replica with wait=True, wait_load=True') + | --- + | - true + | ... 
+ +_ = box.schema.space.create('sync', {is_sync = true, engine = engine}) + | --- + | ... +_ = box.space.sync:create_index('pk') + | --- + | ... + +-- +-- gh-5100: slow ACK sending shouldn't stun replica for the +-- replication timeout seconds. +-- +test_run:cmd('switch default') + | --- + | - true + | ... +box.cfg{replication_timeout = 1000, replication_synchro_quorum = 2, replication_synchro_timeout = 1000} + | --- + | ... + +test_run:switch('replica') + | --- + | - true + | ... +box.cfg{replication_timeout = 1000} + | --- + | ... +box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', true) + | --- + | - ok + | ... + +test_run:cmd('switch default') + | --- + | - true + | ... +for i = 1, 10 do box.space.sync:replace{i} end + | --- + | ... +box.space.sync:count() + | --- + | - 10 + | ... + +test_run:switch('replica') + | --- + | - true + | ... +box.space.sync:count() + | --- + | - 10 + | ... +box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', false) + | --- + | - ok + | ... + +test_run:cmd('switch default') + | --- + | - true + | ... + +box.cfg{ \ + replication_synchro_quorum = old_synchro_quorum, \ + replication_synchro_timeout = old_synchro_timeout, \ + replication_timeout = old_timeout, \ +} + | --- + | ... +test_run:cmd('stop server replica') + | --- + | - true + | ... +test_run:cmd('delete server replica') + | --- + | - true + | ... + +box.space.sync:drop() + | --- + | ... +box.schema.user.revoke('guest', 'super') + | --- + | ... 
diff --git a/test/replication/qsync_errinj.test.lua b/test/replication/qsync_errinj.test.lua new file mode 100644 index 000000000..2404b01aa --- /dev/null +++ b/test/replication/qsync_errinj.test.lua @@ -0,0 +1,46 @@ +test_run = require('test_run').new() +engine = test_run:get_cfg('engine') + +old_synchro_quorum = box.cfg.replication_synchro_quorum +old_synchro_timeout = box.cfg.replication_synchro_timeout +old_timeout = box.cfg.replication_timeout +box.schema.user.grant('guest', 'super') + +test_run:cmd('create server replica with rpl_master=default,\ + script="replication/replica.lua"') +test_run:cmd('start server replica with wait=True, wait_load=True') + +_ = box.schema.space.create('sync', {is_sync = true, engine = engine}) +_ = box.space.sync:create_index('pk') + +-- +-- gh-5100: slow ACK sending shouldn't stun replica for the +-- replication timeout seconds. +-- +test_run:cmd('switch default') +box.cfg{replication_timeout = 1000, replication_synchro_quorum = 2, replication_synchro_timeout = 1000} + +test_run:switch('replica') +box.cfg{replication_timeout = 1000} +box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', true) + +test_run:cmd('switch default') +for i = 1, 10 do box.space.sync:replace{i} end +box.space.sync:count() + +test_run:switch('replica') +box.space.sync:count() +box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', false) + +test_run:cmd('switch default') + +box.cfg{ \ + replication_synchro_quorum = old_synchro_quorum, \ + replication_synchro_timeout = old_synchro_timeout, \ + replication_timeout = old_timeout, \ +} +test_run:cmd('stop server replica') +test_run:cmd('delete server replica') + +box.space.sync:drop() +box.schema.user.revoke('guest', 'super') -- 2.21.1 (Apple Git-122.3)
prev parent reply other threads:[~2020-07-05 15:20 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2020-07-05 15:20 [Tarantool-patches] [PATCH 0/3] Another applier ACKs rework Vladislav Shpilevoy
2020-07-05 15:20 ` [Tarantool-patches] [PATCH 1/3] txn: introduce on_wal_write trigger Vladislav Shpilevoy
2020-07-05 15:20 ` [Tarantool-patches] [PATCH 2/3] applier: don't miss WAL writes happened during ACK send Vladislav Shpilevoy
2020-07-05 15:20 ` Vladislav Shpilevoy [this message]
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=6b83ac7e6c7e2606bc3350efda5e165df366e5b5.1593962115.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=sergepetrenko@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 3/3] applier: use WAL write event instead of commit for ACK' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
This is a public inbox; see mirroring instructions for how to clone and mirror all data and code used for this inbox.