Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub
@ 2022-02-12  1:18 Yan Shtunder via Tarantool-patches
  2022-02-12 14:48 ` Vladimir Davydov via Tarantool-patches
  2022-02-14 21:35 ` Vladislav Shpilevoy via Tarantool-patches
  0 siblings, 2 replies; 4+ messages in thread
From: Yan Shtunder via Tarantool-patches @ 2022-02-12  1:18 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: tarantool-patches, Yan Shtunder

Adding predefined system events: box.status, box.id, box.election
and box.schema.

Closes #6260

NO_CHANGELOG=test stuff
NO_DOC=testing stuff
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3

 src/box/alter.cc                              |   9 +
 src/box/box.cc                                | 103 +++++-
 src/box/box.h                                 |  18 +
 src/box/mp_error.cc                           |   6 -
 src/box/raft.c                                |   1 +
 src/box/replication.cc                        |  16 +
 src/lib/msgpuck                               |   2 +-
 src/lib/raft/raft.c                           |   1 +
 .../gh_6260_add_builtin_events_test.lua       | 320 ++++++++++++++++++
 test/unit/mp_error.cc                         |   6 -
 10 files changed, 462 insertions(+), 20 deletions(-)
 create mode 100644 test/app-luatest/gh_6260_add_builtin_events_test.lua

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 65c1cb952..e9899b968 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,6 +57,7 @@
 #include "sequence.h"
 #include "sql.h"
 #include "constraint_id.h"
+#include "box.h"

 /* {{{ Auxiliary functions and methods. */

@@ -1685,6 +1686,7 @@ UpdateSchemaVersion::alter(struct alter_space *alter)
 {
     (void)alter;
     ++schema_version;
+    box_broadcast_schema();
 }

 /**
@@ -2260,6 +2262,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		 * create.
 		 */
 		++schema_version;
+		box_broadcast_schema();
 		/*
 		 * So may happen that until the DDL change record
 		 * is written to the WAL, the space is used for
@@ -2374,6 +2377,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		 * AlterSpaceOps are registered in case of space drop.
 		 */
 		++schema_version;
+		box_broadcast_schema();
 		struct trigger *on_commit =
 			txn_alter_trigger_new(on_drop_space_commit, old_space);
 		if (on_commit == NULL)
@@ -4225,6 +4229,8 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
 		if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
 			return -1;
 		REPLICASET_UUID = uu;
+		/* Checking box.info.cluster.uuid change */
+ 		box_broadcast_id();
 		say_info("cluster uuid %s", tt_uuid_str(&uu));
 	} else if (strcmp(key, "version") == 0) {
 		if (new_tuple != NULL) {
@@ -5072,6 +5078,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
 	txn_stmt_on_rollback(stmt, on_rollback);
 	txn_stmt_on_commit(stmt, on_commit);
 	++schema_version;
+	box_broadcast_schema();
 	return 0;
 }

@@ -5599,6 +5606,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
 		space_reset_fk_constraint_mask(parent_space);
 	}
 	++schema_version;
+	box_broadcast_schema();
 	return 0;
 }

@@ -5855,6 +5863,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
 	if (trigger_run(&on_alter_space, space) != 0)
 		return -1;
 	++schema_version;
+	box_broadcast_schema();
 	return 0;
 }

diff --git a/src/box/box.cc b/src/box/box.cc
index 6a33203df..d72bd3dad 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -85,6 +85,7 @@
 #include "audit.h"
 #include "trivia/util.h"
 #include "version.h"
+#include "mp_uuid.h"

 static char status[64] = "unknown";

@@ -98,13 +99,6 @@ double txn_timeout_default;
 struct rlist box_on_shutdown_trigger_list =
 	RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);

-static void title(const char *new_status)
-{
-	snprintf(status, sizeof(status), "%s", new_status);
-	title_set_status(new_status);
-	title_update();
-	systemd_snotify("STATUS=%s", status);
-}

 const struct vclock *box_vclock = &replicaset.vclock;

@@ -161,6 +155,20 @@ static struct fiber_pool tx_fiber_pool;
  */
 static struct cbus_endpoint tx_prio_endpoint;

+static void
+box_broadcast_status(void);
+
+static void
+title(const char *new_status)
+{
+	snprintf(status, sizeof(status), "%s", new_status);
+	title_set_status(new_status);
+	title_update();
+	systemd_snotify("STATUS=%s", status);
+	/* Checking box.info.status change */
+	box_broadcast_status();
+}
+
 void
 box_update_ro_summary(void)
 {
@@ -174,6 +182,8 @@ box_update_ro_summary(void)
 	if (is_ro_summary)
 		engine_switch_to_ro();
 	fiber_cond_broadcast(&ro_cond);
+	box_broadcast_status();
+	box_broadcast_election();
 }

 const char *
@@ -3933,3 +3943,82 @@ box_reset_stat(void)
 	engine_reset_stat();
 	space_foreach(box_reset_space_stat, NULL);
 }
+
+void
+box_broadcast_id(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	struct replica *replica = replica_by_uuid(&INSTANCE_UUID);
+	uint32_t id = (replica == NULL) ? 0 : replica->id;
+
+	w = mp_encode_map(w, 3);
+	w = mp_encode_str0(w, "id");
+	w = mp_encode_uint(w, id);
+	w = mp_encode_str0(w, "instance_uuid");
+	w = mp_encode_uuid(w, &INSTANCE_UUID);
+	w = mp_encode_str0(w, "replicaset_uuid");
+	w = mp_encode_uuid(w, &REPLICASET_UUID);
+
+	box_broadcast("box.id", strlen("box.id"), buf, w);
+
+	assert((size_t)(w - buf) < 1024);
+}
+
+static void
+box_broadcast_status(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 3);
+	w = mp_encode_str0(w, "is_ro");
+	w = mp_encode_bool(w, box_is_ro());
+	w = mp_encode_str0(w, "is_ro_cfg");
+	w = mp_encode_bool(w, cfg_geti("read_only"));
+	w = mp_encode_str0(w, "status");
+	w = mp_encode_str0(w, box_status());
+
+	box_broadcast("box.status", strlen("box.status"), buf, w);
+
+	assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_election()
+{
+	struct raft *raft = box_raft();
+
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 4);
+	w = mp_encode_str0(w, "term");
+	w = mp_encode_uint(w, raft->term);
+	w = mp_encode_str0(w, "role");
+	w = mp_encode_str0(w, raft_state_str(raft->state));
+	w = mp_encode_str0(w, "is_ro");
+	w = mp_encode_bool(w, box_is_ro());
+	w = mp_encode_str0(w, "leader");
+	w = mp_encode_uint(w, raft->leader);
+
+	box_broadcast("box.election", strlen("box.election"), buf, w);
+
+	assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_schema(void)
+{
+	char buf[1024];
+	char *w = buf;
+
+	w = mp_encode_map(w, 1);
+	w = mp_encode_str0(w, "version");
+	w = mp_encode_uint(w, box_schema_version());
+
+	box_broadcast("box.schema", strlen("box.schema"), buf, w);
+
+	assert((size_t)(w - buf) < 1024);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..766568368 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space,
 int
 boxk(int type, uint32_t space_id, const char *format, ...);

+/**
+ * Broadcast the identification of the instance
+ */
+void
+box_broadcast_id(void);
+
+/**
+ * Broadcast the current election state of RAFT machinery
+ */
+void
+box_broadcast_election(void);
+
+/**
+ * Broadcast the current schema version
+ */
+void
+box_broadcast_schema(void);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc
index 3c4176e59..93109562d 100644
--- a/src/box/mp_error.cc
+++ b/src/box/mp_error.cc
@@ -169,12 +169,6 @@ mp_sizeof_error_one(const struct error *error)
 	return data_size;
 }

-static inline char *
-mp_encode_str0(char *data, const char *str)
-{
-	return mp_encode_str(data, str, strlen(str));
-}
-
 static char *
 mp_encode_error_one(char *data, const struct error *error)
 {
diff --git a/src/box/raft.c b/src/box/raft.c
index be6009cc1..efc7e1038 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
 	 * writable only after it clears its synchro queue.
 	 */
 	box_update_ro_summary();
+	box_broadcast_election();
 	if (raft->state != RAFT_STATE_LEADER)
 		return 0;
 	/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index bbf4b7417..05d5ffb58 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -183,8 +183,12 @@ replica_new(void)
 		diag_raise();
 	}
 	replica->id = 0;
+	/* Checking box.info.id change */
+	box_broadcast_id();
 	replica->anon = false;
 	replica->uuid = uuid_nil;
+	/* Checking box.info.uuid change */
+	box_broadcast_id();
 	replica->applier = NULL;
 	replica->gc = NULL;
 	rlist_create(&replica->in_anon);
@@ -217,6 +221,8 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
 	assert(replica_by_uuid(replica_uuid) == NULL);
 	struct replica *replica = replica_new();
 	replica->uuid = *replica_uuid;
+	/* Checking box.info.uuid change */
+	box_broadcast_id();
 	replica_hash_insert(&replicaset.hash, replica);
 	replica_set_id(replica, replica_id);
 	return replica;
@@ -230,6 +236,8 @@ replicaset_add_anon(const struct tt_uuid *replica_uuid)

 	struct replica *replica = replica_new();
 	replica->uuid = *replica_uuid;
+	/* Checking box.info.uuid change */
+	box_broadcast_id();
 	replica_hash_insert(&replicaset.hash, replica);
 	replica->anon = true;
 	replicaset.anon_count++;
@@ -242,6 +250,8 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
 	assert(replica_id < VCLOCK_MAX);
 	assert(replica->id == REPLICA_ID_NIL); /* replica id is read-only */
 	replica->id = replica_id;
+	/* Checking box.info.id change */
+	box_broadcast_id();

 	if (tt_uuid_is_equal(&INSTANCE_UUID, &replica->uuid)) {
 		/* Assign local replica id */
@@ -289,6 +299,8 @@ replica_clear_id(struct replica *replica)
 		instance_id = REPLICA_ID_NIL;
 	}
 	replica->id = REPLICA_ID_NIL;
+	/* Checking box.info.id change */
+	box_broadcast_id();
 	say_info("removed replica %s", tt_uuid_str(&replica->uuid));

 	/*
@@ -354,6 +366,8 @@ replica_on_applier_connect(struct replica *replica)
 	assert(replica->applier_sync_state == APPLIER_DISCONNECTED);

 	replica->uuid = applier->uuid;
+	/* Checking box.info.uuid change */
+	box_broadcast_id();
 	replica->anon = applier->ballot.is_anon;
 	replica->applier_sync_state = APPLIER_CONNECTED;
 	replicaset.applier.connected++;
