From: Yan Shtunder via Tarantool-patches <tarantool-patches@dev.tarantool.org> To: v.shpilevoy@tarantool.org Cc: tarantool-patches@dev.tarantool.org, Yan Shtunder <ya.shtunder@gmail.com> Subject: [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub Date: Sat, 12 Feb 2022 04:18:03 +0300 [thread overview] Message-ID: <20220212011803.80185-1-ya.shtunder@gmail.com> (raw) Adding predefined system events: box.status, box.id, box.election and box.schema. Closes #6260 NO_CHANGELOG=test stuff NO_DOC=testing stuff --- Issue: https://github.com/tarantool/tarantool/issues/6260 Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3 src/box/alter.cc | 9 + src/box/box.cc | 103 +++++- src/box/box.h | 18 + src/box/mp_error.cc | 6 - src/box/raft.c | 1 + src/box/replication.cc | 16 + src/lib/msgpuck | 2 +- src/lib/raft/raft.c | 1 + .../gh_6260_add_builtin_events_test.lua | 320 ++++++++++++++++++ test/unit/mp_error.cc | 6 - 10 files changed, 462 insertions(+), 20 deletions(-) create mode 100644 test/app-luatest/gh_6260_add_builtin_events_test.lua diff --git a/src/box/alter.cc b/src/box/alter.cc index 65c1cb952..e9899b968 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -57,6 +57,7 @@ #include "sequence.h" #include "sql.h" #include "constraint_id.h" +#include "box.h" /* {{{ Auxiliary functions and methods. */ @@ -1685,6 +1686,7 @@ UpdateSchemaVersion::alter(struct alter_space *alter) { (void)alter; ++schema_version; + box_broadcast_schema(); } /** @@ -2260,6 +2262,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) * create. */ ++schema_version; + box_broadcast_schema(); /* * So may happen that until the DDL change record * is written to the WAL, the space is used for @@ -2374,6 +2377,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) * AlterSpaceOps are registered in case of space drop. */ ++schema_version; + box_broadcast_schema(); struct trigger *on_commit = txn_alter_trigger_new(on_drop_space_commit, old_space); if (on_commit == NULL) @@ -4225,6 +4229,8 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event) if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0) return -1; REPLICASET_UUID = uu; + /* Checking box.info.cluster.uuid change */ + box_broadcast_id(); say_info("cluster uuid %s", tt_uuid_str(&uu)); } else if (strcmp(key, "version") == 0) { if (new_tuple != NULL) { @@ -5072,6 +5078,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event) txn_stmt_on_rollback(stmt, on_rollback); txn_stmt_on_commit(stmt, on_commit); ++schema_version; + box_broadcast_schema(); return 0; } @@ -5599,6 +5606,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event) space_reset_fk_constraint_mask(parent_space); } ++schema_version; + box_broadcast_schema(); return 0; } @@ -5855,6 +5863,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event) if (trigger_run(&on_alter_space, space) != 0) return -1; ++schema_version; + box_broadcast_schema(); return 0; } diff --git a/src/box/box.cc b/src/box/box.cc index 6a33203df..d72bd3dad 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -85,6 +85,7 @@ #include "audit.h" #include "trivia/util.h" #include "version.h" +#include "mp_uuid.h" static char status[64] = "unknown"; @@ -98,13 +99,6 @@ double txn_timeout_default; struct rlist box_on_shutdown_trigger_list = RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list); -static void title(const char *new_status) -{ - snprintf(status, sizeof(status), "%s", new_status); - title_set_status(new_status); - title_update(); - systemd_snotify("STATUS=%s", status); -} const struct vclock *box_vclock = &replicaset.vclock; @@ -161,6 +155,20 @@ static struct fiber_pool tx_fiber_pool; */ static struct cbus_endpoint tx_prio_endpoint; +static void +box_broadcast_status(void); + +static void +title(const char *new_status) +{ + snprintf(status, sizeof(status), "%s", new_status); + title_set_status(new_status); + title_update(); + systemd_snotify("STATUS=%s", status); + /* Checking box.info.status change */ + box_broadcast_status(); +} + void box_update_ro_summary(void) { @@ -174,6 +182,8 @@ box_update_ro_summary(void) if (is_ro_summary) engine_switch_to_ro(); fiber_cond_broadcast(&ro_cond); + box_broadcast_status(); + box_broadcast_election(); } const char * @@ -3933,3 +3943,82 @@ box_reset_stat(void) engine_reset_stat(); space_foreach(box_reset_space_stat, NULL); } + +void +box_broadcast_id(void) +{ + char buf[1024]; + char *w = buf; + + struct replica *replica = replica_by_uuid(&INSTANCE_UUID); + uint32_t id = (replica == NULL) ? 0 : replica->id; + + w = mp_encode_map(w, 3); + w = mp_encode_str0(w, "id"); + w = mp_encode_uint(w, id); + w = mp_encode_str0(w, "instance_uuid"); + w = mp_encode_uuid(w, &INSTANCE_UUID); + w = mp_encode_str0(w, "replicaset_uuid"); + w = mp_encode_uuid(w, &REPLICASET_UUID); + + box_broadcast("box.id", strlen("box.id"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +static void +box_broadcast_status(void) +{ + char buf[1024]; + char *w = buf; + + w = mp_encode_map(w, 3); + w = mp_encode_str0(w, "is_ro"); + w = mp_encode_bool(w, box_is_ro()); + w = mp_encode_str0(w, "is_ro_cfg"); + w = mp_encode_bool(w, cfg_geti("read_only")); + w = mp_encode_str0(w, "status"); + w = mp_encode_str0(w, box_status()); + + box_broadcast("box.status", strlen("box.status"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +void +box_broadcast_election() +{ + struct raft *raft = box_raft(); + + char buf[1024]; + char *w = buf; + + w = mp_encode_map(w, 4); + w = mp_encode_str0(w, "term"); + w = mp_encode_uint(w, raft->term); + w = mp_encode_str0(w, "role"); + w = mp_encode_str0(w, raft_state_str(raft->state)); + w = mp_encode_str0(w, "is_ro"); + w = mp_encode_bool(w, box_is_ro()); + w = mp_encode_str0(w, "leader"); + w = mp_encode_uint(w, raft->leader); + + box_broadcast("box.election", strlen("box.election"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +void +box_broadcast_schema(void) +{ + char buf[1024]; + char *w = buf; + + w = mp_encode_map(w, 1); + w = mp_encode_str0(w, "version"); + w = mp_encode_uint(w, box_schema_version()); + + box_broadcast("box.schema", strlen("box.schema"), buf, w); + + assert((size_t)(w - buf) < 1024); +} diff --git a/src/box/box.h b/src/box/box.h index b594f6646..766568368 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space, int boxk(int type, uint32_t space_id, const char *format, ...); +/** + * Broadcast the identification of the instance + */ +void +box_broadcast_id(void); + +/** + * Broadcast the current election state of RAFT machinery + */ +void +box_broadcast_election(void); + +/** + * Broadcast the current schema version + */ +void +box_broadcast_schema(void); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc index 3c4176e59..93109562d 100644 --- a/src/box/mp_error.cc +++ b/src/box/mp_error.cc @@ -169,12 +169,6 @@ mp_sizeof_error_one(const struct error *error) return data_size; } -static inline char * -mp_encode_str0(char *data, const char *str) -{ - return mp_encode_str(data, str, strlen(str)); -} - static char * mp_encode_error_one(char *data, const struct error *error) { diff --git a/src/box/raft.c b/src/box/raft.c index be6009cc1..efc7e1038 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event) * writable only after it clears its synchro queue. */ box_update_ro_summary(); + box_broadcast_election(); if (raft->state != RAFT_STATE_LEADER) return 0; /* diff --git a/src/box/replication.cc b/src/box/replication.cc index bbf4b7417..05d5ffb58 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -183,8 +183,12 @@ replica_new(void) diag_raise(); } replica->id = 0; + /* Checking box.info.id change */ + box_broadcast_id(); replica->anon = false; replica->uuid = uuid_nil; + /* Checking box.info.uuid change */ + box_broadcast_id(); replica->applier = NULL; replica->gc = NULL; rlist_create(&replica->in_anon); @@ -217,6 +221,8 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) assert(replica_by_uuid(replica_uuid) == NULL); struct replica *replica = replica_new(); replica->uuid = *replica_uuid; + /* Checking box.info.uuid change */ + box_broadcast_id(); replica_hash_insert(&replicaset.hash, replica); replica_set_id(replica, replica_id); return replica; @@ -230,6 +236,8 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid) struct replica *replica = replica_new(); replica->uuid = *replica_uuid; + /* Checking box.info.uuid change */ + box_broadcast_id(); replica_hash_insert(&replicaset.hash, replica); replica->anon = true; replicaset.anon_count++; @@ -242,6 +250,8 @@ replica_set_id(struct replica *replica, uint32_t replica_id) assert(replica_id < VCLOCK_MAX); assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */ replica->id = replica_id; + /* Checking box.info.id change */ + box_broadcast_id(); if (tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)) { /* Assign local replica id */ @@ -289,6 +299,8 @@ replica_clear_id(struct replica *replica) instance_id = REPLICA_ID_NIL; } replica->id = REPLICA_ID_NIL; + /* Checking box.info.id change */ + box_broadcast_id(); say_info("removed replica %s", tt_uuid_str(&replica->uuid)); /* @@ -354,6 +366,8 @@ replica_on_applier_connect(struct replica *replica) assert(replica->applier_sync_state == APPLIER_DISCONNECTED); replica->uuid = applier->uuid; + /* Checking box.info.uuid change */ + box_broadcast_id(); replica->anon = applier->ballot.is_anon; replica->applier_sync_state = APPLIER_CONNECTED; replicaset.applier.connected++; @@ -553,6 +567,8 @@ replicaset_update(struct applier **appliers, int count, bool keep_connect) assert(!tt_uuid_is_nil(&applier->uuid)); replica->uuid = applier->uuid; + /* Checking box.info.uuid change */ + box_broadcast_id(); replica->anon = applier->ballot.is_anon; if (replica_hash_search(&uniq, replica) != NULL) { diff --git a/src/lib/msgpuck b/src/lib/msgpuck index 7f95b6fd7..7b1be1670 160000 --- a/src/lib/msgpuck +++ b/src/lib/msgpuck @@ -1 +1 @@ -Subproject commit 7f95b6fd7a5b928cfcbdab2d78bea88d1685821e +Subproject commit 7b1be16703f53d76b05beb680c0cdbb546c2a756 diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c index 89cfb3c17..489df235b 100644 --- a/src/lib/raft/raft.c +++ b/src/lib/raft/raft.c @@ -494,6 +494,7 @@ raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source) * it manually. */ raft->leader = 0; + raft_schedule_broadcast(raft); if (raft->is_candidate) raft_sm_schedule_new_election(raft); } diff --git a/test/app-luatest/gh_6260_add_builtin_events_test.lua b/test/app-luatest/gh_6260_add_builtin_events_test.lua new file mode 100644 index 000000000..3c363fc9a --- /dev/null +++ b/test/app-luatest/gh_6260_add_builtin_events_test.lua @@ -0,0 +1,320 @@ +local t = require('luatest') +local net = require('net.box') +local cluster = require('test.luatest_helpers.cluster') +local helpers = require('test.luatest_helpers') +local fiber = require('fiber') + +local g = t.group('gh_6260') + + +g.before_test('test_box_status', function(cg) + cg.cluster = cluster:new({}) + + local box_cfg = { + read_only = false + } + + cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg}) + cg.cluster:add_server(cg.master) + cg.cluster:start() +end) + + +g.after_test('test_box_status', function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + + +g.test_box_status = function(cg) + local c = net.connect(cg.master.net_box_uri) + + local result = {} + local result_no = 0 + local watcher = c:watch('box.status', + function(name, state) + assert(name == 'box.status') + result = state + result_no = result_no + 1 + end) + + -- initial state should arrive + while result_no < 1 do fiber.sleep(0.001) end + t.assert_equals(result, + {is_ro = false, is_ro_cfg = false, status = 'running'}) + + -- test orphan status appearance + cg.master:eval(([[ + box.cfg{ + replication = { + "%s", + "%s" + }, + replication_connect_timeout = 0.001, + replication_timeout = 0.001, + } + ]]):format(helpers.instance_uri('master'), + helpers.instance_uri('replica'))) + -- here we have 2 notifications: entering ro when can't connect + -- to master and the second one when going orphan + while result_no < 3 do fiber.sleep(0.000001) end + t.assert_equals(result, + {is_ro = true, is_ro_cfg = false, status = 'orphan'}) + + -- test ro_cfg appearance + cg.master:eval([[ + box.cfg{ + replication = {}, + read_only = true, + } + ]]) + while result_no < 4 do fiber.sleep(0.000001) end + t.assert_equals(result, + {is_ro = true, is_ro_cfg = true, status = 'running'}) + + -- reset to rw + cg.master:eval([[ + box.cfg{ + read_only = false, + } + ]]) + while result_no < 5 do fiber.sleep(0.000001) end + t.assert_equals(result, + {is_ro = false, is_ro_cfg = false, status = 'running'}) + + -- turning manual election mode puts into ro + cg.master:eval([[ + box.cfg{ + election_mode = 'manual', + } + ]]) + while result_no < 6 do fiber.sleep(0.000001) end + t.assert_equals(result, + {is_ro = true, is_ro_cfg = false, status = 'running'}) + + -- promotion should turn rm + cg.master:eval([[ + box.ctl.promote() + ]]) + while result_no < 7 do fiber.sleep(0.000001) end + t.assert_equals(result, + {is_ro = false, is_ro_cfg = false, status = 'running'}) + + watcher:unregister() + c:close() +end + + +g.before_test('test_box_election', function(cg) + + cg.cluster = cluster:new({}) + + local box_cfg = { + replication = { + helpers.instance_uri('instance_', 1); + helpers.instance_uri('instance_', 2); + helpers.instance_uri('instance_', 3); + }, + replication_connect_quorum = 0, + election_mode = 'off', + replication_synchro_quorum = 2, + replication_synchro_timeout = 1, + replication_timeout = 0.25, + election_timeout = 0.25, + } + + cg.instance_1 = cg.cluster:build_server( + {alias = 'instance_1', box_cfg = box_cfg}) + + cg.instance_2 = cg.cluster:build_server( + {alias = 'instance_2', box_cfg = box_cfg}) + + cg.instance_3 = cg.cluster:build_server( + {alias = 'instance_3', box_cfg = box_cfg}) + + cg.cluster:add_server(cg.instance_1) + cg.cluster:add_server(cg.instance_2) + cg.cluster:add_server(cg.instance_3) + cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3}) +end) + + +g.after_test('test_box_election', function(cg) + cg.cluster.servers = nil + cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3}) +end) + + +g.test_box_election = function(cg) + local c = {} + c[1] = net.connect(cg.instance_1.net_box_uri) + c[2] = net.connect(cg.instance_2.net_box_uri) + c[3] = net.connect(cg.instance_3.net_box_uri) + + local res = {} + local res_n = {0, 0, 0} + + for i = 1, 3 do + c[i]:watch('box.election', + function(n, s) + t.assert_equals(n, 'box.election') + res[i] = s + res_n[i] = res_n[i] + 1 + end) + end + while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end + + -- verify all instances are in the same state + t.assert_equals(res[1], res[2]) + t.assert_equals(res[1], res[3]) + + -- wait for elections to complete, verify leader is the instance_1 + -- trying to avoid the exact number of term - it can vary + local instance1_id = cg.instance_1:eval("return box.info.id") + + cg.instance_1:eval("box.cfg{election_mode='candidate'}") + cg.instance_2:eval("box.cfg{election_mode='voter'}") + cg.instance_3:eval("box.cfg{election_mode='voter'}") + + cg.instance_1:wait_election_leader_found() + cg.instance_2:wait_election_leader_found() + cg.instance_3:wait_election_leader_found() + + t.assert_equals(res[1].leader, instance1_id) + t.assert_equals(res[1].is_ro, false) + t.assert_equals(res[1].role, 'leader') + t.assert_equals(res[2].leader, instance1_id) + t.assert_equals(res[2].is_ro, true) + t.assert_equals(res[2].role, 'follower') + t.assert_equals(res[3].leader, instance1_id) + t.assert_equals(res[3].is_ro, true) + t.assert_equals(res[3].role, 'follower') + t.assert_equals(res[1].term, res[2].term) + t.assert_equals(res[1].term, res[3].term) + + -- check the stepping down is working + res_n = {0, 0, 0} + cg.instance_1:eval("box.cfg{election_mode='voter'}") + while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end + + local expected = {is_ro = true, role = 'follower', term = res[1].term, + leader = 0} + t.assert_equals(res[1], expected) + t.assert_equals(res[2], expected) + t.assert_equals(res[3], expected) + + c[1]:close() + c[2]:close() + c[3]:close() +end + + +g.before_test('test_box_schema', function(cg) + cg.cluster = cluster:new({}) + cg.master = cg.cluster:build_server({alias = 'master'}) + cg.cluster:add_server(cg.master) + cg.cluster:start() +end) + + +g.after_test('test_box_schema', function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + + +g.test_box_schema = function(cg) + local c = net.connect(cg.master.net_box_uri) + local version = 0 + local version_n = 0 + + local watcher = c:watch('box.schema', + function(n, s) + assert(n == 'box.schema') + version = s.version + version_n = version_n + 1 + end) + + cg.master:eval("box.schema.create_space('p')") + while version_n < 1 do fiber.sleep(0.001) end + -- first version change, use it as initial value + local init_version = version + + version_n = 0 + cg.master:eval("box.space.p:create_index('i')") + while version_n < 1 do fiber.sleep(0.001) end + t.assert_equals(version, init_version + 1) + + version_n = 0 + cg.master:eval("box.space.p:drop()") + while version_n < 1 do fiber.sleep(0.001) end + -- there'll be 2 changes - index and space + t.assert_equals(version, init_version + 3) + + watcher:unregister() + c:close() +end + + +g.before_test('test_box_id', function(cg) + cg.cluster = cluster:new({}) + + local box_cfg = { + replicaset_uuid = t.helpers.uuid('ab', 1), + instance_uuid = t.helpers.uuid('1', '2', '3') + } + + cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg}) + cg.cluster:add_server(cg.instance) + cg.cluster:start() +end) + + +g.after_test('test_box_id', function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + + +g.test_box_id = function(cg) + local c = net.connect(cg.instance.net_box_uri) + + local result = {} + local result_no = 0 + + local watcher = c:watch('box.id', + function(name, state) + assert(name == 'box.id') + result = state + result_no = result_no + 1 + end) + + -- initial state should arrive + while result_no < 1 do fiber.sleep(0.001) end + t.assert_equals(result, {id = 1, + instance_uuid = '11111111-2222-0000-0000-333333333333', + replicaset_uuid = 'abababab-0000-0000-0000-000000000001'}) + + -- error when instance_uuid set dynamically + t.assert_error_msg_content_equals("Can't set option 'instance_uuid' dynamically", + function() + cg.instance:eval(([[ + box.cfg{ + instance_uuid = "%s" + } + ]]):format(t.helpers.uuid('1', '2', '4'))) + end) + + -- error when replicaset_uuid set dynamically + t.assert_error_msg_content_equals("Can't set option 'replicaset_uuid' dynamically", + function() + cg.instance:eval(([[ + box.cfg{ + replicaset_uuid = "%s" + } + ]]):format(t.helpers.uuid('ab', 2))) + end) + + watcher:unregister() + c:close() +end diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc index 777c68dff..6afb73b22 100644 --- a/test/unit/mp_error.cc +++ b/test/unit/mp_error.cc @@ -94,12 +94,6 @@ enum { sizeof(standard_errors) / sizeof(standard_errors[0]), }; -static inline char * -mp_encode_str0(char *data, const char *str) -{ - return mp_encode_str(data, str, strlen(str)); -} - /** Note, not the same as mp_encode_error(). */ static char * mp_encode_mp_error(const struct mp_test_error *e, char *data) -- 2.25.1
next reply other threads:[~2022-02-12 1:18 UTC|newest] Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top 2022-02-12 1:18 Yan Shtunder via Tarantool-patches [this message] 2022-02-12 14:48 ` Vladimir Davydov via Tarantool-patches 2022-02-14 21:35 ` Vladislav Shpilevoy via Tarantool-patches 2022-02-15 7:36 ` Vladimir Davydov via Tarantool-patches
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20220212011803.80185-1-ya.shtunder@gmail.com \ --to=tarantool-patches@dev.tarantool.org \ --cc=v.shpilevoy@tarantool.org \ --cc=ya.shtunder@gmail.com \ --subject='Re: [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox