[Tarantool-patches] [PATCH 3/3] applier: use WAL write event instead of commit for ACK

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sun Jul 5 18:20:16 MSK 2020


Applier used to send ACKs to the master when a commit happened.
But for sync transactions this is not enough - their ACK should
be sent right after the WAL write. The master doesn't really care
whether a commit will follow the WAL write on the replica. The
only thing that matters is whether the replica managed to persist
the sync transaction.

Now the applier uses the WAL write event instead of commit to send
ACKs. Nothing changes for async transactions (for them WAL write
== commit). But ACKs for sync transactions are now sent
immediately, without waiting for the heartbeat timeout.

Closes #5100
Closes #5127
---
 src/box/applier.cc                     |  36 ++++----
 src/box/replication.cc                 |   2 +-
 src/box/replication.h                  |   4 +-
 src/lib/core/errinj.h                  |   1 +
 test/replication/qsync_basic.result    |  59 ++++++++++++-
 test/replication/qsync_basic.test.lua  |  32 ++++++-
 test/replication/qsync_errinj.result   | 114 +++++++++++++++++++++++++
 test/replication/qsync_errinj.test.lua |  46 ++++++++++
 8 files changed, 269 insertions(+), 25 deletions(-)
 create mode 100644 test/replication/qsync_errinj.result
 create mode 100644 test/replication/qsync_errinj.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index fccde877d..d82613b56 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -45,6 +45,7 @@
 #include "trigger.h"
 #include "xrow_io.h"
 #include "error.h"
+#include "errinj.h"
 #include "session.h"
 #include "cfg.h"
 #include "schema.h"
@@ -179,6 +180,9 @@ applier_writer_f(va_list ap)
 			struct xrow_header xrow;
 			xrow_encode_vclock(&xrow, &replicaset.vclock);
 			coio_write_xrow(&io, &xrow);
+			ERROR_INJECT(ERRINJ_APPLIER_SLOW_ACK, {
+				fiber_sleep(0.01);
+			});
 			/*
 			 * Even if new ACK is requested during the
 			 * write, don't send it again right away.
@@ -807,11 +811,11 @@ applier_txn_rollback_cb(struct trigger *trigger, void *event)
 }
 
 static int
-applier_txn_commit_cb(struct trigger *trigger, void *event)
+applier_txn_wal_write_cb(struct trigger *trigger, void *event)
 {
 	(void) trigger;
 	/* Broadcast the commit event across all appliers. */
-	trigger_run(&replicaset.applier.on_commit, event);
+	trigger_run(&replicaset.applier.on_wal_write, event);
 	return 0;
 }
 
@@ -896,23 +900,23 @@ applier_apply_tx(struct stailq *rows)
 	}
 
 	/* We are ready to submit txn to wal. */
-	struct trigger *on_rollback, *on_commit;
+	struct trigger *on_rollback, *on_wal_write;
 	size_t size;
 	on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
 					  &size);
-	on_commit = region_alloc_object(&txn->region, typeof(*on_commit),
-					&size);
-	if (on_rollback == NULL || on_commit == NULL) {
+	on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
+					   &size);
+	if (on_rollback == NULL || on_wal_write == NULL) {
 		diag_set(OutOfMemory, size, "region_alloc_object",
-			 "on_rollback/on_commit");
+			 "on_rollback/on_wal_write");
 		goto rollback;
 	}
 
 	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
 	txn_on_rollback(txn, on_rollback);
 
-	trigger_create(on_commit, applier_txn_commit_cb, NULL, NULL);
-	txn_on_commit(txn, on_commit);
+	trigger_create(on_wal_write, applier_txn_wal_write_cb, NULL, NULL);
+	txn_on_wal_write(txn, on_wal_write);
 
 	if (txn_commit_async(txn) < 0)
 		goto fail;
@@ -949,10 +953,10 @@ applier_signal_ack(struct applier *applier)
 }
 
 /*
- * A trigger to update an applier state after a replication commit.
+ * A trigger to update an applier state after WAL write.
  */
 static int
-applier_on_commit(struct trigger *trigger, void *event)
+applier_on_wal_write(struct trigger *trigger, void *event)
 {
 	(void) event;
 	struct applier *applier = (struct applier *)trigger->data;
@@ -1085,17 +1089,17 @@ applier_subscribe(struct applier *applier)
 
 	applier->lag = TIMEOUT_INFINITY;
 
-	/* Register triggers to handle replication commits and rollbacks. */
-	struct trigger on_commit;
-	trigger_create(&on_commit, applier_on_commit, applier, NULL);
-	trigger_add(&replicaset.applier.on_commit, &on_commit);
+	/* Register triggers to handle WAL writes and rollbacks. */
+	struct trigger on_wal_write;
+	trigger_create(&on_wal_write, applier_on_wal_write, applier, NULL);
+	trigger_add(&replicaset.applier.on_wal_write, &on_wal_write);
 
 	struct trigger on_rollback;
 	trigger_create(&on_rollback, applier_on_rollback, applier, NULL);
 	trigger_add(&replicaset.applier.on_rollback, &on_rollback);
 
 	auto trigger_guard = make_scoped_guard([&] {
-		trigger_clear(&on_commit);
+		trigger_clear(&on_wal_write);
 		trigger_clear(&on_rollback);
 	});
 
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 01e9e876a..ef0e2411d 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -96,7 +96,7 @@ replication_init(void)
 	vclock_create(&replicaset.applier.vclock);
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
 	rlist_create(&replicaset.applier.on_rollback);
-	rlist_create(&replicaset.applier.on_commit);
+	rlist_create(&replicaset.applier.on_wal_write);
 
 	diag_create(&replicaset.applier.diag);
 }
