From: Yan Shtunder via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: v.shpilevoy@tarantool.org Cc: tarantool-patches@dev.tarantool.org, Yan Shtunder <ya.shtunder@gmail.com> Subject: [Tarantool-patches] [PATCH v2] net.box: add predefined system events for pub/sub Date: Thu, 20 Jan 2022 07:06:40 +0300 [thread overview] Message-ID: <20220120040640.114751-1-ya.shtunder@gmail.com> (raw) Adding predefined system event box.status and box.election Part of #6260 --- Issue: https://github.com/tarantool/tarantool/issues/6260 Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-for-pub-sub src/box/box.cc | 69 +++++- src/box/box.h | 6 + src/box/mp_error.cc | 6 - src/box/raft.c | 1 + src/lib/msgpuck | 2 +- .../gh_6260_add_builtin_events.lua | 229 ++++++++++++++++++ test/unit/mp_error.cc | 6 - 7 files changed, 299 insertions(+), 20 deletions(-) create mode 100644 test/app-luatest/gh_6260_add_builtin_events.lua diff --git a/src/box/box.cc b/src/box/box.cc index 0413cbf44..dc61dc77b 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -97,13 +97,6 @@ double txn_timeout_default; struct rlist box_on_shutdown_trigger_list = RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list); -static void title(const char *new_status) -{ - snprintf(status, sizeof(status), "%s", new_status); - title_set_status(new_status); - title_update(); - systemd_snotify("STATUS=%s", status); -} const struct vclock *box_vclock = &replicaset.vclock; @@ -160,6 +153,19 @@ static struct fiber_pool tx_fiber_pool; */ static struct cbus_endpoint tx_prio_endpoint; +static void +box_broadcast_status(void); + +static void title(const char *new_status) +{ + snprintf(status, sizeof(status), "%s", new_status); + title_set_status(new_status); + title_update(); + systemd_snotify("STATUS=%s", status); + /* Checking box.info.status change */ + box_broadcast_status(); +} + void box_update_ro_summary(void) { @@ -173,6 +179,9 @@ box_update_ro_summary(void) if (is_ro_summary) engine_switch_to_ro(); 
fiber_cond_broadcast(&ro_cond); + /* Checking box.info.ro change */ + box_broadcast_status(); + box_broadcast_election(); } const char * @@ -3903,3 +3912,49 @@ box_reset_stat(void) engine_reset_stat(); space_foreach(box_reset_space_stat, NULL); } + +void +box_broadcast_status(void) +{ + char buf[1024]; + char *w = buf; + + w = mp_encode_map(w, 3); + w = mp_encode_str0(w, "is_ro"); + w = mp_encode_bool(w, box_is_ro()); + w = mp_encode_str0(w, "is_ro_cfg"); + w = mp_encode_bool(w, cfg_geti("read_only")); + w = mp_encode_str0(w, "status"); + w = mp_encode_str0(w, box_status()); + + box_broadcast("box.status", strlen("box.status"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +void +box_broadcast_election() +{ + struct raft *raft = box_raft(); + + char buf[1024]; + char *w = buf; + + w = mp_encode_map(w, 4); + + w = mp_encode_str0(w, "term"); + w = mp_encode_uint(w, raft->term); + + w = mp_encode_str0(w, "role"); + w = mp_encode_str0(w, raft_state_str(raft->state)); + + w = mp_encode_str0(w, "is_ro"); + w = mp_encode_bool(w, box_is_ro()); + + w = mp_encode_str0(w, "leader"); + w = mp_encode_uint(w, raft->leader); + + box_broadcast("box.election", strlen("box.election"), buf, w); + + assert((size_t)(w - buf) < 1024); +} diff --git a/src/box/box.h b/src/box/box.h index b594f6646..2eb1ef437 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -563,6 +563,12 @@ box_process_rw(struct request *request, struct space *space, int boxk(int type, uint32_t space_id, const char *format, ...); +/** + * + */ +void +box_broadcast_election(void); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc index fba562a84..006b5f2d9 100644 --- a/src/box/mp_error.cc +++ b/src/box/mp_error.cc @@ -168,12 +168,6 @@ mp_sizeof_error_one(const struct error *error) return data_size; } -static inline char * -mp_encode_str0(char *data, const char *str) -{ - return mp_encode_str(data, str, strlen(str)); -} - static char 
* mp_encode_error_one(char *data, const struct error *error) { diff --git a/src/box/raft.c b/src/box/raft.c index 1e360dc88..b2d4002fe 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event) * writable only after it clears its synchro queue. */ box_update_ro_summary(); + box_broadcast_election(); if (raft->state != RAFT_STATE_LEADER) return 0; /* diff --git a/src/lib/msgpuck b/src/lib/msgpuck index 7f95b6fd7..7f343a408 160000 --- a/src/lib/msgpuck +++ b/src/lib/msgpuck @@ -1 +1 @@ -Subproject commit 7f95b6fd7a5b928cfcbdab2d78bea88d1685821e +Subproject commit 7f343a40897cbf070c897886e15eb3d761d322f4 diff --git a/test/app-luatest/gh_6260_add_builtin_events.lua b/test/app-luatest/gh_6260_add_builtin_events.lua new file mode 100644 index 000000000..4a4e1feff --- /dev/null +++ b/test/app-luatest/gh_6260_add_builtin_events.lua @@ -0,0 +1,229 @@ +local t = require('luatest') +local net = require('net.box') +local cluster = require('test.luatest_helpers.cluster') +local helpers = require('test.luatest_helpers') + +local g = t.group('gh_6260') + +g.before_test('test_box_status_event', function(cg) + cg.cluster = cluster:new({}) + + local box_cfg = { + read_only = false + } + + cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg}) + cg.cluster:add_server(cg.master) + cg.cluster:start() +end) + + +g.after_test('test_box_status_event', function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + + +g.test_box_status_event= function(cg) + local c = net.connect(cg.master.net_box_uri) + + c:eval([[ + i = '' + box.watch('box.status', function(...) 
i = {...} end) + ]]) + + t.assert_equals(c:eval("return i"), + {"box.status", + {is_ro = cg.master:eval("return box.info.ro"), + is_ro_cfg = cg.master:eval("return box.cfg.read_only"), + status = cg.master:eval("return box.info.status")}}) + + -- next step + cg.master:eval(([[ + box.cfg{ + replication = { + "%s", + "%s" + }, + replication_connect_timeout = 0.001, + } + ]]):format(helpers.instance_uri('master'), + helpers.instance_uri('replica'))) + + t.assert_equals(c:eval("return i"), + {"box.status", + {is_ro = cg.master:eval("return box.info.ro"), + is_ro_cfg = cg.master:eval("return box.cfg.read_only"), + status = cg.master:eval("return box.info.status")}}) + + -- next step + cg.master:eval(([[ + box.cfg{ + replication = { + "%s", + }, + } + ]]):format(helpers.instance_uri('master'))) + + t.assert_equals(c:eval("return i"), + {"box.status", + {is_ro = cg.master:eval("return box.info.ro"), + is_ro_cfg = cg.master:eval("return box.cfg.read_only"), + status = cg.master:eval("return box.info.status")}}) + + -- next step + cg.master:eval("box.cfg{read_only = true}") + + t.assert_equals(c:eval("return i"), + {"box.status", + {is_ro = cg.master:eval("return box.info.ro"), + is_ro_cfg = cg.master:eval("return box.cfg.read_only"), + status = cg.master:eval("return box.info.status")}}) + + + c:close() +end + + +g.before_test('test_box_election_event', function(cg) + + cg.cluster = cluster:new({}) + + local box_cfg = { + replication = { + helpers.instance_uri('instance_', 1); + helpers.instance_uri('instance_', 2); + helpers.instance_uri('instance_', 3); + }, + replication_connect_quorum = 0, + election_mode = 'candidate', + replication_synchro_quorum = 2, + replication_synchro_timeout = 1, + replication_timeout = 0.25, + election_timeout = 0.25, + } + + cg.instance_1 = cg.cluster:build_server( + {alias = 'instance_1', box_cfg = box_cfg}) + + cg.instance_2 = cg.cluster:build_server( + {alias = 'instance_2', box_cfg = box_cfg}) + + cg.instance_3 = cg.cluster:build_server( 
+ {alias = 'instance_3', box_cfg = box_cfg}) + + cg.cluster:add_server(cg.instance_1) + cg.cluster:add_server(cg.instance_2) + cg.cluster:add_server(cg.instance_3) + cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3}) +end) + + +g.after_test('test_box_election_event', function(cg) + cg.cluster.servers = nil + cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3}) +end) + + +g.test_box_election_event= function(cg) + local c1 = net.connect(cg.instance_1.net_box_uri) + local c2 = net.connect(cg.instance_2.net_box_uri) + local c3 = net.connect(cg.instance_3.net_box_uri) + + c1:eval([[ + i1 = '' + box.watch('box.election', function(...) i1 = {...} end) + ]]) + c2:eval([[ + i2 = '' + box.watch('box.election', function(...) i2 = {...} end) + ]]) + c3:eval([[ + i3 = '' + box.watch('box.election', function(...) i3 = {...} end) + ]]) + + t.assert_equals(c1:eval("return i1"), + {"box.election", + {term = cg.instance_1:eval("return box.info.election.term"), + leader = cg.instance_1:eval("return box.info.election.leader"), + is_ro = cg.instance_1:eval("return box.info.ro"), + role = cg.instance_1:eval("return box.info.election.state")}}) + + t.assert_equals(c2:eval("return i2"), + {"box.election", + {term = cg.instance_2:eval("return box.info.election.term"), + leader = cg.instance_2:eval("return box.info.election.leader"), + is_ro = cg.instance_2:eval("return box.info.ro"), + role = cg.instance_2:eval("return box.info.election.state")}}) + + t.assert_equals(c3:eval("return i3"), + {"box.election", + {term = cg.instance_3:eval("return box.info.election.term"), + leader = cg.instance_3:eval("return box.info.election.leader"), + is_ro = cg.instance_3:eval("return box.info.ro"), + role = cg.instance_3:eval("return box.info.election.state")}}) + + + -- next step + cg.instance_1:eval("return box.cfg{election_mode='voter'}") + + cg.instance_1:wait_election_leader_found() + cg.instance_2:wait_election_leader_found() + cg.instance_3:wait_election_leader_found() + + 
t.assert_equals(c1:eval("return i1"), + {"box.election", + {term = cg.instance_1:eval("return box.info.election.term"), + leader = cg.instance_1:eval("return box.info.election.leader"), + is_ro = cg.instance_1:eval("return box.info.ro"), + role = cg.instance_1:eval("return box.info.election.state")}}) + + t.assert_equals(c2:eval("return i2"), + {"box.election", + {term = cg.instance_2:eval("return box.info.election.term"), + leader = cg.instance_2:eval("return box.info.election.leader"), + is_ro = cg.instance_2:eval("return box.info.ro"), + role = cg.instance_2:eval("return box.info.election.state")}}) + + t.assert_equals(c3:eval("return i3"), + {"box.election", + {term = cg.instance_3:eval("return box.info.election.term"), + leader = cg.instance_3:eval("return box.info.election.leader"), + is_ro = cg.instance_3:eval("return box.info.ro"), + role = cg.instance_3:eval("return box.info.election.state")}}) + + + -- next step + cg.instance_1:eval("return box.cfg{election_mode='candidate'}") + cg.instance_2:eval("return box.cfg{election_mode='voter'}") + + cg.instance_1:wait_election_leader_found() + cg.instance_2:wait_election_leader_found() + cg.instance_3:wait_election_leader_found() + + t.assert_equals(c1:eval("return i1"), + {"box.election", + {term = cg.instance_1:eval("return box.info.election.term"), + leader = cg.instance_1:eval("return box.info.election.leader"), + is_ro = cg.instance_1:eval("return box.info.ro"), + role = cg.instance_1:eval("return box.info.election.state")}}) + + t.assert_equals(c2:eval("return i2"), + {"box.election", + {term = cg.instance_2:eval("return box.info.election.term"), + leader = cg.instance_2:eval("return box.info.election.leader"), + is_ro = cg.instance_2:eval("return box.info.ro"), + role = cg.instance_2:eval("return box.info.election.state")}}) + + t.assert_equals(c3:eval("return i3"), + {"box.election", + {term = cg.instance_3:eval("return box.info.election.term"), + leader = cg.instance_3:eval("return 
box.info.election.leader"), + is_ro = cg.instance_3:eval("return box.info.ro"), + role = cg.instance_3:eval("return box.info.election.state")}}) + + c1:close() + c2:close() + c3:close() +end diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc index 5e68b34d2..5c5a323cb 100644 --- a/test/unit/mp_error.cc +++ b/test/unit/mp_error.cc @@ -93,12 +93,6 @@ enum { sizeof(standard_errors) / sizeof(standard_errors[0]), }; -static inline char * -mp_encode_str0(char *data, const char *str) -{ - return mp_encode_str(data, str, strlen(str)); -} - /** Note, not the same as mp_encode_error(). */ static char * mp_encode_mp_error(const struct mp_test_error *e, char *data) -- 2.25.1
next reply other threads:[~2022-01-20 4:06 UTC|newest] Thread overview: 2+ messages / expand[flat|nested] mbox.gz Atom feed top 2022-01-20 4:06 Yan Shtunder via Tarantool-patches [this message] 2022-01-27 0:23 ` Vladislav Shpilevoy via Tarantool-patches
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220120040640.114751-1-ya.shtunder@gmail.com \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=ya.shtunder@gmail.com \
    --subject='Re: [Tarantool-patches] [PATCH v2] net.box: add predefined system events for pub/sub' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.