[Tarantool-patches] [PATCH v2] net.box: add predefined system events for pub/sub
Yan Shtunder
ya.shtunder at gmail.com
Thu Jan 20 07:06:40 MSK 2022
Add the predefined system events box.status and box.election.
Part of #6260
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-for-pub-sub
src/box/box.cc | 69 +++++-
src/box/box.h | 6 +
src/box/mp_error.cc | 6 -
src/box/raft.c | 1 +
src/lib/msgpuck | 2 +-
.../gh_6260_add_builtin_events.lua | 229 ++++++++++++++++++
test/unit/mp_error.cc | 6 -
7 files changed, 299 insertions(+), 20 deletions(-)
create mode 100644 test/app-luatest/gh_6260_add_builtin_events.lua
diff --git a/src/box/box.cc b/src/box/box.cc
index 0413cbf44..dc61dc77b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -97,13 +97,6 @@ double txn_timeout_default;
struct rlist box_on_shutdown_trigger_list =
RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
-static void title(const char *new_status)
-{
- snprintf(status, sizeof(status), "%s", new_status);
- title_set_status(new_status);
- title_update();
- systemd_snotify("STATUS=%s", status);
-}
const struct vclock *box_vclock = &replicaset.vclock;
@@ -160,6 +153,19 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+static void
+box_broadcast_status(void);
+/** Record the new status, refresh title/systemd and notify watchers. */
+static void title(const char *new_status)
+{
+ snprintf(status, sizeof(status), "%s", new_status);
+ title_set_status(new_status);
+ title_update();
+ systemd_snotify("STATUS=%s", status);
+ /* The status just changed: publish it via the box.status event. */
+ box_broadcast_status();
+}
+
void
box_update_ro_summary(void)
{
@@ -173,6 +179,9 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
+ /* The read-only state may have changed: notify event watchers. */
+ box_broadcast_status();
+ box_broadcast_election();
}
const char *
@@ -3903,3 +3912,49 @@ box_reset_stat(void)
engine_reset_stat();
space_foreach(box_reset_space_stat, NULL);
}
+
+/** Broadcast "box.status": a map of {is_ro, is_ro_cfg, status}. */
+void
+box_broadcast_status(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 3);
+	w = mp_encode_str0(w, "is_ro");
+	w = mp_encode_bool(w, box_is_ro());
+	w = mp_encode_str0(w, "is_ro_cfg");
+	w = mp_encode_bool(w, cfg_geti("read_only"));
+	w = mp_encode_str0(w, "status");
+	w = mp_encode_str0(w, box_status());
+	/* Validate the payload size before publishing the buffer. */
+	assert((size_t)(w - buf) < sizeof(buf));
+	box_broadcast("box.status", strlen("box.status"), buf, w);
+}
+
+/** Broadcast "box.election": a map of {term, role, is_ro, leader}. */
+void
+box_broadcast_election(void)
+{
+	struct raft *raft = box_raft();
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 4);
+
+	w = mp_encode_str0(w, "term");
+	w = mp_encode_uint(w, raft->term);
+
+	w = mp_encode_str0(w, "role");
+	w = mp_encode_str0(w, raft_state_str(raft->state));
+
+	w = mp_encode_str0(w, "is_ro");
+	w = mp_encode_bool(w, box_is_ro());
+
+	w = mp_encode_str0(w, "leader");
+	w = mp_encode_uint(w, raft->leader);
+
+	/* Validate the payload size before publishing the buffer. */
+	assert((size_t)(w - buf) < sizeof(buf));
+	box_broadcast("box.election", strlen("box.election"), buf, w);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..2eb1ef437 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,12 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+/**
+ * Broadcast the "box.election" event built from the current Raft state.
+ */
+void
+box_broadcast_election(void);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc
index fba562a84..006b5f2d9 100644
--- a/src/box/mp_error.cc
+++ b/src/box/mp_error.cc
@@ -168,12 +168,6 @@ mp_sizeof_error_one(const struct error *error)
return data_size;
}
-static inline char *
-mp_encode_str0(char *data, const char *str)
-{
- return mp_encode_str(data, str, strlen(str));
-}
-
static char *
mp_encode_error_one(char *data, const struct error *error)
{
diff --git a/src/box/raft.c b/src/box/raft.c
index 1e360dc88..b2d4002fe 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
+ box_broadcast_election();
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/lib/msgpuck b/src/lib/msgpuck
index 7f95b6fd7..7f343a408 160000
--- a/src/lib/msgpuck
+++ b/src/lib/msgpuck
@@ -1 +1 @@
-Subproject commit 7f95b6fd7a5b928cfcbdab2d78bea88d1685821e
+Subproject commit 7f343a40897cbf070c897886e15eb3d761d322f4
diff --git a/test/app-luatest/gh_6260_add_builtin_events.lua b/test/app-luatest/gh_6260_add_builtin_events.lua
new file mode 100644
index 000000000..4a4e1feff
--- /dev/null
+++ b/test/app-luatest/gh_6260_add_builtin_events.lua
@@ -0,0 +1,229 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local helpers = require('test.luatest_helpers')
+
+local g = t.group('gh_6260')
+-- Start a single writable 'master' instance for test_box_status_event.
+g.before_test('test_box_status_event', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ read_only = false
+ }
+
+ cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+-- Drop the cluster used by test_box_status_event.
+g.after_test('test_box_status_event', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+-- box.status must mirror box.info.ro, box.cfg.read_only and box.info.status.
+g.test_box_status_event= function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+
+ c:eval([[
+ i = ''
+ box.watch('box.status', function(...) i = {...} end)
+ ]])
+
+ t.assert_equals(c:eval("return i"),
+ {"box.status",
+ {is_ro = cg.master:eval("return box.info.ro"),
+ is_ro_cfg = cg.master:eval("return box.cfg.read_only"),
+ status = cg.master:eval("return box.info.status")}})
+
+ -- Step 2: add a 'replica' peer (not started here), 1 ms connect timeout.
+ cg.master:eval(([[
+ box.cfg{
+ replication = {
+ "%s",
+ "%s"
+ },
+ replication_connect_timeout = 0.001,
+ }
+ ]]):format(helpers.instance_uri('master'),
+ helpers.instance_uri('replica')))
+
+ t.assert_equals(c:eval("return i"),
+ {"box.status",
+ {is_ro = cg.master:eval("return box.info.ro"),
+ is_ro_cfg = cg.master:eval("return box.cfg.read_only"),
+ status = cg.master:eval("return box.info.status")}})
+
+ -- Step 3: leave only the master itself in the replication list.
+ cg.master:eval(([[
+ box.cfg{
+ replication = {
+ "%s",
+ },
+ }
+ ]]):format(helpers.instance_uri('master')))
+
+ t.assert_equals(c:eval("return i"),
+ {"box.status",
+ {is_ro = cg.master:eval("return box.info.ro"),
+ is_ro_cfg = cg.master:eval("return box.cfg.read_only"),
+ status = cg.master:eval("return box.info.status")}})
+
+ -- Step 4: switch the instance to read-only via box.cfg.
+ cg.master:eval("box.cfg{read_only = true}")
+
+ t.assert_equals(c:eval("return i"),
+ {"box.status",
+ {is_ro = cg.master:eval("return box.info.ro"),
+ is_ro_cfg = cg.master:eval("return box.cfg.read_only"),
+ status = cg.master:eval("return box.info.status")}})
+
+
+ c:close()
+end
+
+-- Start three candidate instances that replicate with each other.
+g.before_test('test_box_election_event', function(cg)
+
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replication = {
+ helpers.instance_uri('instance_', 1);
+ helpers.instance_uri('instance_', 2);
+ helpers.instance_uri('instance_', 3);
+ },
+ replication_connect_quorum = 0,
+ election_mode = 'candidate',
+ replication_synchro_quorum = 2,
+ replication_synchro_timeout = 1,
+ replication_timeout = 0.25,
+ election_timeout = 0.25,
+ }
+
+ cg.instance_1 = cg.cluster:build_server(
+ {alias = 'instance_1', box_cfg = box_cfg})
+
+ cg.instance_2 = cg.cluster:build_server(
+ {alias = 'instance_2', box_cfg = box_cfg})
+
+ cg.instance_3 = cg.cluster:build_server(
+ {alias = 'instance_3', box_cfg = box_cfg})
+
+ cg.cluster:add_server(cg.instance_1)
+ cg.cluster:add_server(cg.instance_2)
+ cg.cluster:add_server(cg.instance_3)
+ cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+-- Drop the three-node cluster used by test_box_election_event.
+g.after_test('test_box_election_event', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+-- box.election must mirror box.info.election and box.info.ro on every node.
+g.test_box_election_event= function(cg)
+ local c1 = net.connect(cg.instance_1.net_box_uri)
+ local c2 = net.connect(cg.instance_2.net_box_uri)
+ local c3 = net.connect(cg.instance_3.net_box_uri)
+
+ c1:eval([[
+ i1 = ''
+ box.watch('box.election', function(...) i1 = {...} end)
+ ]])
+ c2:eval([[
+ i2 = ''
+ box.watch('box.election', function(...) i2 = {...} end)
+ ]])
+ c3:eval([[
+ i3 = ''
+ box.watch('box.election', function(...) i3 = {...} end)
+ ]])
+
+ t.assert_equals(c1:eval("return i1"),
+ {"box.election",
+ {term = cg.instance_1:eval("return box.info.election.term"),
+ leader = cg.instance_1:eval("return box.info.election.leader"),
+ is_ro = cg.instance_1:eval("return box.info.ro"),
+ role = cg.instance_1:eval("return box.info.election.state")}})
+
+ t.assert_equals(c2:eval("return i2"),
+ {"box.election",
+ {term = cg.instance_2:eval("return box.info.election.term"),
+ leader = cg.instance_2:eval("return box.info.election.leader"),
+ is_ro = cg.instance_2:eval("return box.info.ro"),
+ role = cg.instance_2:eval("return box.info.election.state")}})
+
+ t.assert_equals(c3:eval("return i3"),
+ {"box.election",
+ {term = cg.instance_3:eval("return box.info.election.term"),
+ leader = cg.instance_3:eval("return box.info.election.leader"),
+ is_ro = cg.instance_3:eval("return box.info.ro"),
+ role = cg.instance_3:eval("return box.info.election.state")}})
+
+
+ -- Step 2: demote instance_1 to voter and wait until a leader is known.
+ cg.instance_1:eval("return box.cfg{election_mode='voter'}")
+
+ cg.instance_1:wait_election_leader_found()
+ cg.instance_2:wait_election_leader_found()
+ cg.instance_3:wait_election_leader_found()
+
+ t.assert_equals(c1:eval("return i1"),
+ {"box.election",
+ {term = cg.instance_1:eval("return box.info.election.term"),
+ leader = cg.instance_1:eval("return box.info.election.leader"),
+ is_ro = cg.instance_1:eval("return box.info.ro"),
+ role = cg.instance_1:eval("return box.info.election.state")}})
+
+ t.assert_equals(c2:eval("return i2"),
+ {"box.election",
+ {term = cg.instance_2:eval("return box.info.election.term"),
+ leader = cg.instance_2:eval("return box.info.election.leader"),
+ is_ro = cg.instance_2:eval("return box.info.ro"),
+ role = cg.instance_2:eval("return box.info.election.state")}})
+
+ t.assert_equals(c3:eval("return i3"),
+ {"box.election",
+ {term = cg.instance_3:eval("return box.info.election.term"),
+ leader = cg.instance_3:eval("return box.info.election.leader"),
+ is_ro = cg.instance_3:eval("return box.info.ro"),
+ role = cg.instance_3:eval("return box.info.election.state")}})
+
+
+ -- Step 3: instance_1 is a candidate again, instance_2 becomes a voter.
+ cg.instance_1:eval("return box.cfg{election_mode='candidate'}")
+ cg.instance_2:eval("return box.cfg{election_mode='voter'}")
+
+ cg.instance_1:wait_election_leader_found()
+ cg.instance_2:wait_election_leader_found()
+ cg.instance_3:wait_election_leader_found()
+
+ t.assert_equals(c1:eval("return i1"),
+ {"box.election",
+ {term = cg.instance_1:eval("return box.info.election.term"),
+ leader = cg.instance_1:eval("return box.info.election.leader"),
+ is_ro = cg.instance_1:eval("return box.info.ro"),
+ role = cg.instance_1:eval("return box.info.election.state")}})
+
+ t.assert_equals(c2:eval("return i2"),
+ {"box.election",
+ {term = cg.instance_2:eval("return box.info.election.term"),
+ leader = cg.instance_2:eval("return box.info.election.leader"),
+ is_ro = cg.instance_2:eval("return box.info.ro"),
+ role = cg.instance_2:eval("return box.info.election.state")}})
+
+ t.assert_equals(c3:eval("return i3"),
+ {"box.election",
+ {term = cg.instance_3:eval("return box.info.election.term"),
+ leader = cg.instance_3:eval("return box.info.election.leader"),
+ is_ro = cg.instance_3:eval("return box.info.ro"),
+ role = cg.instance_3:eval("return box.info.election.state")}})
+
+ c1:close()
+ c2:close()
+ c3:close()
+end
diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc
index 5e68b34d2..5c5a323cb 100644
--- a/test/unit/mp_error.cc
+++ b/test/unit/mp_error.cc
@@ -93,12 +93,6 @@ enum {
sizeof(standard_errors) / sizeof(standard_errors[0]),
};
-static inline char *
-mp_encode_str0(char *data, const char *str)
-{
- return mp_encode_str(data, str, strlen(str));
-}
-
/** Note, not the same as mp_encode_error(). */
static char *
mp_encode_mp_error(const struct mp_test_error *e, char *data)
--
2.25.1
More information about the Tarantool-patches
mailing list