@@ -553,6 +567,8 @@ replicaset_update(struct applier **appliers, int count, bool keep_connect)

 		assert(!tt_uuid_is_nil(&applier->uuid));
 		replica->uuid = applier->uuid;
+		/* Checking box.info.uuid change */
+		box_broadcast_id();
 		replica->anon = applier->ballot.is_anon;

 		if (replica_hash_search(&uniq, replica) != NULL) {
diff --git a/src/lib/msgpuck b/src/lib/msgpuck
index 7f95b6fd7..7b1be1670 160000
--- a/src/lib/msgpuck
+++ b/src/lib/msgpuck
@@ -1 +1 @@
-Subproject commit 7f95b6fd7a5b928cfcbdab2d78bea88d1685821e
+Subproject commit 7b1be16703f53d76b05beb680c0cdbb546c2a756
diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
index 89cfb3c17..489df235b 100644
--- a/src/lib/raft/raft.c
+++ b/src/lib/raft/raft.c
@@ -494,6 +494,7 @@ raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source)
 			 * it manually.
 			 */
 			raft->leader = 0;
+			raft_schedule_broadcast(raft);
 			if (raft->is_candidate)
 				raft_sm_schedule_new_election(raft);
 		}
diff --git a/test/app-luatest/gh_6260_add_builtin_events_test.lua b/test/app-luatest/gh_6260_add_builtin_events_test.lua
new file mode 100644
index 000000000..3c363fc9a
--- /dev/null
+++ b/test/app-luatest/gh_6260_add_builtin_events_test.lua
@@ -0,0 +1,320 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local helpers = require('test.luatest_helpers')
+local fiber = require('fiber')
+
+local g = t.group('gh_6260')
+
+
+g.before_test('test_box_status', function(cg)
+    cg.cluster = cluster:new({})
+
+    local box_cfg = {
+        read_only = false
+    }
+
+    cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+    cg.cluster:add_server(cg.master)
+    cg.cluster:start()
+end)
+
+
+g.after_test('test_box_status', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+
+g.test_box_status = function(cg)
+    local c = net.connect(cg.master.net_box_uri)
+
+    local result = {}
+    local result_no = 0
+    local watcher = c:watch('box.status',
+                            function(name, state)
+                                assert(name == 'box.status')
+                                result = state
+                                result_no = result_no + 1
+                            end)
+
+    -- initial state should arrive
+    while result_no < 1 do fiber.sleep(0.001) end
+    t.assert_equals(result,
+                    {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    -- test orphan status appearance
+    cg.master:eval(([[
+        box.cfg{
+            replication = {
+                "%s",
+                "%s"
+            },
+            replication_connect_timeout = 0.001,
+            replication_timeout = 0.001,
+        }
+    ]]):format(helpers.instance_uri('master'),
+               helpers.instance_uri('replica')))
+    -- here we have 2 notifications: entering ro when can't connect
+    -- to master and the second one when going orphan
+    while result_no < 3 do fiber.sleep(0.000001) end
+    t.assert_equals(result,
+                    {is_ro = true, is_ro_cfg = false, status = 'orphan'})
+
+    -- test ro_cfg appearance
+    cg.master:eval([[
+        box.cfg{
+            replication = {},
+            read_only = true,
+        }
+    ]])
+    while result_no < 4 do fiber.sleep(0.000001) end
+    t.assert_equals(result,
+                    {is_ro = true, is_ro_cfg = true, status = 'running'})
+
+    -- reset to rw
+    cg.master:eval([[
+        box.cfg{
+            read_only = false,
+        }
+    ]])
+    while result_no < 5 do fiber.sleep(0.000001) end
+    t.assert_equals(result,
+                    {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    -- turning manual election mode puts into ro
+    cg.master:eval([[
+        box.cfg{
+            election_mode = 'manual',
+        }
+    ]])
+    while result_no < 6 do fiber.sleep(0.000001) end
+    t.assert_equals(result,
+                    {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+    -- promotion should turn rm
+    cg.master:eval([[
+        box.ctl.promote()
+    ]])
+    while result_no < 7 do fiber.sleep(0.000001) end
+    t.assert_equals(result,
+                    {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    watcher:unregister()
+    c:close()
+end
+
+
+g.before_test('test_box_election', function(cg)
+
+    cg.cluster = cluster:new({})
+
+    local box_cfg = {
+        replication = {
+                        helpers.instance_uri('instance_', 1);
+                        helpers.instance_uri('instance_', 2);
+                        helpers.instance_uri('instance_', 3);
+        },
+        replication_connect_quorum = 0,
+        election_mode = 'off',
+        replication_synchro_quorum = 2,
+        replication_synchro_timeout = 1,
+        replication_timeout = 0.25,
+        election_timeout = 0.25,
+    }
+
+    cg.instance_1 = cg.cluster:build_server(
+        {alias = 'instance_1', box_cfg = box_cfg})
+
+    cg.instance_2 = cg.cluster:build_server(
+        {alias = 'instance_2', box_cfg = box_cfg})
+
+    cg.instance_3 = cg.cluster:build_server(
+        {alias = 'instance_3', box_cfg = box_cfg})
+
+    cg.cluster:add_server(cg.instance_1)
+    cg.cluster:add_server(cg.instance_2)
+    cg.cluster:add_server(cg.instance_3)
+    cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+
+g.after_test('test_box_election', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+
+g.test_box_election = function(cg)
+    local c = {}
+    c[1] = net.connect(cg.instance_1.net_box_uri)
+    c[2] = net.connect(cg.instance_2.net_box_uri)
+    c[3] = net.connect(cg.instance_3.net_box_uri)
+
+    local res = {}
+    local res_n = {0, 0, 0}
+
+    for i = 1, 3 do
+        c[i]:watch('box.election',
+                   function(n, s)
+                       t.assert_equals(n, 'box.election')
+                       res[i] = s
+                       res_n[i] = res_n[i] + 1
+                   end)
+    end
+    while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
+
+    -- verify all instances are in the same state
+    t.assert_equals(res[1], res[2])
+    t.assert_equals(res[1], res[3])
+
+    -- wait for elections to complete, verify leader is the instance_1
+    -- trying to avoid the exact number of term - it can vary
+    local instance1_id = cg.instance_1:eval("return box.info.id")
+
+    cg.instance_1:eval("box.cfg{election_mode='candidate'}")
+    cg.instance_2:eval("box.cfg{election_mode='voter'}")
+    cg.instance_3:eval("box.cfg{election_mode='voter'}")
+
+    cg.instance_1:wait_election_leader_found()
+    cg.instance_2:wait_election_leader_found()
+    cg.instance_3:wait_election_leader_found()
+
+    t.assert_equals(res[1].leader, instance1_id)
+    t.assert_equals(res[1].is_ro, false)
+    t.assert_equals(res[1].role, 'leader')
+    t.assert_equals(res[2].leader, instance1_id)
+    t.assert_equals(res[2].is_ro, true)
+    t.assert_equals(res[2].role, 'follower')
+    t.assert_equals(res[3].leader, instance1_id)
+    t.assert_equals(res[3].is_ro, true)
+    t.assert_equals(res[3].role, 'follower')
+    t.assert_equals(res[1].term, res[2].term)
+    t.assert_equals(res[1].term, res[3].term)
+
+    -- check the stepping down is working
+    res_n = {0, 0, 0}
+    cg.instance_1:eval("box.cfg{election_mode='voter'}")
+    while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
+
+    local expected = {is_ro = true, role = 'follower', term = res[1].term,
+                      leader = 0}
+    t.assert_equals(res[1], expected)
+    t.assert_equals(res[2], expected)
+    t.assert_equals(res[3], expected)
+
+    c[1]:close()
+    c[2]:close()
+    c[3]:close()
+end
+
+
+g.before_test('test_box_schema', function(cg)
+    cg.cluster = cluster:new({})
+    cg.master = cg.cluster:build_server({alias = 'master'})
+    cg.cluster:add_server(cg.master)
+    cg.cluster:start()
+end)
+
+
+g.after_test('test_box_schema', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+
+g.test_box_schema = function(cg)
+    local c = net.connect(cg.master.net_box_uri)
+    local version = 0
+    local version_n = 0
+
+    local watcher = c:watch('box.schema',
+                            function(n, s)
+                                assert(n == 'box.schema')
+                                version = s.version
+                                version_n = version_n + 1
+                            end)
+
+    cg.master:eval("box.schema.create_space('p')")
+    while version_n < 1 do fiber.sleep(0.001) end
+    -- first version change, use it as initial value
+    local init_version = version
+
+    version_n = 0
+    cg.master:eval("box.space.p:create_index('i')")
+    while version_n < 1 do fiber.sleep(0.001) end
+    t.assert_equals(version, init_version + 1)
+
+    version_n = 0
+    cg.master:eval("box.space.p:drop()")
+    while version_n < 1 do fiber.sleep(0.001) end
+    -- there'll be 2 changes - index and space
+    t.assert_equals(version, init_version + 3)
+
+    watcher:unregister()
+    c:close()
+end
+
+
+g.before_test('test_box_id', function(cg)
+    cg.cluster = cluster:new({})
+
+    local box_cfg = {
+        replicaset_uuid = t.helpers.uuid('ab', 1),
+        instance_uuid = t.helpers.uuid('1', '2', '3')
+    }
+
+    cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+    cg.cluster:add_server(cg.instance)
+    cg.cluster:start()
+end)
+
+
+g.after_test('test_box_id', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+
+g.test_box_id = function(cg)
+    local c = net.connect(cg.instance.net_box_uri)
+
+    local result = {}
+    local result_no = 0
+
+    local watcher = c:watch('box.id',
+                            function(name, state)
+                                assert(name == 'box.id')
+                                result = state
+                                result_no = result_no + 1
+                            end)
+
+    -- initial state should arrive
+    while result_no < 1 do fiber.sleep(0.001) end
+    t.assert_equals(result, {id = 1,
+                    instance_uuid = '11111111-2222-0000-0000-333333333333',
+                    replicaset_uuid = 'abababab-0000-0000-0000-000000000001'})
+
+    -- error when instance_uuid set dynamically
+    t.assert_error_msg_content_equals("Can't set option 'instance_uuid' dynamically",
+                                function()
+                                    cg.instance:eval(([[
+                                        box.cfg{
+                                            instance_uuid = "%s"
+                                        }
+                                    ]]):format(t.helpers.uuid('1', '2', '4')))
+                                end)
+
+    -- error when replicaset_uuid set dynamically
+    t.assert_error_msg_content_equals("Can't set option 'replicaset_uuid' dynamically",
+                                function()
+                                    cg.instance:eval(([[
+                                        box.cfg{
+                                            replicaset_uuid = "%s"
+                                        }
+                                    ]]):format(t.helpers.uuid('ab', 2)))
+                                end)
+
+    watcher:unregister()
+    c:close()
+end
diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc
index 777c68dff..6afb73b22 100644
--- a/test/unit/mp_error.cc
+++ b/test/unit/mp_error.cc
@@ -94,12 +94,6 @@ enum {
 		sizeof(standard_errors) / sizeof(standard_errors[0]),
 };

-static inline char *
-mp_encode_str0(char *data, const char *str)
-{
-	return mp_encode_str(data, str, strlen(str));
-}
-
 /** Note, not the same as mp_encode_error(). */
 static char *
 mp_encode_mp_error(const struct mp_test_error *e, char *data)
--
2.25.1


^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub
  2022-02-12  1:18 [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub Yan Shtunder via Tarantool-patches
@ 2022-02-12 14:48 ` Vladimir Davydov via Tarantool-patches
  2022-02-14 21:35 ` Vladislav Shpilevoy via Tarantool-patches
  1 sibling, 0 replies; 4+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2022-02-12 14:48 UTC (permalink / raw)
  To: Yan Shtunder via Tarantool-patches; +Cc: v.shpilevoy, Yan Shtunder

On Sat, Feb 12, 2022 at 04:18:03AM +0300, Yan Shtunder via Tarantool-patches wrote:
> Adding predefined system events: box.status, box.id, box.election
> and box.schema.
> 
> Closes #6260
> 
> NO_CHANGELOG=test stuff
> NO_DOC=testing stuff

This change definitely needs both a changelog and a doc request.

> ---
> Issue: https://github.com/tarantool/tarantool/issues/6260
> Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3

I recommend creating a pull request from your own GitHub fork instead
of pushing to a dev branch and sending patches to the mailing list,
because dev branches in the main repo are likely to be banned soon and
the mailing list will probably be shut down (or kept for LuaJIT
patches only).

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 6a33203df..d72bd3dad 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -3933,3 +3943,82 @@ box_reset_stat(void)
>  	engine_reset_stat();
>  	space_foreach(box_reset_space_stat, NULL);
>  }
> +
> +void
> +box_broadcast_id(void)
> +{
> +	char buf[1024];
> +	char *w = buf;
> +
> +	struct replica *replica = replica_by_uuid(&INSTANCE_UUID);
> +	uint32_t id = (replica == NULL) ? 0 : replica->id;
> +
> +	w = mp_encode_map(w, 3);
> +	w = mp_encode_str0(w, "id");
> +	w = mp_encode_uint(w, id);
> +	w = mp_encode_str0(w, "instance_uuid");
> +	w = mp_encode_uuid(w, &INSTANCE_UUID);
> +	w = mp_encode_str0(w, "replicaset_uuid");
> +	w = mp_encode_uuid(w, &REPLICASET_UUID);
> +
> +	box_broadcast("box.id", strlen("box.id"), buf, w);

You can use box_broadcast_fmt() instead. AFAIU, the events added in this patch
are not triggered from hot paths, so it should be fine performance-wise.

https://github.com/tarantool/tarantool/blob/17f08c821ea44531a2f4882653527b672d8e447f/src/box/watcher.h#L254-L260

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub
  2022-02-12  1:18 [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub Yan Shtunder via Tarantool-patches
  2022-02-12 14:48 ` Vladimir Davydov via Tarantool-patches
@ 2022-02-14 21:35 ` Vladislav Shpilevoy via Tarantool-patches
  2022-02-15  7:36   ` Vladimir Davydov via Tarantool-patches
  1 sibling, 1 reply; 4+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2022-02-14 21:35 UTC (permalink / raw)
  To: Yan Shtunder; +Cc: tarantool-patches

Hi! Thanks for the patch, good job!

See some new comments below!

On 12.02.2022 02:18, Yan Shtunder via Tarantool-patches wrote:
> Adding predefined system events: box.status, box.id, box.election
> and box.schema.
> 
> Closes #6260
> 
> NO_CHANGELOG=test stuff
> NO_DOC=testing stuff

1. This is a new public API. Let's add both a docbot request and a changelog.

> ---
> Issue: https://github.com/tarantool/tarantool/issues/6260
> Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3
> 
>  src/box/alter.cc                              |   9 +
>  src/box/box.cc                                | 103 +++++-
>  src/box/box.h                                 |  18 +
>  src/box/mp_error.cc                           |   6 -
>  src/box/raft.c                                |   1 +
>  src/box/replication.cc                        |  16 +
>  src/lib/msgpuck                               |   2 +-
>  src/lib/raft/raft.c                           |   1 +
>  .../gh_6260_add_builtin_events_test.lua       | 320 ++++++++++++++++++
>  test/unit/mp_error.cc                         |   6 -
>  10 files changed, 462 insertions(+), 20 deletions(-)
>  create mode 100644 test/app-luatest/gh_6260_add_builtin_events_test.lua
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 65c1cb952..e9899b968 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -1685,6 +1686,7 @@ UpdateSchemaVersion::alter(struct alter_space *alter)
>  {
>      (void)alter;
>      ++schema_version;
> +    box_broadcast_schema();

2. The schema version increment and its broadcast are duplicated 6 times. Let's
wrap them into a function like

	box_schema_version_bump()


3. A response to Vova's comment about box_broadcast_fmt(): while I am not
against it, I am afraid it might not work with UUIDs. AFAIR, we don't
have a formatter for UUID or any other MP_EXT type. You can still use
box_broadcast_fmt() for events that contain only MessagePack-native types,
such as the schema version and election events.


4. There is a CI failure:
https://github.com/tarantool/tarantool/runs/5164254300?check_suite_focus=true.
The msgpuck submodule was bumped, but the new version doesn't update its test's
.result file.

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 6a33203df..d72bd3dad 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -161,6 +155,20 @@ static struct fiber_pool tx_fiber_pool;
>   */
>  static struct cbus_endpoint tx_prio_endpoint;
> 
> +static void
> +box_broadcast_status(void);
> +
> +static void
> +title(const char *new_status)
> +{
> +	snprintf(status, sizeof(status), "%s", new_status);
> +	title_set_status(new_status);
> +	title_update();
> +	systemd_snotify("STATUS=%s", status);
> +	/* Checking box.info.status change */

5. The comment merely narrates the code and tells nothing new. I
suggest dropping it. The same applies to the other broadcast calls for all events.

> +	box_broadcast_status();
> +}
> +
>  void
>  box_update_ro_summary(void)
>  {
> @@ -3933,3 +3943,82 @@ box_reset_stat(void)
>  	engine_reset_stat();
>  	space_foreach(box_reset_space_stat, NULL);
>  }
> +
> +void
> +box_broadcast_id(void)
> +{
> +	char buf[1024];
> +	char *w = buf;
> +
> +	struct replica *replica = replica_by_uuid(&INSTANCE_UUID);
> +	uint32_t id = (replica == NULL) ? 0 : replica->id;

6. You don't need to get the replica. You can use the global variable
'instance_id'.

> +
> +	w = mp_encode_map(w, 3);
> +	w = mp_encode_str0(w, "id");
> +	w = mp_encode_uint(w, id);
> +	w = mp_encode_str0(w, "instance_uuid");
> +	w = mp_encode_uuid(w, &INSTANCE_UUID);
> +	w = mp_encode_str0(w, "replicaset_uuid");
> +	w = mp_encode_uuid(w, &REPLICASET_UUID);
> +
> +	box_broadcast("box.id", strlen("box.id"), buf, w);
> +
> +	assert((size_t)(w - buf) < 1024);
> +}
> diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc
> index 3c4176e59..93109562d 100644
> --- a/src/box/mp_error.cc
> +++ b/src/box/mp_error.cc
> @@ -169,12 +169,6 @@ mp_sizeof_error_one(const struct error *error)
>  	return data_size;
>  }
> 
> -static inline char *
> -mp_encode_str0(char *data, const char *str)
> -{
> -	return mp_encode_str(data, str, strlen(str));
> -}

7. This change and the msgpuck submodule update should be done in a separate
preparatory commit.

> -
>  static char *
>  mp_encode_error_one(char *data, const struct error *error)
>  {
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index bbf4b7417..05d5ffb58 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -183,8 +183,12 @@ replica_new(void)
>  		diag_raise();
>  	}
>  	replica->id = 0;
> +	/* Checking box.info.id change */
> +	box_broadcast_id();
>  	replica->anon = false;
>  	replica->uuid = uuid_nil;
> +	/* Checking box.info.uuid change */
> +	box_broadcast_id();

8. You don't need to broadcast on every line where a change happens.
Nothing is sent until the next yield anyway. You only need to broadcast
once all the assignments are done and you are leaving the context.


9. The data changed here is not necessarily the instance's own. This
is a generic replica constructor, called for every record in the
_cluster space. You only need to broadcast in the places where the
current instance's own IDs change, i.e. the global variables 'instance_id',
'INSTANCE_UUID', and 'REPLICASET_UUID'.

> diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
> index 89cfb3c17..489df235b 100644
> --- a/src/lib/raft/raft.c
> +++ b/src/lib/raft/raft.c
> @@ -494,6 +494,7 @@ raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source)
>  			 * it manually.
>  			 */
>  			raft->leader = 0;
> +			raft_schedule_broadcast(raft);

10. This should be a separate commit with its own unit test. Sergos already made
a PR, but it lacks a test. I suggest finishing it and rebasing this patchset on
top of that commit.

>  			if (raft->is_candidate)
>  				raft_sm_schedule_new_election(raft);
>  		}
> diff --git a/test/app-luatest/gh_6260_add_builtin_events_test.lua b/test/app-luatest/gh_6260_add_builtin_events_test.lua
> new file mode 100644
> index 000000000..3c363fc9a
> --- /dev/null
> +++ b/test/app-luatest/gh_6260_add_builtin_events_test.lua

11. The test should be in box-luatest, not app-luatest.

> @@ -0,0 +1,320 @@
> +local t = require('luatest')
> +local net = require('net.box')
> +local cluster = require('test.luatest_helpers.cluster')
> +local helpers = require('test.luatest_helpers')
> +local fiber = require('fiber')
> +
> +local g = t.group('gh_6260')
> +
> +
> +g.before_test('test_box_status', function(cg)
> +    cg.cluster = cluster:new({})
> +
> +    local box_cfg = {
> +        read_only = false

12. It is false by default anyway. You don't need this config.

> +    }
> +
> +    cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
> +    cg.cluster:add_server(cg.master)
> +    cg.cluster:start()
> +end)
> +
> +
> +g.after_test('test_box_status', function(cg)
> +    cg.cluster.servers = nil
> +    cg.cluster:drop()
> +end)
> +
> +
> +g.test_box_status = function(cg)
> +    local c = net.connect(cg.master.net_box_uri)
> +
> +    local result = {}
> +    local result_no = 0
> +    local watcher = c:watch('box.status',
> +                            function(name, state)
> +                                assert(name == 'box.status')
> +                                result = state
> +                                result_no = result_no + 1
> +                            end)
> +
> +    -- initial state should arrive
> +    while result_no < 1 do fiber.sleep(0.001) end

13. Let's use t.helpers.retrying() instead, here and in other places. See
existing tests for examples of how it is used.

> +    t.assert_equals(result,
> +                    {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> +    -- test orphan status appearance
> +    cg.master:eval(([[
> +        box.cfg{
> +            replication = {
> +                "%s",
> +                "%s"
> +            },
> +            replication_connect_timeout = 0.001,
> +            replication_timeout = 0.001,
> +        }
> +    ]]):format(helpers.instance_uri('master'),
> +               helpers.instance_uri('replica')))

14. While this way of doing box.cfg certainly works, you might
want to consider an alternative via :exec(). Perhaps you didn't find how
to pass parameters into it? Here is how:

	cg.master:exec(function(repl)
		box.cfg{
			replication = repl,
			...
		}
	end, {{helpers.instance_uri('master'),
		helpers.instance_uri('replica')}})

> +    -- here we have 2 notifications: entering ro when can't connect
> +    -- to master and the second one when going orphan
> +    while result_no < 3 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = true, is_ro_cfg = false, status = 'orphan'})
> +
> +    -- test ro_cfg appearance
> +    cg.master:eval([[
> +        box.cfg{
> +            replication = {},
> +            read_only = true,
> +        }
> +    ]])

15. Let's use :exec() here as well.

> +    while result_no < 4 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = true, is_ro_cfg = true, status = 'running'})
> +
> +    -- reset to rw
> +    cg.master:eval([[
> +        box.cfg{
> +            read_only = false,
> +        }
> +    ]])
> +    while result_no < 5 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> +    -- turning manual election mode puts into ro
> +    cg.master:eval([[
> +        box.cfg{
> +            election_mode = 'manual',
> +        }
> +    ]])
> +    while result_no < 6 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = true, is_ro_cfg = false, status = 'running'})
> +
> +    -- promotion should turn rm
> +    cg.master:eval([[
> +        box.ctl.promote()
> +    ]])
> +    while result_no < 7 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> +    watcher:unregister()
> +    c:close()
> +end
> +
> +

16. Extra empty line. There are more in other places too; please remove them.

> +g.before_test('test_box_election', function(cg)
> +
> +    cg.cluster = cluster:new({})
> +
> +    local box_cfg = {
> +        replication = {
> +                        helpers.instance_uri('instance_', 1);
> +                        helpers.instance_uri('instance_', 2);
> +                        helpers.instance_uri('instance_', 3);

17. The indentation is too deep. Also, you could write it more simply:

	helpers.instance_uri('instance_1'),
	helpers.instance_uri('instance_2'),
	...

No need to pass the numbers separately.

> +        },
> +        replication_connect_quorum = 0,
> +        election_mode = 'off',
> +        replication_synchro_quorum = 2,
> +        replication_synchro_timeout = 1,
> +        replication_timeout = 0.25,
> +        election_timeout = 0.25,
> +    }
> +
> +    cg.instance_1 = cg.cluster:build_server(
> +        {alias = 'instance_1', box_cfg = box_cfg})
> +
> +    cg.instance_2 = cg.cluster:build_server(
> +        {alias = 'instance_2', box_cfg = box_cfg})
> +
> +    cg.instance_3 = cg.cluster:build_server(
> +        {alias = 'instance_3', box_cfg = box_cfg})
> +
> +    cg.cluster:add_server(cg.instance_1)
> +    cg.cluster:add_server(cg.instance_2)
> +    cg.cluster:add_server(cg.instance_3)
> +    cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
> +end)
> +
> +
> +g.after_test('test_box_election', function(cg)
> +    cg.cluster.servers = nil
> +    cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
> +end)
> +
> +
> +g.test_box_election = function(cg)
> +    local c = {}
> +    c[1] = net.connect(cg.instance_1.net_box_uri)
> +    c[2] = net.connect(cg.instance_2.net_box_uri)
> +    c[3] = net.connect(cg.instance_3.net_box_uri)
> +
> +    local res = {}
> +    local res_n = {0, 0, 0}
> +
> +    for i = 1, 3 do
> +        c[i]:watch('box.election',
> +                   function(n, s)
> +                       t.assert_equals(n, 'box.election')
> +                       res[i] = s
> +                       res_n[i] = res_n[i] + 1
> +                   end)
> +    end
> +    while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> +
> +    -- verify all instances are in the same state
> +    t.assert_equals(res[1], res[2])
> +    t.assert_equals(res[1], res[3])
> +
> +    -- wait for elections to complete, verify leader is the instance_1
> +    -- trying to avoid the exact number of term - it can vary
> +    local instance1_id = cg.instance_1:eval("return box.info.id")

18. You can do cg.instance_1:instance_id().

> +
> +    cg.instance_1:eval("box.cfg{election_mode='candidate'}")
> +    cg.instance_2:eval("box.cfg{election_mode='voter'}")
> +    cg.instance_3:eval("box.cfg{election_mode='voter'}")
> +
> +    cg.instance_1:wait_election_leader_found()
> +    cg.instance_2:wait_election_leader_found()
> +    cg.instance_3:wait_election_leader_found()
> +
> +    t.assert_equals(res[1].leader, instance1_id)
> +    t.assert_equals(res[1].is_ro, false)
> +    t.assert_equals(res[1].role, 'leader')

19. You can use t.assert_covers() if you don't want to check a
specific term number.

> +    t.assert_equals(res[2].leader, instance1_id)
> +    t.assert_equals(res[2].is_ro, true)
> +    t.assert_equals(res[2].role, 'follower')
> +    t.assert_equals(res[3].leader, instance1_id)
> +    t.assert_equals(res[3].is_ro, true)
> +    t.assert_equals(res[3].role, 'follower')
> +    t.assert_equals(res[1].term, res[2].term)
> +    t.assert_equals(res[1].term, res[3].term)
> +
> +    -- check the stepping down is working
> +    res_n = {0, 0, 0}
> +    cg.instance_1:eval("box.cfg{election_mode='voter'}")
> +    while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> +
> +    local expected = {is_ro = true, role = 'follower', term = res[1].term,
> +                      leader = 0}
> +    t.assert_equals(res[1], expected)
> +    t.assert_equals(res[2], expected)
> +    t.assert_equals(res[3], expected)
> +
> +    c[1]:close()
> +    c[2]:close()
> +    c[3]:close()
> +end

<...>

> +g.test_box_id = function(cg)
> +    local c = net.connect(cg.instance.net_box_uri)
> +
> +    local result = {}
> +    local result_no = 0
> +
> +    local watcher = c:watch('box.id',
> +                            function(name, state)
> +                                assert(name == 'box.id')
> +                                result = state
> +                                result_no = result_no + 1
> +                            end)
> +
> +    -- initial state should arrive
> +    while result_no < 1 do fiber.sleep(0.001) end
> +    t.assert_equals(result, {id = 1,
> +                    instance_uuid = '11111111-2222-0000-0000-333333333333',
> +                    replicaset_uuid = 'abababab-0000-0000-0000-000000000001'})

20. Let's use cg.instance.box_cfg.instance_uuid and
cg.instance.box_cfg.replicaset_uuid instead of the hardcoded values.

> +
> +    -- error when instance_uuid set dynamically
> +    t.assert_error_msg_content_equals("Can't set option 'instance_uuid' dynamically",
> +                                function()
> +                                    cg.instance:eval(([[
> +                                        box.cfg{
> +                                            instance_uuid = "%s"
> +                                        }
> +                                    ]]):format(t.helpers.uuid('1', '2', '4')))
> +                                end)

21. This and the case below don't seem related to your patch. Setting these
options dynamically was already prohibited before it and is already tested.
I suggest dropping both cases.

> +
> +    -- error when replicaset_uuid set dynamically
> +    t.assert_error_msg_content_equals("Can't set option 'replicaset_uuid' dynamically",
> +                                function()
> +                                    cg.instance:eval(([[
> +                                        box.cfg{
> +                                            replicaset_uuid = "%s"
> +                                        }
> +                                    ]]):format(t.helpers.uuid('ab', 2)))
> +                                end)
> +
> +    watcher:unregister()
> +    c:close()
> +end
> diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc
> index 777c68dff..6afb73b22 100644
> --- a/test/unit/mp_error.cc
> +++ b/test/unit/mp_error.cc
> @@ -94,12 +94,6 @@ enum {
>  		sizeof(standard_errors) / sizeof(standard_errors[0]),
>  };
> 
> -static inline char *
> -mp_encode_str0(char *data, const char *str)
> -{
> -	return mp_encode_str(data, str, strlen(str));
> -}

22. This should be in a separate commit that bumps the msgpuck submodule.

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub
  2022-02-14 21:35 ` Vladislav Shpilevoy via Tarantool-patches
@ 2022-02-15  7:36   ` Vladimir Davydov via Tarantool-patches
  0 siblings, 0 replies; 4+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2022-02-15  7:36 UTC (permalink / raw)
  To: Vladislav Shpilevoy via Tarantool-patches; +Cc: Yan Shtunder

On Mon, Feb 14, 2022 at 10:35:37PM +0100, Vladislav Shpilevoy via Tarantool-patches wrote:
> 3. A response to Vova's comment about box_broadcast_fmt() - while I am not
> against that, I am just afraid it might not work with UUIDs. AFAIR we don't
> have a formatter for UUID or any other MP_EXT type. You can still try to use
> box_broadcast_fmt() for events having only MessagePack-native types though.
> Such as schema version and election event.

Good point. I didn't think about UUID. It'd be nice if we could use
mp_format to format MP_EXT, like this

  mp_format("%eu", &REPLICASET_UUID);

but it's probably out of the scope of this patch.

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2022-02-15  7:36 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-02-12  1:18 [Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub Yan Shtunder via Tarantool-patches
2022-02-12 14:48 ` Vladimir Davydov via Tarantool-patches
2022-02-14 21:35 ` Vladislav Shpilevoy via Tarantool-patches
2022-02-15  7:36   ` Vladimir Davydov via Tarantool-patches

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox