[Tarantool-patches] [PATCH] Trigger on vclock change
Maria
maria.khaydich at tarantool.org
Thu Nov 14 15:57:05 MSK 2019
This patch implements replication.on_vclock
trigger that can be useful for programming
shard-systems with redundancy.
Closes #3808
Issue:
https://github.com/tarantool/tarantool/issues/3808
Branch:
https://github.com/tarantool/tarantool/tree/eljashm/gh-3808-wait-for-lsn
---
src/box/lua/info.c | 24 ++++++++++
src/box/relay.cc | 1 +
src/box/replication.cc | 1 +
src/box/replication.h | 4 ++
test/replication/misc.result | 87 ++++++++++++++++++++++++++++++++++
test/replication/misc.test.lua | 35 ++++++++++++++
6 files changed, 152 insertions(+)
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index 55382fd77..607c3084e 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -38,6 +38,7 @@
#include "box/applier.h"
#include "box/relay.h"
+#include "lua/trigger.h"
#include "box/iproto.h"
#include "box/wal.h"
#include "box/replication.h"
@@ -146,6 +147,25 @@ lbox_pushrelay(lua_State *L, struct relay *relay)
}
}
+static int
+lbox_push_replica_vclock(struct lua_State *L, void *event)
+{
+ struct replica *replica = (struct replica *) event;
+ lbox_pushvclock(L, relay_vclock(replica->relay));
+ lua_pushinteger(L, replica->id);
+ return 2;
+}
+
+/**
+ * Set/Reset/Get replication.on_vclock trigger
+ */
+static int
+lbox_replication_on_vclock(struct lua_State *L)
+{
+ return lbox_trigger_reset(L, 2, &replicaset.on_vclock,
+ lbox_push_replica_vclock, NULL);
+}
+
static void
lbox_pushreplica(lua_State *L, struct replica *replica)
{
@@ -191,6 +211,10 @@ lbox_info_replication(struct lua_State *L)
lua_setfield(L, -2, "__serialize");
lua_setmetatable(L, -2);
+ lua_pushstring(L, "on_vclock");
+ lua_pushcfunction(L, lbox_replication_on_vclock);
+ lua_settable(L, -3);
+
replicaset_foreach(replica) {
/* Applier hasn't received replica id yet */
if (replica->id == REPLICA_ID_NIL)
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 74588cba7..9484b4ea5 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -393,6 +393,7 @@ tx_status_update(struct cmsg *msg)
{
struct relay_status_msg *status = (struct relay_status_msg *)msg;
vclock_copy(&status->relay->tx.vclock, &status->vclock);
+ trigger_run(&replicaset.on_vclock, status->relay->replica);
static const struct cmsg_hop route[] = {
{relay_status_update, NULL}
};
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 6fcc56fe3..e743ec972 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -95,6 +95,7 @@ replication_init(void)
vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
rlist_create(&replicaset.applier.on_rollback);
rlist_create(&replicaset.applier.on_commit);
+ rlist_create(&replicaset.on_vclock);
diag_create(&replicaset.applier.diag);
}
diff --git a/src/box/replication.h b/src/box/replication.h
index 19f283c7d..c916a5711 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -187,6 +187,10 @@ struct replicaset {
* to connect and those that failed to connect.
*/
struct rlist anon;
+ /**
+ * List of triggers invoked on lsn changed.
+ */
+ struct rlist on_vclock;
/**
* TX thread local vclock reflecting the state
* of the cluster as maintained by appliers.
diff --git a/test/replication/misc.result b/test/replication/misc.result
index ae72ce3e4..6066ff905 100644
--- a/test/replication/misc.result
+++ b/test/replication/misc.result
@@ -827,3 +827,90 @@ box.cfg{replication_connect_timeout=replication_connect_timeout}
box.cfg{replication_connect_quorum=replication_connect_quorum}
---
...
+--
+-- gh-3808 Trigger on_vclock_changed added
+--
+engine = test_run:get_cfg('engine')
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+_ = box.schema.space.create('test', {engine = engine})
+---
+...
+_ = box.space.test:create_index('pk')
+---
+...
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+---
+- true
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+replication_events = {}
+---
+...
+-- trigger setup
+trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end)
+---
+...
+for i = 0, 99 do box.space["test"]:insert({i}) end
+---
+...
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+---
+- true
+...
+-- check that the trigger caught the last transaction
+replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn
+---
+- true
+...
+-- trigger reset
+_ = box.info.replication.on_vclock(nil, trigger)
+---
+...
+for i = 100, 199 do box.space["test"]:insert({i}) end
+---
+...
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+---
+- true
+...
+-- check that the trigger didn't catch the last transaction
+replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn
+---
+- true
+...
+replication_events = nil
+---
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
+test_run:cmd("delete server replica")
+---
+- true
+...
+box.space.test:drop()
+---
+...
+test_run:cleanup_cluster()
+---
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua
index 16e7e9e42..b8f9c1e9e 100644
--- a/test/replication/misc.test.lua
+++ b/test/replication/misc.test.lua
@@ -332,3 +332,38 @@ box.cfg{replication=""}
box.cfg{replication_connect_timeout=replication_connect_timeout}
box.cfg{replication_connect_quorum=replication_connect_quorum}
+
+--
+-- gh-3808 Trigger on_vclock_changed added
+--
+engine = test_run:get_cfg('engine')
+test_run:cleanup_cluster()
+box.schema.user.grant('guest', 'replication')
+_ = box.schema.space.create('test', {engine = engine})
+_ = box.space.test:create_index('pk')
+test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'")
+test_run:cmd("start server replica")
+replication_events = {}
+-- trigger setup
+trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end)
+for i = 0, 99 do box.space["test"]:insert({i}) end
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+-- check that the trigger caught the last transaction
+replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn
+-- trigger reset
+_ = box.info.replication.on_vclock(nil, trigger)
+for i = 100, 199 do box.space["test"]:insert({i}) end
+-- wait until replica catches up the local one
+test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end)
+-- check that the trigger didn't catch the last transaction
+replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn
+
+replication_events = nil
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
+test_run:cmd("delete server replica")
+box.space.test:drop()
+
+test_run:cleanup_cluster()
+box.schema.user.revoke('guest', 'replication')
--
2.21.0 (Apple Git-122.2)
More information about the Tarantool-patches
mailing list