[Tarantool-patches] [PATCH] Trigger on vclock change

Maria maria.khaydich at tarantool.org
Thu Nov 14 15:57:05 MSK 2019


This patch implements the replication.on_vclock
trigger, which can be useful for building
sharded systems with redundancy.

Closes #3808
Issue:
https://github.com/tarantool/tarantool/issues/3808
Branch:
https://github.com/tarantool/tarantool/tree/eljashm/gh-3808-wait-for-lsn
---
 src/box/lua/info.c             | 24 ++++++++++
 src/box/relay.cc               |  1 +
 src/box/replication.cc         |  1 +
 src/box/replication.h          |  4 ++
 test/replication/misc.result   | 87 ++++++++++++++++++++++++++++++++++
 test/replication/misc.test.lua | 35 ++++++++++++++
 6 files changed, 152 insertions(+)

diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 55382fd77..607c3084e 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -38,6 +38,7 @@
 
 #include "box/applier.h"
 #include "box/relay.h"
+#include "lua/trigger.h"
 #include "box/iproto.h"
 #include "box/wal.h"
 #include "box/replication.h"
@@ -146,6 +147,25 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
 	}
 }
 
+static int
+lbox_push_replica_vclock(struct lua_State *L, void *event)
+{
+	struct replica *replica = (struct replica *) event;
+	lbox_pushvclock(L, relay_vclock(replica->relay));
+	lua_pushinteger(L, replica->id);
+	return 2;
+}
+
+/**
+ * Set, reset or get the box.info.replication.on_vclock trigger.
+ */
+static int
+lbox_replication_on_vclock(struct lua_State *L)
+{
+	return lbox_trigger_reset(L, 2, &replicaset.on_vclock,
+      lbox_push_replica_vclock, NULL);
+}
+
 static void
 lbox_pushreplica(lua_State *L, struct replica *replica)
 {
@@ -191,6 +211,10 @@ lbox_info_replication(struct lua_State *L)
 	lua_setfield(L, -2, "__serialize");
 	lua_setmetatable(L, -2);
 
+	lua_pushstring(L, "on_vclock");
+	lua_pushcfunction(L, lbox_replication_on_vclock);
+	lua_settable(L, -3);
+
 	replicaset_foreach(replica) {
 		/* Applier hasn't received replica id yet */
 		if (replica->id == REPLICA_ID_NIL)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74588cba7..9484b4ea5 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -393,6 +393,7 @@ tx_status_update(struct cmsg *msg)
 {
 	struct relay_status_msg *status = (struct relay_status_msg *)msg;
 	vclock_copy(&status->relay->tx.vclock, &status->vclock);
+	trigger_run(&replicaset.on_vclock, status->relay->replica);
 	static const struct cmsg_hop route[] = {
 		{relay_status_update, NULL}
 	};
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 6fcc56fe3..e743ec972 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -95,6 +95,7 @@ replication_init(void)
 	vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
 	rlist_create(&replicaset.applier.on_rollback);
 	rlist_create(&replicaset.applier.on_commit);
+	rlist_create(&replicaset.on_vclock);
 
 	diag_create(&replicaset.applier.diag);
 }
diff --git a/src/box/replication.h b/src/box/replication.h
index 19f283c7d..c916a5711 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -187,6 +187,10 @@ struct replicaset {
 	 * to connect and those that failed to connect.
 	 */
 	struct rlist anon;
+	/**
+	 * List of triggers invoked when a relay reports a replica's new vclock.
+	 */
+	struct rlist on_vclock;
 	/**
 	 * TX thread local vclock reflecting the state
 	 * of the cluster as maintained by appliers.
diff --git a/test/replication/misc.result b/test/replication/misc.result
index ae72ce3e4..6066ff905 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -827,3 +827,90 @@ box.cfg{replication_connect_timeout=replication_connect_timeout}
 box.cfg{replication_connect_quorum=replication_connect_quorum}
 ---
 ...
+--
+-- gh-3808 Trigger on_vclock_changed added
+--
+engine = test_run:get_cfg('engine')
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+_ = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+replication_events = {}
+---
+...
+-- trigger setup
+trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end)
+---
+...
+for i = 0, 99 do box.space["test"]:insert({i}) end
+---
+...
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+---
+- true
+...
+-- check that the trigger caught the last transaction
+replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn
+---
+- true
+...
+-- trigger reset
+_ = box.info.replication.on_vclock(nil, trigger)
+---
+...
+for i = 100, 199 do box.space["test"]:insert({i}) end
+---
+...
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+---
+- true
+...
+-- check that the trigger didn't catch the last transaction
+replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn
+---
+- true
+...
+replication_events = nil
+---
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cmd("delete server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index 16e7e9e42..b8f9c1e9e 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -332,3 +332,38 @@ box.cfg{replication=""}
 
 box.cfg{replication_connect_timeout=replication_connect_timeout}
 box.cfg{replication_connect_quorum=replication_connect_quorum}
+
+--
+-- gh-3808 Trigger on_vclock_changed added
+--
+engine = test_run:get_cfg('engine')
+test_run:cleanup_cluster()
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test', {engine = engine})
+_ = box.space.test:create_index('pk')
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+replication_events = {}
+-- trigger setup
+trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end)
+for i = 0, 99 do box.space["test"]:insert({i}) end
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+-- check that the trigger caught the last transaction
+replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn
+-- trigger reset
+_ = box.info.replication.on_vclock(nil, trigger)
+for i = 100, 199 do box.space["test"]:insert({i}) end
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+-- check that the trigger didn't catch the last transaction
+replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn
+
+replication_events = nil
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+box.space.test:drop()
+
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
-- 
2.21.0 (Apple Git-122.2)



More information about the Tarantool-patches mailing list