From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id D2E04452566 for ; Thu, 14 Nov 2019 15:56:48 +0300 (MSK) From: Maria Date: Thu, 14 Nov 2019 15:57:05 +0300 Message-Id: <20191114125705.26760-1-maria.khaydich@tarantool.org> MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH] Trigger on vclock change List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org, georgy@tarantool.org This patch implements replication.on_vclock trigger that can be useful for programming shard-systems with redundancy. Closes #3808 Issue: https://github.com/tarantool/tarantool/issues/3808 Branch: https://github.com/tarantool/tarantool/tree/eljashm/gh-3808-wait-for-lsn --- src/box/lua/info.c | 24 ++++++++++ src/box/relay.cc | 1 + src/box/replication.cc | 1 + src/box/replication.h | 4 ++ test/replication/misc.result | 87 ++++++++++++++++++++++++++++++++++ test/replication/misc.test.lua | 35 ++++++++++++++ 6 files changed, 152 insertions(+) diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 55382fd77..607c3084e 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -38,6 +38,7 @@ #include "box/applier.h" #include "box/relay.h" +#include "lua/trigger.h" #include "box/iproto.h" #include "box/wal.h" #include "box/replication.h" @@ -146,6 +147,25 @@ lbox_pushrelay(lua_State *L, struct relay *relay) } } +static int +lbox_push_replica_vclock(struct lua_State *L, void *event) +{ + struct replica *replica = (struct replica *) event; + lbox_pushvclock(L, relay_vclock(replica->relay)); + lua_pushinteger(L, replica->id); + return 2; +} + +/** + * Set/Reset/Get replication.on_vclock trigger + */ +static int +lbox_replication_on_vclock(struct lua_State *L) +{ + return lbox_trigger_reset(L, 2, &replicaset.on_vclock, + lbox_push_replica_vclock, NULL); +} + static void lbox_pushreplica(lua_State *L, struct replica *replica) { @@ -191,6 +211,10 @@ lbox_info_replication(struct lua_State *L) lua_setfield(L, -2, "__serialize"); lua_setmetatable(L, -2); + lua_pushstring(L, "on_vclock"); + lua_pushcfunction(L, lbox_replication_on_vclock); + lua_settable(L, -3); + replicaset_foreach(replica) { /* Applier hasn't received replica id yet */ if (replica->id == REPLICA_ID_NIL) diff --git a/src/box/relay.cc b/src/box/relay.cc index 74588cba7..9484b4ea5 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -393,6 +393,7 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + trigger_run(&replicaset.on_vclock, status->relay->replica); static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; diff --git a/src/box/replication.cc b/src/box/replication.cc index 6fcc56fe3..e743ec972 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -95,6 +95,7 @@ replication_init(void) vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); rlist_create(&replicaset.applier.on_rollback); rlist_create(&replicaset.applier.on_commit); + rlist_create(&replicaset.on_vclock); diag_create(&replicaset.applier.diag); } diff --git a/src/box/replication.h b/src/box/replication.h index 19f283c7d..c916a5711 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -187,6 +187,10 @@ struct replicaset { * to connect and those that failed to connect. */ struct rlist anon; + /** + * List of triggers invoked on lsn changed. + */ + struct rlist on_vclock; /** * TX thread local vclock reflecting the state * of the cluster as maintained by appliers. diff --git a/test/replication/misc.result b/test/replication/misc.result index ae72ce3e4..6066ff905 100644 --- a/test/replication/misc.result +++ b/test/replication/misc.result @@ -827,3 +827,90 @@ box.cfg{replication_connect_timeout=replication_connect_timeout} box.cfg{replication_connect_quorum=replication_connect_quorum} --- ... +-- +-- gh-3808 Trigger on_vclock_changed added +-- +engine = test_run:get_cfg('engine') +--- +... +test_run:cleanup_cluster() +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +_ = box.schema.space.create('test', {engine = engine}) +--- +... +_ = box.space.test:create_index('pk') +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +replication_events = {} +--- +... +-- trigger setup +trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end) +--- +... +for i = 0, 99 do box.space["test"]:insert({i}) end +--- +... +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +--- +- true +... +-- check that the trigger caught the last transaction +replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn +--- +- true +... +-- trigger reset +_ = box.info.replication.on_vclock(nil, trigger) +--- +... +for i = 100, 199 do box.space["test"]:insert({i}) end +--- +... +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +--- +- true +... +-- check that the trigger didn't catch the last transaction +replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn +--- +- true +... +replication_events = nil +--- +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +test_run:cmd("delete server replica") +--- +- true +... +box.space.test:drop() +--- +... +test_run:cleanup_cluster() +--- +... +box.schema.user.revoke('guest', 'replication') +--- +... diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua index 16e7e9e42..b8f9c1e9e 100644 --- a/test/replication/misc.test.lua +++ b/test/replication/misc.test.lua @@ -332,3 +332,38 @@ box.cfg{replication=""} box.cfg{replication_connect_timeout=replication_connect_timeout} box.cfg{replication_connect_quorum=replication_connect_quorum} + +-- +-- gh-3808 Trigger on_vclock_changed added +-- +engine = test_run:get_cfg('engine') +test_run:cleanup_cluster() +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('test', {engine = engine}) +_ = box.space.test:create_index('pk') +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +replication_events = {} +-- trigger setup +trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end) +for i = 0, 99 do box.space["test"]:insert({i}) end +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +-- check that the trigger caught the last transaction +replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn +-- trigger reset +_ = box.info.replication.on_vclock(nil, trigger) +for i = 100, 199 do box.space["test"]:insert({i}) end +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +-- check that the trigger didn't catch the last transaction +replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn + +replication_events = nil +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +test_run:cmd("delete server replica") +box.space.test:drop() + +test_run:cleanup_cluster() +box.schema.user.revoke('guest', 'replication') -- 2.21.0 (Apple Git-122.2)