diff --git a/src/box/replication.h b/src/box/replication.h
index a081870f9..ddc2bddf4 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -259,8 +259,8 @@ struct replicaset {
 		struct vclock vclock;
 		/* Trigger to fire when a replication request failed to apply. */
 		struct rlist on_rollback;
-		/* Trigget to fire a replication request commited to a wal. */
-		struct rlist on_commit;
+		/* Trigger to fire on WAL write of a replication request. */
+		struct rlist on_wal_write;
 		/* Shared applier diagnostic area. */
 		struct diag diag;
 	} applier;
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 261ab22a8..76b453003 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -146,6 +146,7 @@ struct errinj {
 	_(ERRINJ_VY_RUN_OPEN, ERRINJ_INT, {.iparam = -1})\
 	_(ERRINJ_AUTO_UPGRADE, ERRINJ_BOOL, {.bparam = false})\
 	_(ERRINJ_COIO_WRITE_CHUNK, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_APPLIER_SLOW_ACK, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/replication/qsync_basic.result b/test/replication/qsync_basic.result
index 4b9823d77..8b37ba6f5 100644
--- a/test/replication/qsync_basic.result
+++ b/test/replication/qsync_basic.result
@@ -90,10 +90,10 @@ box.schema.user.grant('guest', 'replication')
  | ---
  | ...
 -- Set up synchronous replication options.
-quorum = box.cfg.replication_synchro_quorum
+old_synchro_quorum = box.cfg.replication_synchro_quorum
  | ---
  | ...
-timeout = box.cfg.replication_synchro_timeout
+old_synchro_timeout = box.cfg.replication_synchro_timeout
  | ---
  | ...
 box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
@@ -178,13 +178,63 @@ box.space.sync:select{}
  |   - [3]
  | ...
 
+--
+-- gh-5100: replica should send ACKs for sync transactions after
+-- WAL write immediately, not waiting for replication timeout or
+-- a CONFIRM.
+--
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+old_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+-- Commit something non-sync. So as applier writer fiber would
+-- flush the pending heartbeat and go to sleep with the new huge
+-- replication timeout.
+s = box.schema.create_space('test')
+ | ---
+ | ...
+pk = s:create_index('pk')
+ | ---
+ | ...
+s:replace{1}
+ | ---
+ | - [1]
+ | ...
+-- Now commit something sync. It should return immediately even
+-- though the replication timeout is huge.
+box.space.sync:replace{4}
+ | ---
+ | - [4]
+ | ...
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:select{4}
+ | ---
+ | - - [4]
+ | ...
+
 -- Cleanup.
 test_run:cmd('switch default')
  | ---
  | - true
  | ...
 
-box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
+box.cfg{                                                                        \
+    replication_synchro_quorum = old_synchro_quorum,                            \
+    replication_synchro_timeout = old_synchro_timeout,                          \
+    replication_timeout = old_timeout,                                          \
+}
  | ---
  | ...
 test_run:cmd('stop server replica')
@@ -195,6 +245,9 @@ test_run:cmd('delete server replica')
  | ---
  | - true
  | ...
+box.space.test:drop()
+ | ---
+ | ...
 box.space.sync:drop()
  | ---
  | ...
diff --git a/test/replication/qsync_basic.test.lua b/test/replication/qsync_basic.test.lua
index 8715a4600..b0326fd4b 100644
--- a/test/replication/qsync_basic.test.lua
+++ b/test/replication/qsync_basic.test.lua
@@ -38,8 +38,8 @@ engine = test_run:get_cfg('engine')
 
 box.schema.user.grant('guest', 'replication')
 -- Set up synchronous replication options.
-quorum = box.cfg.replication_synchro_quorum
-timeout = box.cfg.replication_synchro_timeout
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
 box.cfg{replication_synchro_quorum=2, replication_synchro_timeout=0.1}
 
 test_run:cmd('create server replica with rpl_master=default,\
@@ -71,11 +71,37 @@ box.space.sync:select{}
 test_run:cmd('restart server replica')
 box.space.sync:select{}
 
+--
+-- gh-5100: replica should send ACKs for sync transactions after
+-- WAL write immediately, not waiting for replication timeout or
+-- a CONFIRM.
+--
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+test_run:switch('default')
+old_timeout = box.cfg.replication_timeout
+box.cfg{replication_timeout = 1000, replication_synchro_timeout = 1000}
+-- Commit something non-sync. So as applier writer fiber would
+-- flush the pending heartbeat and go to sleep with the new huge
+-- replication timeout.
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+-- Now commit something sync. It should return immediately even
+-- though the replication timeout is huge.
+box.space.sync:replace{4}
+test_run:switch('replica')
+box.space.sync:select{4}
+
 -- Cleanup.
 test_run:cmd('switch default')
 
-box.cfg{replication_synchro_quorum=quorum, replication_synchro_timeout=timeout}
+box.cfg{                                                                        \
+    replication_synchro_quorum = old_synchro_quorum,                            \
+    replication_synchro_timeout = old_synchro_timeout,                          \
+    replication_timeout = old_timeout,                                          \
+}
 test_run:cmd('stop server replica')
 test_run:cmd('delete server replica')
+box.space.test:drop()
 box.space.sync:drop()
 box.schema.user.revoke('guest', 'replication')
diff --git a/test/replication/qsync_errinj.result b/test/replication/qsync_errinj.result
new file mode 100644
index 000000000..cd7f2bc8d
--- /dev/null
+++ b/test/replication/qsync_errinj.result
@@ -0,0 +1,114 @@
+-- test-run result file version 2
+test_run = require('test_run').new()
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+ | ---
+ | ...
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+ | ---
+ | ...
+old_timeout = box.cfg.replication_timeout
+ | ---
+ | ...
+box.schema.user.grant('guest', 'super')
+ | ---
+ | ...
+
+test_run:cmd('create server replica with rpl_master=default,\
+             script="replication/replica.lua"')
+ | ---
+ | - true
+ | ...
+test_run:cmd('start server replica with wait=True, wait_load=True')
+ | ---
+ | - true
+ | ...
+
+_ = box.schema.space.create('sync', {is_sync = true, engine = engine})
+ | ---
+ | ...
+_ = box.space.sync:create_index('pk')
+ | ---
+ | ...
+
+--
+-- gh-5100: slow ACK sending shouldn't stun replica for the
+-- replication timeout seconds.
+--
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_timeout = 1000, replication_synchro_quorum = 2, replication_synchro_timeout = 1000}
+ | ---
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.cfg{replication_timeout = 1000}
+ | ---
+ | ...
+box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', true)
+ | ---
+ | - ok
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+for i = 1, 10 do box.space.sync:replace{i} end
+ | ---
+ | ...
+box.space.sync:count()
+ | ---
+ | - 10
+ | ...
+
+test_run:switch('replica')
+ | ---
+ | - true
+ | ...
+box.space.sync:count()
+ | ---
+ | - 10
+ | ...
+box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', false)
+ | ---
+ | - ok
+ | ...
+
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+
+box.cfg{                                                                        \
+    replication_synchro_quorum = old_synchro_quorum,                            \
+    replication_synchro_timeout = old_synchro_timeout,                          \
+    replication_timeout = old_timeout,                                          \
+}
+ | ---
+ | ...
+test_run:cmd('stop server replica')
+ | ---
+ | - true
+ | ...
+test_run:cmd('delete server replica')
+ | ---
+ | - true
+ | ...
+
+box.space.sync:drop()
+ | ---
+ | ...
+box.schema.user.revoke('guest', 'super')
+ | ---
+ | ...
diff --git a/test/replication/qsync_errinj.test.lua b/test/replication/qsync_errinj.test.lua
new file mode 100644
index 000000000..2404b01aa
--- /dev/null
+++ b/test/replication/qsync_errinj.test.lua
@@ -0,0 +1,46 @@
+test_run = require('test_run').new()
+engine = test_run:get_cfg('engine')
+
+old_synchro_quorum = box.cfg.replication_synchro_quorum
+old_synchro_timeout = box.cfg.replication_synchro_timeout
+old_timeout = box.cfg.replication_timeout
+box.schema.user.grant('guest', 'super')
+
+test_run:cmd('create server replica with rpl_master=default,\
+             script="replication/replica.lua"')
+test_run:cmd('start server replica with wait=True, wait_load=True')
+
+_ = box.schema.space.create('sync', {is_sync = true, engine = engine})
+_ = box.space.sync:create_index('pk')
+
+--
+-- gh-5100: slow ACK sending shouldn't stun replica for the
+-- replication timeout seconds.
+--
+test_run:cmd('switch default')
+box.cfg{replication_timeout = 1000, replication_synchro_quorum = 2, replication_synchro_timeout = 1000}
+
+test_run:switch('replica')
+box.cfg{replication_timeout = 1000}
+box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', true)
+
+test_run:cmd('switch default')
+for i = 1, 10 do box.space.sync:replace{i} end
+box.space.sync:count()
+
+test_run:switch('replica')
+box.space.sync:count()
+box.error.injection.set('ERRINJ_APPLIER_SLOW_ACK', false)
+
+test_run:cmd('switch default')
+
+box.cfg{                                                                        \
+    replication_synchro_quorum = old_synchro_quorum,                            \
+    replication_synchro_timeout = old_synchro_timeout,                          \
+    replication_timeout = old_timeout,                                          \
+}
+test_run:cmd('stop server replica')
+test_run:cmd('delete server replica')
+
+box.space.sync:drop()
+box.schema.user.revoke('guest', 'super')
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list