[Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub
Yan Shtunder
ya.shtunder at gmail.com
Sat Feb 12 04:18:03 MSK 2022
Add predefined system events: box.status, box.id, box.election
and box.schema.
Closes #6260
NO_CHANGELOG=test stuff
NO_DOC=testing stuff
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3
src/box/alter.cc | 9 +
src/box/box.cc | 103 +++++-
src/box/box.h | 18 +
src/box/mp_error.cc | 6 -
src/box/raft.c | 1 +
src/box/replication.cc | 16 +
src/lib/msgpuck | 2 +-
src/lib/raft/raft.c | 1 +
.../gh_6260_add_builtin_events_test.lua | 320 ++++++++++++++++++
test/unit/mp_error.cc | 6 -
10 files changed, 462 insertions(+), 20 deletions(-)
create mode 100644 test/app-luatest/gh_6260_add_builtin_events_test.lua
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 65c1cb952..e9899b968 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,6 +57,7 @@
#include "sequence.h"
#include "sql.h"
#include "constraint_id.h"
+#include "box.h"
/* {{{ Auxiliary functions and methods. */
@@ -1685,6 +1686,7 @@ UpdateSchemaVersion::alter(struct alter_space *alter)
{
(void)alter;
++schema_version;
+ box_broadcast_schema();
}
/**
@@ -2260,6 +2262,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* create.
*/
++schema_version;
+ box_broadcast_schema();
/*
* So may happen that until the DDL change record
* is written to the WAL, the space is used for
@@ -2374,6 +2377,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* AlterSpaceOps are registered in case of space drop.
*/
++schema_version;
+ box_broadcast_schema();
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_space_commit, old_space);
if (on_commit == NULL)
@@ -4225,6 +4229,8 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
return -1;
REPLICASET_UUID = uu;
+ /* Checking box.info.cluster.uuid change */
+ box_broadcast_id();
say_info("cluster uuid %s", tt_uuid_str(&uu));
} else if (strcmp(key, "version") == 0) {
if (new_tuple != NULL) {
@@ -5072,6 +5078,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
++schema_version;
+ box_broadcast_schema();
return 0;
}
@@ -5599,6 +5606,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
space_reset_fk_constraint_mask(parent_space);
}
++schema_version;
+ box_broadcast_schema();
return 0;
}
@@ -5855,6 +5863,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
if (trigger_run(&on_alter_space, space) != 0)
return -1;
++schema_version;
+ box_broadcast_schema();
return 0;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 6a33203df..d72bd3dad 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -85,6 +85,7 @@
#include "audit.h"
#include "trivia/util.h"
#include "version.h"
+#include "mp_uuid.h"
static char status[64] = "unknown";
@@ -98,13 +99,6 @@ double txn_timeout_default;
struct rlist box_on_shutdown_trigger_list =
RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
-static void title(const char *new_status)
-{
- snprintf(status, sizeof(status), "%s", new_status);
- title_set_status(new_status);
- title_update();
- systemd_snotify("STATUS=%s", status);
-}
const struct vclock *box_vclock = &replicaset.vclock;
@@ -161,6 +155,20 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+/*
+ * Defined further down in this file; declared here because title()
+ * below calls it.
+ */
+static void
+box_broadcast_status(void);
+
+/**
+ * Update the instance status string: store it in the file-level
+ * `status` buffer (truncated to its size), set it as the process
+ * title status, refresh the title, send a systemd "STATUS=" notification
+ * and finally publish the "box.status" event.
+ */
+static void
+title(const char *new_status)
+{
+ snprintf(status, sizeof(status), "%s", new_status);
+ title_set_status(new_status);
+ title_update();
+ systemd_snotify("STATUS=%s", status);
+ /*
+  * Broadcast last, after `status` has been rewritten: the "box.status"
+  * payload takes its "status" field from box_status() (presumably a
+  * view of this same buffer -- confirm).
+  */
+ box_broadcast_status();
+}
+
void
box_update_ro_summary(void)
{
@@ -174,6 +182,8 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
+ box_broadcast_status();
+ box_broadcast_election();
}
const char *
@@ -3933,3 +3943,82 @@ box_reset_stat(void)
engine_reset_stat();
space_foreach(box_reset_space_stat, NULL);
}
+
+/**
+ * Publish the "box.id" event.
+ *
+ * The payload is a MsgPack map with three keys:
+ *  - "id": replica id of the replica whose UUID is INSTANCE_UUID, or 0
+ *    when no such replica is found by replica_by_uuid();
+ *  - "instance_uuid": INSTANCE_UUID;
+ *  - "replicaset_uuid": REPLICASET_UUID.
+ */
+void
+box_broadcast_id(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	struct replica *replica = replica_by_uuid(&INSTANCE_UUID);
+	/* Report 0 while this instance has no replica object yet. */
+	uint32_t id = (replica == NULL) ? 0 : replica->id;
+
+	w = mp_encode_map(w, 3);
+	w = mp_encode_str0(w, "id");
+	w = mp_encode_uint(w, id);
+	w = mp_encode_str0(w, "instance_uuid");
+	w = mp_encode_uuid(w, &INSTANCE_UUID);
+	w = mp_encode_str0(w, "replicaset_uuid");
+	w = mp_encode_uuid(w, &REPLICASET_UUID);
+
+	/*
+	 * Check for an overflow of buf before its contents are handed to
+	 * subscribers, and tie the limit to the real buffer size.
+	 */
+	assert((size_t)(w - buf) < sizeof(buf));
+	box_broadcast("box.id", strlen("box.id"), buf, w);
+}
+
+/**
+ * Publish the "box.status" event.
+ *
+ * The payload is a MsgPack map with three keys:
+ *  - "is_ro": result of box_is_ro();
+ *  - "is_ro_cfg": value of the "read_only" configuration option;
+ *  - "status": result of box_status().
+ */
+static void
+box_broadcast_status(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 3);
+	w = mp_encode_str0(w, "is_ro");
+	w = mp_encode_bool(w, box_is_ro());
+	w = mp_encode_str0(w, "is_ro_cfg");
+	/* cfg_geti() yields an int; make the bool conversion explicit. */
+	w = mp_encode_bool(w, cfg_geti("read_only") != 0);
+	w = mp_encode_str0(w, "status");
+	w = mp_encode_str0(w, box_status());
+
+	/* Detect an overflow of buf before it is published. */
+	assert((size_t)(w - buf) < sizeof(buf));
+	box_broadcast("box.status", strlen("box.status"), buf, w);
+}
+
+/**
+ * Publish the "box.election" event describing the current state of the
+ * instance's RAFT machinery (box_raft()).
+ *
+ * The payload is a MsgPack map with four keys:
+ *  - "term": current RAFT term;
+ *  - "role": raft_state_str() of the current RAFT state;
+ *  - "is_ro": result of box_is_ro();
+ *  - "leader": id of the known leader as stored in raft->leader.
+ */
+void
+box_broadcast_election(void)
+{
+	struct raft *raft = box_raft();
+
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 4);
+	w = mp_encode_str0(w, "term");
+	w = mp_encode_uint(w, raft->term);
+	w = mp_encode_str0(w, "role");
+	w = mp_encode_str0(w, raft_state_str(raft->state));
+	w = mp_encode_str0(w, "is_ro");
+	w = mp_encode_bool(w, box_is_ro());
+	w = mp_encode_str0(w, "leader");
+	w = mp_encode_uint(w, raft->leader);
+
+	/* Detect an overflow of buf before it is published. */
+	assert((size_t)(w - buf) < sizeof(buf));
+	box_broadcast("box.election", strlen("box.election"), buf, w);
+}
+
+/**
+ * Publish the "box.schema" event: a MsgPack map with the single key
+ * "version" holding box_schema_version().
+ */
+void
+box_broadcast_schema(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 1);
+	w = mp_encode_str0(w, "version");
+	w = mp_encode_uint(w, box_schema_version());
+
+	/* Detect an overflow of buf before it is published. */
+	assert((size_t)(w - buf) < sizeof(buf));
+	box_broadcast("box.schema", strlen("box.schema"), buf, w);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..766568368 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+/**
+ * Broadcast the identification of the instance as the "box.id" event:
+ * a map with "id" (replica id, 0 if this instance has no replica
+ * object), "instance_uuid" and "replicaset_uuid".
+ */
+void
+box_broadcast_id(void);
+
+/**
+ * Broadcast the current election state of the RAFT machinery as the
+ * "box.election" event: a map with "term", "role", "is_ro" and
+ * "leader".
+ */
+void
+box_broadcast_election(void);
+
+/**
+ * Broadcast the current schema version as the "box.schema" event:
+ * a map with the single key "version".
+ */
+void
+box_broadcast_schema(void);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc
index 3c4176e59..93109562d 100644
--- a/src/box/mp_error.cc
+++ b/src/box/mp_error.cc
@@ -169,12 +169,6 @@ mp_sizeof_error_one(const struct error *error)
return data_size;
}
-static inline char *
-mp_encode_str0(char *data, const char *str)
-{
- return mp_encode_str(data, str, strlen(str));
-}
-
static char *
mp_encode_error_one(char *data, const struct error *error)
{
diff --git a/src/box/raft.c b/src/box/raft.c
index be6009cc1..efc7e1038 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
+ box_broadcast_election();
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index bbf4b7417..05d5ffb58 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -183,8 +183,12 @@ replica_new(void)
diag_raise();
}
replica->id = 0;
+ /* Checking box.info.id change */
+ box_broadcast_id();
replica->anon = false;
replica->uuid = uuid_nil;
+ /* Checking box.info.uuid change */
+ box_broadcast_id();
replica->applier = NULL;
replica->gc = NULL;
rlist_create(&replica->in_anon);
@@ -217,6 +221,8 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
assert(replica_by_uuid(replica_uuid) == NULL);
struct replica *replica = replica_new();
replica->uuid = *replica_uuid;
+ /* Checking box.info.uuid change */
+ box_broadcast_id();
replica_hash_insert(&replicaset.hash, replica);
replica_set_id(replica, replica_id);
return replica;
@@ -230,6 +236,8 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid)
struct replica *replica = replica_new();
replica->uuid = *replica_uuid;
+ /* Checking box.info.uuid change */
+ box_broadcast_id();
replica_hash_insert(&replicaset.hash, replica);
replica->anon = true;
replicaset.anon_count++;
@@ -242,6 +250,8 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
assert(replica_id < VCLOCK_MAX);
assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */
replica->id = replica_id;
+ /* Checking box.info.id change */
+ box_broadcast_id();
if (tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)) {
/* Assign local replica id */
@@ -289,6 +299,8 @@ replica_clear_id(struct replica *replica)
instance_id = REPLICA_ID_NIL;
}
replica->id = REPLICA_ID_NIL;
+ /* Checking box.info.id change */
+ box_broadcast_id();
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
/*
@@ -354,6 +366,8 @@ replica_on_applier_connect(struct replica *replica)
assert(replica->applier_sync_state == APPLIER_DISCONNECTED);
replica->uuid = applier->uuid;
+ /* Checking box.info.uuid change */
+ box_broadcast_id();
replica->anon = applier->ballot.is_anon;
replica->applier_sync_state = APPLIER_CONNECTED;
replicaset.applier.connected++;
@@ -553,6 +567,8 @@ replicaset_update(struct applier **appliers, int count, bool keep_connect)
assert(!tt_uuid_is_nil(&applier->uuid));
replica->uuid = applier->uuid;
+ /* Checking box.info.uuid change */
+ box_broadcast_id();
replica->anon = applier->ballot.is_anon;
if (replica_hash_search(&uniq, replica) != NULL) {
diff --git a/src/lib/msgpuck b/src/lib/msgpuck
index 7f95b6fd7..7b1be1670 160000
--- a/src/lib/msgpuck
+++ b/src/lib/msgpuck
@@ -1 +1 @@
-Subproject commit 7f95b6fd7a5b928cfcbdab2d78bea88d1685821e
+Subproject commit 7b1be16703f53d76b05beb680c0cdbb546c2a756
diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
index 89cfb3c17..489df235b 100644
--- a/src/lib/raft/raft.c
+++ b/src/lib/raft/raft.c
@@ -494,6 +494,7 @@ raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source)
* it manually.
*/
raft->leader = 0;
+ raft_schedule_broadcast(raft);
if (raft->is_candidate)
raft_sm_schedule_new_election(raft);
}
diff --git a/test/app-luatest/gh_6260_add_builtin_events_test.lua b/test/app-luatest/gh_6260_add_builtin_events_test.lua
new file mode 100644
index 000000000..3c363fc9a
--- /dev/null
+++ b/test/app-luatest/gh_6260_add_builtin_events_test.lua
@@ -0,0 +1,320 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local helpers = require('test.luatest_helpers')
+local fiber = require('fiber')
+
+local g = t.group('gh_6260')
+
+
+g.before_test('test_box_status', function(cg)
+    -- A single writable instance is enough to observe box.status.
+    local cl = cluster:new({})
+    cg.cluster = cl
+    cg.master = cl:build_server({
+        alias = 'master',
+        box_cfg = {read_only = false},
+    })
+    cl:add_server(cg.master)
+    cl:start()
+end)
+
+
+g.after_test('test_box_status', function(cg)
+ -- Tear down the cluster created in before_test('test_box_status').
+ -- NOTE(review): resetting `servers` before drop() relies on how
+ -- test.luatest_helpers.cluster stores its server list -- confirm that
+ -- drop() still reaches every started server.
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+
+g.test_box_status = function(cg)
+    local c = net.connect(cg.master.net_box_uri)
+
+    local result = {}
+    local result_no = 0
+    local watcher = c:watch('box.status',
+        function(name, state)
+            assert(name == 'box.status')
+            result = state
+            result_no = result_no + 1
+        end)
+
+    -- Wait until the watcher has been called at least `count` times.
+    -- The wait is bounded, so a missing notification fails the test
+    -- instead of hanging it in a busy loop.
+    local function wait_notifications(count)
+        t.helpers.retrying({timeout = 10, delay = 0.001}, function()
+            t.assert(result_no >= count, string.format(
+                'expected %d box.status notifications, got %d',
+                count, result_no))
+        end)
+    end
+
+    -- initial state should arrive
+    wait_notifications(1)
+    t.assert_equals(result,
+                    {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    -- test orphan status appearance
+    cg.master:eval(([[
+        box.cfg{
+            replication = {
+                "%s",
+                "%s"
+            },
+            replication_connect_timeout = 0.001,
+            replication_timeout = 0.001,
+        }
+    ]]):format(helpers.instance_uri('master'),
+               helpers.instance_uri('replica')))
+    -- here we have 2 notifications: entering ro when can't connect
+    -- to master and the second one when going orphan
+    wait_notifications(3)
+    t.assert_equals(result,
+                    {is_ro = true, is_ro_cfg = false, status = 'orphan'})
+
+    -- test ro_cfg appearance
+    cg.master:eval([[
+        box.cfg{
+            replication = {},
+            read_only = true,
+        }
+    ]])
+    wait_notifications(4)
+    t.assert_equals(result,
+                    {is_ro = true, is_ro_cfg = true, status = 'running'})
+
+    -- reset to rw
+    cg.master:eval([[
+        box.cfg{
+            read_only = false,
+        }
+    ]])
+    wait_notifications(5)
+    t.assert_equals(result,
+                    {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    -- turning manual election mode puts into ro
+    cg.master:eval([[
+        box.cfg{
+            election_mode = 'manual',
+        }
+    ]])
+    wait_notifications(6)
+    t.assert_equals(result,
+                    {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+    -- promotion should turn rw
+    cg.master:eval([[
+        box.ctl.promote()
+    ]])
+    wait_notifications(7)
+    t.assert_equals(result,
+                    {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    watcher:unregister()
+    c:close()
+end
+
+
+g.before_test('test_box_election', function(cg)
+    cg.cluster = cluster:new({})
+
+    -- Full-mesh replication between three instances; elections start
+    -- disabled and are switched on by the test itself.
+    local box_cfg = {
+        replication = {
+            helpers.instance_uri('instance_', 1),
+            helpers.instance_uri('instance_', 2),
+            helpers.instance_uri('instance_', 3),
+        },
+        replication_connect_quorum = 0,
+        election_mode = 'off',
+        replication_synchro_quorum = 2,
+        replication_synchro_timeout = 1,
+        replication_timeout = 0.25,
+        election_timeout = 0.25,
+    }
+
+    -- Build every server first, then register them all with the cluster.
+    for i = 1, 3 do
+        local alias = 'instance_' .. i
+        cg[alias] = cg.cluster:build_server({alias = alias, box_cfg = box_cfg})
+    end
+    for i = 1, 3 do
+        cg.cluster:add_server(cg['instance_' .. i])
+    end
+    cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+
+g.after_test('test_box_election', function(cg)
+ -- Tear down the three-instance cluster from before_test('test_box_election').
+ -- NOTE(review): resetting `servers` before drop() relies on how
+ -- test.luatest_helpers.cluster stores its server list -- confirm that
+ -- drop() still stops the servers passed to it.
+ cg.cluster.servers = nil
+ cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+
+g.test_box_election = function(cg)
+    local c = {}
+    c[1] = net.connect(cg.instance_1.net_box_uri)
+    c[2] = net.connect(cg.instance_2.net_box_uri)
+    c[3] = net.connect(cg.instance_3.net_box_uri)
+
+    local res = {}
+    local res_n = {0, 0, 0}
+    local watchers = {}
+
+    for i = 1, 3 do
+        watchers[i] = c[i]:watch('box.election',
+            function(n, s)
+                t.assert_equals(n, 'box.election')
+                res[i] = s
+                res_n[i] = res_n[i] + 1
+            end)
+    end
+
+    -- Bounded retry: notifications are delivered asynchronously, so
+    -- wait for the expected state instead of spinning forever.
+    local function retrying(fn)
+        t.helpers.retrying({timeout = 10, delay = 0.001}, fn)
+    end
+
+    -- Every instance must deliver its initial state. Each counter is
+    -- checked separately: a plain sum could be satisfied by extra
+    -- notifications from one instance while another sent none.
+    retrying(function()
+        for i = 1, 3 do
+            t.assert(res_n[i] >= 1, string.format(
+                'no box.election notification from instance_%d', i))
+        end
+    end)
+
+    -- verify all instances are in the same state
+    t.assert_equals(res[1], res[2])
+    t.assert_equals(res[1], res[3])
+
+    -- wait for elections to complete, verify leader is the instance_1
+    -- trying to avoid the exact number of term - it can vary
+    local instance1_id = cg.instance_1:eval("return box.info.id")
+
+    cg.instance_1:eval("box.cfg{election_mode='candidate'}")
+    cg.instance_2:eval("box.cfg{election_mode='voter'}")
+    cg.instance_3:eval("box.cfg{election_mode='voter'}")
+
+    cg.instance_1:wait_election_leader_found()
+    cg.instance_2:wait_election_leader_found()
+    cg.instance_3:wait_election_leader_found()
+
+    -- A leader found on the servers does not mean the watchers have
+    -- already been notified, so retry until the states match.
+    retrying(function()
+        t.assert_equals(res[1].leader, instance1_id)
+        t.assert_equals(res[1].is_ro, false)
+        t.assert_equals(res[1].role, 'leader')
+        t.assert_equals(res[2].leader, instance1_id)
+        t.assert_equals(res[2].is_ro, true)
+        t.assert_equals(res[2].role, 'follower')
+        t.assert_equals(res[3].leader, instance1_id)
+        t.assert_equals(res[3].is_ro, true)
+        t.assert_equals(res[3].role, 'follower')
+        t.assert_equals(res[1].term, res[2].term)
+        t.assert_equals(res[1].term, res[3].term)
+    end)
+
+    -- check the stepping down is working
+    cg.instance_1:eval("box.cfg{election_mode='voter'}")
+    retrying(function()
+        local expected = {is_ro = true, role = 'follower',
+                          term = res[1].term, leader = 0}
+        for i = 1, 3 do
+            t.assert_equals(res[i], expected)
+        end
+    end)
+
+    for i = 1, 3 do
+        watchers[i]:unregister()
+        c[i]:close()
+    end
+end
+
+
+g.before_test('test_box_schema', function(cg)
+    -- One default-configured instance to run DDL against.
+    cg.cluster = cluster:new({})
+    local master = cg.cluster:build_server({alias = 'master'})
+    cg.master = master
+    cg.cluster:add_server(master)
+    cg.cluster:start()
+end)
+
+
+g.after_test('test_box_schema', function(cg)
+ -- Tear down the cluster created in before_test('test_box_schema').
+ -- NOTE(review): resetting `servers` before drop() relies on how
+ -- test.luatest_helpers.cluster stores its server list -- confirm that
+ -- drop() still reaches every started server.
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+
+g.test_box_schema = function(cg)
+    local c = net.connect(cg.master.net_box_uri)
+    local version = 0
+    local version_n = 0
+
+    local watcher = c:watch('box.schema',
+        function(n, s)
+            assert(n == 'box.schema')
+            version = s.version
+            version_n = version_n + 1
+        end)
+
+    -- Bounded retry instead of an endless busy loop.
+    local function retrying(fn)
+        t.helpers.retrying({timeout = 10, delay = 0.001}, fn)
+    end
+
+    -- As with the other events, an initial notification arrives right
+    -- after registration. Consume it first so that it cannot be taken
+    -- for the notification caused by the DDL below.
+    retrying(function()
+        t.assert(version_n >= 1, 'no initial box.schema notification')
+    end)
+    local start_version = version
+
+    cg.master:eval("box.schema.create_space('p')")
+    -- first version change, use it as initial value
+    retrying(function()
+        t.assert(version > start_version, 'schema version did not change')
+    end)
+    local init_version = version
+
+    -- Wait for the expected value itself rather than for a callback
+    -- count, which could be satisfied by a stale, intermediate value.
+    cg.master:eval("box.space.p:create_index('i')")
+    retrying(function()
+        t.assert_equals(version, init_version + 1)
+    end)
+
+    cg.master:eval("box.space.p:drop()")
+    -- dropping bumps the version twice more (index, then space), i.e.
+    -- +3 in total relative to the version after create_space
+    retrying(function()
+        t.assert_equals(version, init_version + 3)
+    end)
+
+    watcher:unregister()
+    c:close()
+end
+
+
+g.before_test('test_box_id', function(cg)
+    -- Fixed UUIDs so the test can assert the exact box.id payload.
+    local cl = cluster:new({})
+    cg.cluster = cl
+    cg.instance = cl:build_server({
+        alias = 'master',
+        box_cfg = {
+            replicaset_uuid = t.helpers.uuid('ab', 1),
+            instance_uuid = t.helpers.uuid('1', '2', '3'),
+        },
+    })
+    cl:add_server(cg.instance)
+    cl:start()
+end)
+
+
+g.after_test('test_box_id', function(cg)
+ -- Tear down the cluster created in before_test('test_box_id').
+ -- NOTE(review): resetting `servers` before drop() relies on how
+ -- test.luatest_helpers.cluster stores its server list -- confirm that
+ -- drop() still reaches every started server.
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+
+g.test_box_id = function(cg)
+    local c = net.connect(cg.instance.net_box_uri)
+
+    local result = {}
+    local result_no = 0
+
+    local watcher = c:watch('box.id',
+        function(name, state)
+            assert(name == 'box.id')
+            result = state
+            result_no = result_no + 1
+        end)
+
+    -- initial state should arrive; the wait is bounded so that a
+    -- missing notification fails the test instead of hanging it
+    t.helpers.retrying({timeout = 10, delay = 0.001}, function()
+        t.assert(result_no >= 1, 'no box.id notification received')
+    end)
+    local expected = {id = 1,
+        instance_uuid = '11111111-2222-0000-0000-333333333333',
+        replicaset_uuid = 'abababab-0000-0000-0000-000000000001'}
+    t.assert_equals(result, expected)
+
+    -- error when instance_uuid set dynamically
+    t.assert_error_msg_content_equals(
+        "Can't set option 'instance_uuid' dynamically",
+        function()
+            cg.instance:eval(([[
+                box.cfg{
+                    instance_uuid = "%s"
+                }
+            ]]):format(t.helpers.uuid('1', '2', '4')))
+        end)
+
+    -- error when replicaset_uuid set dynamically
+    t.assert_error_msg_content_equals(
+        "Can't set option 'replicaset_uuid' dynamically",
+        function()
+            cg.instance:eval(([[
+                box.cfg{
+                    replicaset_uuid = "%s"
+                }
+            ]]):format(t.helpers.uuid('ab', 2)))
+        end)
+
+    -- the rejected reconfigurations must leave the published identity
+    -- untouched
+    t.assert_equals(result, expected)
+
+    watcher:unregister()
+    c:close()
+end
diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc
index 777c68dff..6afb73b22 100644
--- a/test/unit/mp_error.cc
+++ b/test/unit/mp_error.cc
@@ -94,12 +94,6 @@ enum {
sizeof(standard_errors) / sizeof(standard_errors[0]),
};
-static inline char *
-mp_encode_str0(char *data, const char *str)
-{
- return mp_encode_str(data, str, strlen(str));
-}
-
/** Note, not the same as mp_encode_error(). */
static char *
mp_encode_mp_error(const struct mp_test_error *e, char *data)
--
2.25.1
More information about the Tarantool-patches
mailing list