From: Maria <maria.khaydich@tarantool.org> To: tarantool-patches@dev.tarantool.org, georgy@tarantool.org Subject: [Tarantool-patches] [PATCH] Trigger on vclock change Date: Thu, 14 Nov 2019 15:57:05 +0300 [thread overview] Message-ID: <20191114125705.26760-1-maria.khaydich@tarantool.org> (raw) This patch implements the replication.on_vclock trigger, which can be useful for programming sharded systems with redundancy. Closes #3808 Issue: https://github.com/tarantool/tarantool/issues/3808 Branch: https://github.com/tarantool/tarantool/tree/eljashm/gh-3808-wait-for-lsn --- src/box/lua/info.c | 24 ++++++++++ src/box/relay.cc | 1 + src/box/replication.cc | 1 + src/box/replication.h | 4 ++ test/replication/misc.result | 87 ++++++++++++++++++++++++++++++++++ test/replication/misc.test.lua | 35 ++++++++++++++ 6 files changed, 152 insertions(+) diff --git a/src/box/lua/info.c b/src/box/lua/info.c index 55382fd77..607c3084e 100644 --- a/src/box/lua/info.c +++ b/src/box/lua/info.c @@ -38,6 +38,7 @@ #include "box/applier.h" #include "box/relay.h" +#include "lua/trigger.h" #include "box/iproto.h" #include "box/wal.h" #include "box/replication.h" @@ -146,6 +147,25 @@ lbox_pushrelay(lua_State *L, struct relay *relay) } } +static int +lbox_push_replica_vclock(struct lua_State *L, void *event) +{ + struct replica *replica = (struct replica *) event; + lbox_pushvclock(L, relay_vclock(replica->relay)); + lua_pushinteger(L, replica->id); + return 2; +} + +/** + * Set/Reset/Get replication.on_vclock trigger + */ +static int +lbox_replication_on_vclock(struct lua_State *L) +{ + return lbox_trigger_reset(L, 2, &replicaset.on_vclock, + lbox_push_replica_vclock, NULL); +} + static void lbox_pushreplica(lua_State *L, struct replica *replica) { @@ -191,6 +211,10 @@ lbox_info_replication(struct lua_State *L) lua_setfield(L, -2, "__serialize"); lua_setmetatable(L, -2); + lua_pushstring(L, "on_vclock"); + lua_pushcfunction(L, lbox_replication_on_vclock); + lua_settable(L, -3); + 
replicaset_foreach(replica) { /* Applier hasn't received replica id yet */ if (replica->id == REPLICA_ID_NIL) diff --git a/src/box/relay.cc b/src/box/relay.cc index 74588cba7..9484b4ea5 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -393,6 +393,7 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); + trigger_run(&replicaset.on_vclock, status->relay->replica); static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; diff --git a/src/box/replication.cc b/src/box/replication.cc index 6fcc56fe3..e743ec972 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -95,6 +95,7 @@ replication_init(void) vclock_copy(&replicaset.applier.vclock, &replicaset.vclock); rlist_create(&replicaset.applier.on_rollback); rlist_create(&replicaset.applier.on_commit); + rlist_create(&replicaset.on_vclock); diag_create(&replicaset.applier.diag); } diff --git a/src/box/replication.h b/src/box/replication.h index 19f283c7d..c916a5711 100644 --- a/src/box/replication.h +++ b/src/box/replication.h @@ -187,6 +187,10 @@ struct replicaset { * to connect and those that failed to connect. */ struct rlist anon; + /** + * List of triggers invoked when the LSN changes. + */ + struct rlist on_vclock; /** * TX thread local vclock reflecting the state * of the cluster as maintained by appliers. diff --git a/test/replication/misc.result b/test/replication/misc.result index ae72ce3e4..6066ff905 100644 --- a/test/replication/misc.result +++ b/test/replication/misc.result @@ -827,3 +827,90 @@ box.cfg{replication_connect_timeout=replication_connect_timeout} box.cfg{replication_connect_quorum=replication_connect_quorum} --- ... +-- +-- gh-3808 Trigger on_vclock_changed added +-- +engine = test_run:get_cfg('engine') +--- +... +test_run:cleanup_cluster() +--- +... +box.schema.user.grant('guest', 'replication') +--- +... 
+_ = box.schema.space.create('test', {engine = engine}) +--- +... +_ = box.space.test:create_index('pk') +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +replication_events = {} +--- +... +-- trigger setup +trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end) +--- +... +for i = 0, 99 do box.space["test"]:insert({i}) end +--- +... +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +--- +- true +... +-- check that the trigger caught the last transaction +replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn +--- +- true +... +-- trigger reset +_ = box.info.replication.on_vclock(nil, trigger) +--- +... +for i = 100, 199 do box.space["test"]:insert({i}) end +--- +... +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +--- +- true +... +-- check that the trigger didn't catch the last transaction +replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn +--- +- true +... +replication_events = nil +--- +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +test_run:cmd("delete server replica") +--- +- true +... +box.space.test:drop() +--- +... +test_run:cleanup_cluster() +--- +... +box.schema.user.revoke('guest', 'replication') +--- +... 
diff --git a/test/replication/misc.test.lua b/test/replication/misc.test.lua index 16e7e9e42..b8f9c1e9e 100644 --- a/test/replication/misc.test.lua +++ b/test/replication/misc.test.lua @@ -332,3 +332,38 @@ box.cfg{replication=""} box.cfg{replication_connect_timeout=replication_connect_timeout} box.cfg{replication_connect_quorum=replication_connect_quorum} + +-- +-- gh-3808 Trigger on_vclock_changed added +-- +engine = test_run:get_cfg('engine') +test_run:cleanup_cluster() +box.schema.user.grant('guest', 'replication') +_ = box.schema.space.create('test', {engine = engine}) +_ = box.space.test:create_index('pk') +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") +replication_events = {} +-- trigger setup +trigger = box.info.replication.on_vclock(function(vclock, replica_id) replication_events[replica_id] = vclock end) +for i = 0, 99 do box.space["test"]:insert({i}) end +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +-- check that the trigger caught the last transaction +replication_events[box.info.replication[2].id][box.info.id] == box.info.lsn +-- trigger reset +_ = box.info.replication.on_vclock(nil, trigger) +for i = 100, 199 do box.space["test"]:insert({i}) end +-- wait until replica catches up the local one +test_run:wait_cond(function() return box.info.replication[2].downstream.vclock ~= box.info.vclock end) +-- check that the trigger didn't catch the last transaction +replication_events[box.info.replication[2].id][box.info.id] < box.info.lsn + +replication_events = nil +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +test_run:cmd("delete server replica") +box.space.test:drop() + +test_run:cleanup_cluster() +box.schema.user.revoke('guest', 'replication') -- 2.21.0 (Apple Git-122.2)
next reply other threads:[~2019-11-14 12:56 UTC|newest] Thread overview: 21+ messages / expand[flat|nested] mbox.gz Atom feed top 2019-11-14 12:57 Maria [this message] 2019-11-14 13:44 ` Konstantin Osipov 2019-11-14 14:06 ` Georgy Kirichenko 2019-11-14 15:26 ` Konstantin Osipov 2019-11-14 17:13 ` Georgy Kirichenko 2019-11-14 17:33 ` Konstantin Osipov 2019-11-14 19:16 ` Georgy Kirichenko 2019-11-14 19:48 ` Konstantin Osipov 2019-11-14 20:01 ` Georgy Kirichenko 2019-11-15 1:57 ` Konstantin Osipov 2019-11-15 6:02 ` Georgy Kirichenko 2019-11-15 13:57 ` Konstantin Osipov 2019-11-15 19:57 ` Georgy Kirichenko 2019-11-16 10:37 ` Konstantin Osipov 2019-11-16 20:43 ` Georgy Kirichenko 2019-11-16 11:56 ` Konstantin Osipov 2019-11-16 20:34 ` Georgy Kirichenko 2019-11-18 9:31 ` Konstantin Osipov 2020-06-02 12:22 ` Maria Khaydich 2020-06-03 10:12 ` Sergey Ostanevich 2020-06-03 12:08 ` Alexander Turenko
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20191114125705.26760-1-maria.khaydich@tarantool.org \ --to=maria.khaydich@tarantool.org \ --cc=georgy@tarantool.org \ --cc=tarantool-patches@dev.tarantool.org \ --subject='Re: [Tarantool-patches] [PATCH] Trigger on vclock change' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.