* [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
@ 2022-03-24 14:01 Yan Shtunder via Tarantool-patches
2022-03-25 23:44 ` Vladislav Shpilevoy via Tarantool-patches
From: Yan Shtunder via Tarantool-patches @ 2022-03-24 14:01 UTC
To: v.shpilevoy; +Cc: tarantool-patches, Yan Shtunder
Added predefined system events: box.status, box.id, box.election and
box.schema.
Closes #6260
@TarantoolBot document
Title: Built-in events for pub/sub
Built-in events are needed, first of all, to learn who the master is,
unless that is defined in an application-specific way. Knowing who the
master is is necessary to send changes to the correct instance, and
possibly to read the most recent data when that matters. Built-in events
are also defined for other mutable properties: leader state changes,
election role and election term, schema version changes, and instance
state.
Built-in events have a special naming scheme: their names always start
with "box." (including the dot). The prefix is reserved for built-in
events, and creating new events with this prefix is banned. Below is a
list of all the events with their names and values:
1. box.id
Description - identification of the instance. Changes are extremely rare.
Some values never change or change only once. For example, the instance
UUID never changes after the first box.cfg, but it is not known before
box.cfg is called. The replicaset UUID is unknown until the instance joins
a replicaset or boots a new one, but the events are supposed to start
working before that, right at listen launch. The instance numeric ID is
known only after registration. On anonymous replicas it is 0 until they
are registered officially.
Value - {
MP_STR "id": MP_UINT box.info.id,
MP_STR "instance_uuid": MP_UUID box.info.uuid,
MP_STR "replicaset_uuid": MP_UUID box.info.cluster.uuid,
}
2. box.status
Description - generic blob about instance status. It contains the most
commonly used and rarely changed config options and box.info fields.
Value - {
MP_STR "is_ro": MP_BOOL box.info.ro,
MP_STR "is_ro_cfg": MP_BOOL box.cfg.read_only,
MP_STR "status": MP_STR box.info.status,
}
3. box.election
Description - all the parts of box.info.election needed to find who is
the most recent writable leader.
Value - {
MP_STR "term": MP_UINT box.info.election.term,
MP_STR "role": MP_STR box.info.election.state,
MP_STR "is_ro": MP_BOOL box.info.ro,
MP_STR "leader": MP_UINT box.info.election.leader,
}
4. box.schema
Description - schema-related data. Currently it is only version.
Value - {
MP_STR "version": MP_UINT schema_version,
}
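To make the wire format of these values concrete, here is a minimal sketch
of how the box.schema value maps to MsgPack bytes. It is only an
illustration: the patch itself uses the msgpuck mp_encode_* helpers, and
all sketch_* names below are hypothetical and not part of Tarantool.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: append a short MsgPack string (fixstr, <= 31 bytes). */
static uint8_t *
sketch_encode_fixstr(uint8_t *w, const char *s)
{
	size_t len = strlen(s);
	assert(len <= 31);
	*w++ = (uint8_t)(0xa0 | len);
	memcpy(w, s, len);
	return w + len;
}

/* Hypothetical helper: append an unsigned integer in its shortest form. */
static uint8_t *
sketch_encode_uint(uint8_t *w, uint64_t v)
{
	int bytes;
	if (v <= 0x7f) {
		*w++ = (uint8_t)v;	/* positive fixint */
		return w;
	} else if (v <= 0xff) {
		*w++ = 0xcc;		/* uint 8 */
		bytes = 1;
	} else if (v <= 0xffff) {
		*w++ = 0xcd;		/* uint 16 */
		bytes = 2;
	} else if (v <= 0xffffffffULL) {
		*w++ = 0xce;		/* uint 32 */
		bytes = 4;
	} else {
		*w++ = 0xcf;		/* uint 64 */
		bytes = 8;
	}
	/* MsgPack integers are big-endian. */
	for (int i = bytes - 1; i >= 0; i--)
		*w++ = (uint8_t)(v >> (8 * i));
	return w;
}

/* Encode {"version": version}; returns the number of bytes written. */
size_t
sketch_encode_schema_event(uint8_t *buf, size_t size, uint64_t version)
{
	/* Worst case: map header + str header + 7 chars + 9-byte uint. */
	assert(size >= 1 + 1 + 7 + 9);
	uint8_t *w = buf;
	*w++ = 0x81;	/* fixmap with one key-value pair */
	w = sketch_encode_fixstr(w, "version");
	w = sketch_encode_uint(w, version);
	return (size_t)(w - buf);
}
```

Unlike the asserts in the patch, which run after the buffer is filled, this
sketch checks the worst-case size before writing anything.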
Built-in events can't be overridden. That is, users are not able to call
box.broadcast('box.id', any_data) etc.
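The ban comes down to a prefix check on the event key. In this patch,
lbox_broadcast() does it with strncmp(key, "box.", 4). Below is a
length-aware variant of the same idea, given as a sketch only; the function
name is hypothetical and not part of Tarantool.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Return true if the key belongs to the reserved "box." namespace.
 * The explicit length check means the key does not have to be
 * NUL-terminated and keys shorter than the prefix never match.
 */
bool
sketch_is_builtin_event_key(const char *key, size_t key_len)
{
	static const char prefix[] = "box.";
	const size_t prefix_len = sizeof(prefix) - 1;

	assert(key != NULL);
	return key_len >= prefix_len &&
	       memcmp(key, prefix, prefix_len) == 0;
}
```

A caller would reject the broadcast with an error when this returns true,
which is what the Lua binding in this patch does for any key starting with
"box.".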
The events are available from the very beginning as not MP_NIL. This is
necessary to support local subscriptions. Otherwise there would be no way
to detect whether an event is even supported at all by this Tarantool
version. If events are broadcast before box.cfg{}, then the following
values will be available:
box.id = {}
box.schema = {}
box.status = {}
box.election = {}
This way the users will be able to distinguish an event not being supported
at all from box.cfg{} not having been called yet. Otherwise they would need
to parse the _TARANTOOL version string locally and peer_version in net.box.
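The three cases a subscriber can see may be sketched as a check on the
first byte of the raw MsgPack value. This is a sketch under assumptions:
it assumes a key that was never broadcast is delivered as MP_NIL, and the
enum and function names are hypothetical, not part of Tarantool.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum sketch_event_state {
	SKETCH_EVENT_UNSUPPORTED,	/* MP_NIL: event unknown to the server */
	SKETCH_EVENT_NOT_CONFIGURED,	/* empty map: box.cfg{} not called yet */
	SKETCH_EVENT_READY,		/* non-empty map: real values */
	SKETCH_EVENT_MALFORMED,		/* anything else */
};

enum sketch_event_state
sketch_classify_event(const uint8_t *data, size_t len)
{
	assert(len == 0 || data != NULL);
	if (len == 0 || data[0] == 0xc0)	/* no data or MsgPack nil */
		return SKETCH_EVENT_UNSUPPORTED;

	uint8_t b = data[0];
	if ((b & 0xf0) == 0x80)			/* fixmap, size in low 4 bits */
		return (b & 0x0f) == 0 ? SKETCH_EVENT_NOT_CONFIGURED :
					 SKETCH_EVENT_READY;
	if (b == 0xde && len >= 3)		/* map 16, big-endian size */
		return (data[1] | data[2]) == 0 ?
		       SKETCH_EVENT_NOT_CONFIGURED : SKETCH_EVENT_READY;
	if (b == 0xdf && len >= 5)		/* map 32, big-endian size */
		return (data[1] | data[2] | data[3] | data[4]) == 0 ?
		       SKETCH_EVENT_NOT_CONFIGURED : SKETCH_EVENT_READY;
	return SKETCH_EVENT_MALFORMED;
}
```

In Lua the same decision is a comparison of the watcher's value with nil
and with an empty table.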
Example usage:
* Client:
```lua
local net_box = require('net.box')
local conn = net_box.connect(URI)
-- Subscribe to updates of key 'box.id'
local w = conn:watch('box.id', function(key, value)
    assert(key == 'box.id')
    -- do something with value
end)
-- or to updates of key 'box.status'
w = conn:watch('box.status', function(key, value)
    assert(key == 'box.status')
    -- do something with value
end)
-- or to updates of key 'box.election'
w = conn:watch('box.election', function(key, value)
    assert(key == 'box.election')
    -- do something with value
end)
-- or to updates of key 'box.schema'
w = conn:watch('box.schema', function(key, value)
    assert(key == 'box.schema')
    -- do something with value
end)
-- Unregister the watcher when it's no longer needed.
w:unregister()
```
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v4
Hi, thanks for the useful review!
> Hi! Good job on the fixes!
> See 3 comments below, almost finished!
>
> > Built-in events can't be override. Meaning, users can't be able to call
> > box.broadcast(‘box.id’, any_data) etc.
> >
> > The events are available from the very beginning as not MP_NIL. It will come
> > in handy when local subscriptions will be supported (not via the network).
>
> 1. 'when local subscriptions will be supported (not via the network).' - they
> are already supported. I wrote this sentence before Vova managed to implement
> `box.watch()`.
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index 45a6b7f41..342f893b8 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -57,9 +57,17 @@
> > #include "sequence.h"
> > #include "sql.h"
> > #include "constraint_id.h"
> > +#include "box.h"
> >
> > /* {{{ Auxiliary functions and methods. */
> >
> > +static void
> > +box_schema_version_bump(void)
> > +{
> > + ++schema_version;
> > + box_broadcast_schema(false);
>
> 2. The main problem with adding flags as booleans in function arguments is
> that when you call the function, you can't tell from the call what does the
> argument mean. For instance, if I don't know/remember the function's signature,
> I can't tell what 'false' stands for.
>
> Normally such cases are ironed out by either splitting the function in 2
> (inside they can call some third function with this flag as an argument), or
> by just not using the flag.
>
> Here, for example, the following can be done - just use these functions only
> after box.cfg. Before that, as a first step, you broadcast the sys events
> explicitly, manually:
>
> ====================
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 342f893b8..a7598a88f 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -65,7 +65,7 @@ static void
> box_schema_version_bump(void)
> {
> ++schema_version;
> - box_broadcast_schema(false);
> + box_broadcast_schema();
> }
>
> static int
> @@ -4264,7 +4264,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
> if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
> return -1;
> REPLICASET_UUID = uu;
> - box_broadcast_id(false);
> + box_broadcast_id();
> say_info("cluster uuid %s", tt_uuid_str(&uu));
> } else if (strcmp(key, "version") == 0) {
> if (new_tuple != NULL) {
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 9be1a5bbf..62693b3db 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -155,7 +155,7 @@ static struct fiber_pool tx_fiber_pool;
> static struct cbus_endpoint tx_prio_endpoint;
>
> static void
> -box_broadcast_status(bool is_outside_box_cfg);
> +box_broadcast_status(void);
>
> static void
> title(const char *new_status)
> @@ -164,7 +164,7 @@ title(const char *new_status)
> title_set_status(new_status);
> title_update();
> systemd_snotify("STATUS=%s", status);
> - box_broadcast_status(false);
> + box_broadcast_status();
> }
>
> void
> @@ -180,8 +180,8 @@ box_update_ro_summary(void)
> if (is_ro_summary)
> engine_switch_to_ro();
> fiber_cond_broadcast(&ro_cond);
> - box_broadcast_status(false);
> - box_broadcast_election(false);
> + box_broadcast_status();
> + box_broadcast_election();
> }
>
> const char *
> @@ -3367,7 +3367,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
> else
> tt_uuid_create(&INSTANCE_UUID);
>
> - box_broadcast_id(false);
> + box_broadcast_id();
> say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
>
> /*
> @@ -3646,10 +3646,15 @@ box_init(void)
> sequence_init();
> box_raft_init();
> box_watcher_init();
> - box_broadcast_id(true);
> - box_broadcast_status(true);
> - box_broadcast_election(true);
> - box_broadcast_schema(true);
> +
> + /*
> + * Default built-in events to help users distinguish an event being not
> + * supported from box.cfg not being called yet.
> + */
> + box_broadcast_fmt("box.status", "{}");
> + box_broadcast_fmt("box.election", "{}");
> + box_broadcast_fmt("box.id", "{}");
> + box_broadcast_fmt("box.schema", "{}");
> }
>
> bool
> @@ -3952,22 +3957,17 @@ box_reset_stat(void)
> }
>
> void
> -box_broadcast_id(bool is_outside_box_cfg)
> +box_broadcast_id(void)
> {
> char buf[1024];
> char *w = buf;
> -
> - if (is_outside_box_cfg)
> - w = mp_encode_map(w, 0);
> - else {
> - w = mp_encode_map(w, 3);
> - w = mp_encode_str0(w, "id");
> - w = mp_encode_uint(w, instance_id);
> - w = mp_encode_str0(w, "instance_uuid");
> - w = mp_encode_uuid(w, &INSTANCE_UUID);
> - w = mp_encode_str0(w, "replicaset_uuid");
> - w = mp_encode_uuid(w, &REPLICASET_UUID);
> - }
> + w = mp_encode_map(w, 3);
> + w = mp_encode_str0(w, "id");
> + w = mp_encode_uint(w, instance_id);
> + w = mp_encode_str0(w, "instance_uuid");
> + w = mp_encode_uuid(w, &INSTANCE_UUID);
> + w = mp_encode_str0(w, "replicaset_uuid");
> + w = mp_encode_uuid(w, &REPLICASET_UUID);
>
> box_broadcast("box.id", strlen("box.id"), buf, w);
>
> @@ -3975,22 +3975,17 @@ box_broadcast_id(bool is_outside_box_cfg)
> }
>
> static void
> -box_broadcast_status(bool is_outside_box_cfg)
> +box_broadcast_status(void)
> {
> char buf[1024];
> char *w = buf;
> -
> - if (is_outside_box_cfg)
> - w = mp_encode_map(w, 0);
> - else {
> - w = mp_encode_map(w, 3);
> - w = mp_encode_str0(w, "is_ro");
> - w = mp_encode_bool(w, box_is_ro());
> - w = mp_encode_str0(w, "is_ro_cfg");
> - w = mp_encode_bool(w, cfg_geti("read_only"));
> - w = mp_encode_str0(w, "status");
> - w = mp_encode_str0(w, box_status());
> - }
> + w = mp_encode_map(w, 3);
> + w = mp_encode_str0(w, "is_ro");
> + w = mp_encode_bool(w, box_is_ro());
> + w = mp_encode_str0(w, "is_ro_cfg");
> + w = mp_encode_bool(w, cfg_geti("read_only"));
> + w = mp_encode_str0(w, "status");
> + w = mp_encode_str0(w, box_status());
>
> box_broadcast("box.status", strlen("box.status"), buf, w);
>
> @@ -3998,26 +3993,21 @@ box_broadcast_status(bool is_outside_box_cfg)
> }
>
> void
> -box_broadcast_election(bool is_outside_box_cfg)
> +box_broadcast_election(void)
> {
> struct raft *raft = box_raft();
>
> char buf[1024];
> char *w = buf;
> -
> - if (is_outside_box_cfg)
> - w = mp_encode_map(w, 0);
> - else {
> - w = mp_encode_map(w, 4);
> - w = mp_encode_str0(w, "term");
> - w = mp_encode_uint(w, raft->term);
> - w = mp_encode_str0(w, "role");
> - w = mp_encode_str0(w, raft_state_str(raft->state));
> - w = mp_encode_str0(w, "is_ro");
> - w = mp_encode_bool(w, box_is_ro());
> - w = mp_encode_str0(w, "leader");
> - w = mp_encode_uint(w, raft->leader);
> - }
> + w = mp_encode_map(w, 4);
> + w = mp_encode_str0(w, "term");
> + w = mp_encode_uint(w, raft->term);
> + w = mp_encode_str0(w, "role");
> + w = mp_encode_str0(w, raft_state_str(raft->state));
> + w = mp_encode_str0(w, "is_ro");
> + w = mp_encode_bool(w, box_is_ro());
> + w = mp_encode_str0(w, "leader");
> + w = mp_encode_uint(w, raft->leader);
>
> box_broadcast("box.election", strlen("box.election"), buf, w);
>
> @@ -4025,18 +4015,13 @@ box_broadcast_election(bool is_outside_box_cfg)
> }
>
> void
> -box_broadcast_schema(bool is_outside_box_cfg)
> +box_broadcast_schema(void)
> {
> char buf[1024];
> char *w = buf;
> -
> - if (is_outside_box_cfg)
> - w = mp_encode_map(w, 0);
> - else {
> - w = mp_encode_map(w, 1);
> - w = mp_encode_str0(w, "version");
> - w = mp_encode_uint(w, box_schema_version());
> - }
> + w = mp_encode_map(w, 1);
> + w = mp_encode_str0(w, "version");
> + w = mp_encode_uint(w, box_schema_version());
>
> box_broadcast("box.schema", strlen("box.schema"), buf, w);
>
> diff --git a/src/box/box.h b/src/box/box.h
> index 41f60c0ea..766568368 100644
> --- a/src/box/box.h
> +++ b/src/box/box.h
> @@ -567,19 +567,19 @@ boxk(int type, uint32_t space_id, const char *format, ...);
> * Broadcast the identification of the instance
> */
> void
> -box_broadcast_id(bool is_outside_box_cfg);
> +box_broadcast_id(void);
>
> /**
> * Broadcast the current election state of RAFT machinery
> */
> void
> -box_broadcast_election(bool is_outside_box_cfg);
> +box_broadcast_election(void);
>
> /**
> * Broadcast the current schema version
> */
> void
> -box_broadcast_schema(bool is_outside_box_cfg);
> +box_broadcast_schema(void);
>
> #if defined(__cplusplus)
> } /* extern "C" */
> diff --git a/src/box/raft.c b/src/box/raft.c
> index 259be639e..efc7e1038 100644
> --- a/src/box/raft.c
> +++ b/src/box/raft.c
> @@ -170,7 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
> * writable only after it clears its synchro queue.
> */
> box_update_ro_summary();
> - box_broadcast_election(false);
> + box_broadcast_election();
> if (raft->state != RAFT_STATE_LEADER)
> return 0;
> /*
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 732fe6211..0da256721 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -247,7 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
> /* Assign local replica id */
> assert(instance_id == REPLICA_ID_NIL);
> instance_id = replica_id;
> - box_broadcast_id(false);
> + box_broadcast_id();
> } else if (replica->anon) {
> /*
> * Set replica gc on its transition from
> @@ -288,7 +288,7 @@ replica_clear_id(struct replica *replica)
> /* See replica_check_id(). */
> assert(replicaset.is_joining);
> instance_id = REPLICA_ID_NIL;
> - box_broadcast_id(false);
> + box_broadcast_id();
> }
> replica->id = REPLICA_ID_NIL;
> say_info("removed replica %s", tt_uuid_str(&replica->uuid));
> ====================
>
> Now on each box_broadcast_<sys_event>() call you can tell that it
> just broadcasts the event. No flags, not other parameters, simple to
> read.
> > diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> > new file mode 100644
> > index 000000000..4170ce420
> > --- /dev/null
> > +++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
>
> <...>
>
> > +g.test_sys_events_no_override = function(cg)
> > + local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
> > +
> > + for i = 1, #sys_events do
> > + t.assert_error_msg_content_equals("System event can't be override",
> > + function()
> > + cg.master:exec(function(sys_events, i)
> > + box.broadcast(sys_events[i], 'any_data')
> > + end, {sys_events, i})
> > + end)
> > + end
>
> 3. Lua supports range loops. You can do this:
>
> ====================
> @@ -39,12 +39,12 @@ end)
> g.test_sys_events_no_override = function(cg)
> local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
>
> - for i = 1, #sys_events do
> + for _, key in pairs(sys_events) do
> t.assert_error_msg_content_equals("System event can't be override",
> function()
> - cg.master:exec(function(sys_events, i)
> - box.broadcast(sys_events[i], 'any_data')
> - end, {sys_events, i})
> + cg.master:exec(function(key)
> + box.broadcast(key, 'any_data')
> + end, {key})
> end)
> end
> ====================
>
> Up to you.
>
>
> Technically everything looks good except for the nits above. Now you also
> should comb the patch to make the linters happy: luacheck and checkpatch.
>
> To run luacheck you need to install it. How - depends on your system. In
> Linux I think it would be something like
>
> sudo apt install -y lua5.1 luarocks
> sudo luarocks install luacheck
> Or
>
> tarantoolctl rocks install luacheck (I didn't validate this)
>
> Then you run it with our config. This is how I do it in tarantool's root
> dir (when I want to check 'test/' folder):
>
> luacheck --codes --config .luacheckrc test/
>
> Then for checkpatch you need to clone the repository:
>
> https://github.com/tarantool/checkpatch
>
> For instance, I cloned it just in a neighbour folder near my tarantool/
> repo. Then from inside of tarantool/ I do this:
>
> ../checkpatch/checkpatch.pl --color=always --git head
>
> to check the latest commit. I suppose you can also specify head~1 for a
> previous one, head~2 for pre-previous and so on. Maybe it also supports
> git hash ranges, I didn't check yet. On your commit this linter raises some
> errors.
1. I fixed the bugs following your comments. Now I get OK from the linter
and luacheck.
====
.../unreleased/gh-6260-add-builtin-events.md | 4 +
src/box/alter.cc | 21 +-
src/box/box.cc | 121 ++++++-
src/box/box.h | 18 +
src/box/lua/watcher.c | 2 +
src/box/raft.c | 1 +
src/box/replication.cc | 2 +
.../gh_6260_add_builtin_events_test.lua | 323 ++++++++++++++++++
8 files changed, 478 insertions(+), 14 deletions(-)
create mode 100644 changelogs/unreleased/gh-6260-add-builtin-events.md
create mode 100644 test/box-luatest/gh_6260_add_builtin_events_test.lua
diff --git a/changelogs/unreleased/gh-6260-add-builtin-events.md b/changelogs/unreleased/gh-6260-add-builtin-events.md
new file mode 100644
index 000000000..1d13e410f
--- /dev/null
+++ b/changelogs/unreleased/gh-6260-add-builtin-events.md
@@ -0,0 +1,4 @@
+## feature/core
+
+* Added predefined system events: `box.status`, `box.id`, `box.election`
+ and `box.schema` (gh-6260)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 45a6b7f41..a7598a88f 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,9 +57,17 @@
#include "sequence.h"
#include "sql.h"
#include "constraint_id.h"
+#include "box.h"
/* {{{ Auxiliary functions and methods. */
+static void
+box_schema_version_bump(void)
+{
+ ++schema_version;
+ box_broadcast_schema();
+}
+
static int
access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid,
enum schema_object_type type, enum priv_type priv_type)
@@ -1702,7 +1710,7 @@ void
UpdateSchemaVersion::alter(struct alter_space *alter)
{
(void)alter;
- ++schema_version;
+ box_schema_version_bump();
}
/**
@@ -2277,7 +2285,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* AlterSpaceOps are registered in case of space
* create.
*/
- ++schema_version;
+ box_schema_version_bump();
/*
* So may happen that until the DDL change record
* is written to the WAL, the space is used for
@@ -2391,7 +2399,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* deleting the space from the space_cache, since no
* AlterSpaceOps are registered in case of space drop.
*/
- ++schema_version;
+ box_schema_version_bump();
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_space_commit, old_space);
if (on_commit == NULL)
@@ -4256,6 +4264,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
return -1;
REPLICASET_UUID = uu;
+ box_broadcast_id();
say_info("cluster uuid %s", tt_uuid_str(&uu));
} else if (strcmp(key, "version") == 0) {
if (new_tuple != NULL) {
@@ -5102,7 +5111,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5629,7 +5638,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
space_reset_fk_constraint_mask(child_space);
space_reset_fk_constraint_mask(parent_space);
}
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5885,7 +5894,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
if (trigger_run(&on_alter_space, space) != 0)
return -1;
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 5414f3d9d..7f622c587 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -85,6 +85,7 @@
#include "audit.h"
#include "trivia/util.h"
#include "version.h"
+#include "mp_uuid.h"
static char status[64] = "unknown";
@@ -98,14 +99,6 @@ double txn_timeout_default;
struct rlist box_on_shutdown_trigger_list =
RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
-static void title(const char *new_status)
-{
- snprintf(status, sizeof(status), "%s", new_status);
- title_set_status(new_status);
- title_update();
- systemd_snotify("STATUS=%s", status);
-}
-
const struct vclock *box_vclock = &replicaset.vclock;
/**
@@ -161,6 +154,28 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+static void
+builtin_events_init(void);
+
+/**
+ * Broadcast the current instance status
+ */
+static void
+box_broadcast_status(void);
+
+/**
+ * Generate and update the instance status title
+ */
+static void
+title(const char *new_status)
+{
+ snprintf(status, sizeof(status), "%s", new_status);
+ title_set_status(new_status);
+ title_update();
+ systemd_snotify("STATUS=%s", status);
+ box_broadcast_status();
+}
+
void
box_update_ro_summary(void)
{
@@ -174,6 +189,8 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
+ box_broadcast_status();
+ box_broadcast_election();
}
const char *
@@ -3359,6 +3376,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
else
tt_uuid_create(&INSTANCE_UUID);
+ box_broadcast_id();
say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
/*
@@ -3637,6 +3655,12 @@ box_init(void)
sequence_init();
box_raft_init();
box_watcher_init();
+
+ /*
+ * Default built-in events to help users distinguish an event
+ * being not supported from box.cfg not being called yet.
+ */
+ builtin_events_init();
}
bool
@@ -3937,3 +3961,84 @@ box_reset_stat(void)
engine_reset_stat();
space_foreach(box_reset_space_stat, NULL);
}
+
+static void
+builtin_events_init(void)
+{
+ box_broadcast_fmt("box.id", "{}");
+ box_broadcast_fmt("box.schema", "{}");
+ box_broadcast_fmt("box.status", "{}");
+ box_broadcast_fmt("box.election", "{}");
+}
+
+void
+box_broadcast_id(void)
+{
+ char buf[1024];
+ char *w = buf;
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "id");
+ w = mp_encode_uint(w, instance_id);
+ w = mp_encode_str0(w, "instance_uuid");
+ w = mp_encode_uuid(w, &INSTANCE_UUID);
+ w = mp_encode_str0(w, "replicaset_uuid");
+ w = mp_encode_uuid(w, &REPLICASET_UUID);
+
+ box_broadcast("box.id", strlen("box.id"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+static void
+box_broadcast_status(void)
+{
+ char buf[1024];
+ char *w = buf;
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "is_ro_cfg");
+ w = mp_encode_bool(w, cfg_geti("read_only"));
+ w = mp_encode_str0(w, "status");
+ w = mp_encode_str0(w, box_status());
+
+ box_broadcast("box.status", strlen("box.status"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_election(void)
+{
+ struct raft *raft = box_raft();
+
+ char buf[1024];
+ char *w = buf;
+ w = mp_encode_map(w, 4);
+ w = mp_encode_str0(w, "term");
+ w = mp_encode_uint(w, raft->term);
+ w = mp_encode_str0(w, "role");
+ w = mp_encode_str0(w, raft_state_str(raft->state));
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "leader");
+ w = mp_encode_uint(w, raft->leader);
+
+ box_broadcast("box.election", strlen("box.election"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_schema(void)
+{
+ char buf[1024];
+ char *w = buf;
+ w = mp_encode_map(w, 1);
+ w = mp_encode_str0(w, "version");
+ w = mp_encode_uint(w, box_schema_version());
+
+ box_broadcast("box.schema", strlen("box.schema"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..766568368 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+/**
+ * Broadcast the identification of the instance
+ */
+void
+box_broadcast_id(void);
+
+/**
+ * Broadcast the current election state of RAFT machinery
+ */
+void
+box_broadcast_election(void);
+
+/**
+ * Broadcast the current schema version
+ */
+void
+box_broadcast_schema(void);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/watcher.c b/src/box/lua/watcher.c
index 01a307f1d..dd843863e 100644
--- a/src/box/lua/watcher.c
+++ b/src/box/lua/watcher.c
@@ -175,6 +175,8 @@ lbox_broadcast(struct lua_State *L)
return luaL_error(L, "Usage: box.broadcast(key[, value])");
size_t key_len;
const char *key = luaL_checklstring(L, 1, &key_len);
+ if (strncmp(key, "box.", 4) == 0)
+ return luaL_error(L, "System event can't be override");
struct ibuf *ibuf = cord_ibuf_take();
const char *data = NULL;
const char *data_end = NULL;
diff --git a/src/box/raft.c b/src/box/raft.c
index be6009cc1..efc7e1038 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
+ box_broadcast_election();
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index bbf4b7417..0da256721 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
+ box_broadcast_id();
} else if (replica->anon) {
/*
* Set replica gc on its transition from
@@ -287,6 +288,7 @@ replica_clear_id(struct replica *replica)
/* See replica_check_id(). */
assert(replicaset.is_joining);
instance_id = REPLICA_ID_NIL;
+ box_broadcast_id();
}
replica->id = REPLICA_ID_NIL;
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
new file mode 100644
index 000000000..b3a67ee56
--- /dev/null
+++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
@@ -0,0 +1,323 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local server = require('test.luatest_helpers.server')
+
+local g = t.group('gh_6260')
+
+g.test_subscriptions_outside_box_cfg = function()
+ local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
+
+ for _, val in ipairs(sys_events) do
+ local result = {}
+ local result_no = 0
+ local watcher = box.watch(val,
+ function(name, state)
+ assert(name == val)
+ result = state
+ result_no = result_no + 1
+ end)
+
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+ t.assert_equals(result, {})
+ watcher:unregister()
+ end
+end
+
+g.before_test('test_sys_events_no_override', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_sys_events_no_override', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_sys_events_no_override = function(cg)
+ local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
+
+ for _, val in ipairs(sys_events) do
+ t.assert_error_msg_content_equals("System event can't be override",
+ function()
+ cg.master:exec(function(key)
+ box.broadcast(key, 'any_data')
+ end, {val})
+ end)
+ end
+end
+
+g.before_test('test_box_status', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_status', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_status = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+ local watcher = c:watch('box.status',
+ function(name, state)
+ assert(name == 'box.status')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- test orphan status appearance
+ cg.master:exec(function(repl)
+ box.cfg{
+ replication = repl,
+ replication_connect_timeout = 0.001,
+ replication_timeout = 0.001,
+ }
+ end, {{server.build_instance_uri('master'),
+ server.build_instance_uri('replica')}})
+ -- here we have 2 notifications: entering ro when can't connect
+ -- to master and the second one when going orphan
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 3) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'orphan'})
+
+ -- test ro_cfg appearance
+ cg.master:exec(function()
+ box.cfg{
+ replication = {},
+ read_only = true,
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 4) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = true, status = 'running'})
+
+ -- reset to rw
+ cg.master:exec(function()
+ box.cfg{
+ read_only = false,
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 5) end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- turning manual election mode puts into ro
+ cg.master:exec(function()
+ box.cfg{
+ election_mode = 'manual',
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 6) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+ -- promotion should turn rw
+ cg.master:exec(function() box.ctl.promote() end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 7) end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_election', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replication = {
+ server.build_instance_uri('instance_1'),
+ server.build_instance_uri('instance_2'),
+ server.build_instance_uri('instance_3'),
+ },
+ replication_connect_quorum = 0,
+ election_mode = 'off',
+ replication_synchro_quorum = 2,
+ replication_synchro_timeout = 1,
+ replication_timeout = 0.25,
+ election_timeout = 0.25,
+ }
+
+ cg.instance_1 = cg.cluster:build_server(
+ {alias = 'instance_1', box_cfg = box_cfg})
+
+ cg.instance_2 = cg.cluster:build_server(
+ {alias = 'instance_2', box_cfg = box_cfg})
+
+ cg.instance_3 = cg.cluster:build_server(
+ {alias = 'instance_3', box_cfg = box_cfg})
+
+ cg.cluster:add_server(cg.instance_1)
+ cg.cluster:add_server(cg.instance_2)
+ cg.cluster:add_server(cg.instance_3)
+ cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.after_test('test_box_election', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.test_box_election = function(cg)
+ local c = {}
+ c[1] = net.connect(cg.instance_1.net_box_uri)
+ c[2] = net.connect(cg.instance_2.net_box_uri)
+ c[3] = net.connect(cg.instance_3.net_box_uri)
+
+ local res = {}
+ local res_n = {0, 0, 0}
+
+ for i = 1, 3 do
+ c[i]:watch('box.election',
+ function(n, s)
+ t.assert_equals(n, 'box.election')
+ res[i] = s
+ res_n[i] = res_n[i] + 1
+ end)
+ end
+ t.helpers.retrying({}, function()
+ t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+ end)
+
+ -- verify all instances are in the same state
+ t.assert_equals(res[1], res[2])
+ t.assert_equals(res[1], res[3])
+
+ -- wait for elections to complete, verify leader is the instance_1
+ -- trying to avoid the exact number of term - it can vary
+ local instance1_id = cg.instance_1:instance_id()
+
+ cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
+ cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
+ cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
+
+ cg.instance_1:wait_election_leader_found()
+ cg.instance_2:wait_election_leader_found()
+ cg.instance_3:wait_election_leader_found()
+
+ t.assert_covers(res[1],
+ {leader = instance1_id, is_ro = false, role = 'leader'})
+ t.assert_covers(res[2],
+ {leader = instance1_id, is_ro = true, role = 'follower'})
+ t.assert_covers(res[3],
+ {leader = instance1_id, is_ro = true, role = 'follower'})
+
+ -- verify all terms are in the same state
+ t.assert_equals(res[1].term, res[2].term)
+ t.assert_equals(res[1].term, res[3].term)
+
+ -- check the stepping down is working
+ res_n = {0, 0, 0}
+ cg.instance_1:exec(function() box.cfg{election_mode='voter'} end)
+ t.helpers.retrying({}, function()
+ t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+ end)
+
+ local expected = {is_ro = true, role = 'follower', term = res[1].term,
+ leader = 0}
+ t.assert_covers(res, {expected, expected, expected})
+
+ c[1]:close()
+ c[2]:close()
+ c[3]:close()
+end
+
+g.before_test('test_box_schema', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_schema', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_schema = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+ local version = 0
+ local version_n = 0
+
+ local watcher = c:watch('box.schema',
+ function(n, s)
+ assert(n == 'box.schema')
+ version = s.version
+ version_n = version_n + 1
+ end)
+
+ cg.master:exec(function() box.schema.create_space('p') end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+ -- there'll be 2 changes - index and space
+ -- first version change, use it as initial value
+ local init_version = version
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:create_index('i') end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 1) end)
+ t.assert_equals(version, init_version + 1)
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:drop() end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+ -- there'll be 2 changes - index and space
+ t.assert_equals(version, init_version + 3)
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_id', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replicaset_uuid = t.helpers.uuid('ab', 1),
+ instance_uuid = t.helpers.uuid('1', '2', '3')
+ }
+
+ cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+ cg.cluster:add_server(cg.instance)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_id', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_id = function(cg)
+ local c = net.connect(cg.instance.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+
+ local watcher = c:watch('box.id',
+ function(name, state)
+ assert(name == 'box.id')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+ t.assert_equals(result, {id = 1,
+ instance_uuid = cg.instance.box_cfg.instance_uuid,
+ replicaset_uuid = cg.instance.box_cfg.replicaset_uuid})
+
+ watcher:unregister()
+ c:close()
+end
--
2.25.1
* [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
@ 2022-03-22 11:51 Yan Shtunder via Tarantool-patches
2022-03-22 22:55 ` Vladislav Shpilevoy via Tarantool-patches
0 siblings, 1 reply; 8+ messages in thread
From: Yan Shtunder via Tarantool-patches @ 2022-03-22 11:51 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches, Yan Shtunder
Added predefined system events: box.status, box.id, box.election and
box.schema.
Closes #6260
@TarantoolBot document
Title: Built-in events for pub/sub
Built-in events are needed, first of all, to learn who the master is,
unless that is defined in an application-specific way. Knowing the master
is necessary to send changes to the correct instance and, where it
matters, to read the most recent data. More built-in events are defined
for other mutable properties: leader state changes, the instance's
election role and election term, schema version changes, and instance
state.
Built-in events follow a special naming scheme: their names always start
with the "box." prefix. The prefix is reserved for built-in events, and
creating new events with this prefix is banned. Below is a list of all the
events with their names and values:
1. box.id
Description - identification of the instance. Changes are extremely rare.
Some values never change or change only once. For example, the instance UUID
never changes after the first box.cfg, but it is not known before box.cfg is
called. The replicaset UUID is unknown until the instance joins a replicaset
or boots a new one, but the events are supposed to start working before
that - right at listen launch. The numeric instance ID is known only after
registration. On anonymous replicas it is 0 until they are registered
officially.
Value - {
MP_STR “id”: MP_UINT box.info.id,
MP_STR “instance_uuid”: MP_UUID box.info.uuid,
MP_STR “replicaset_uuid”: MP_UUID box.info.cluster.uuid,
}
2. box.status
Description - generic blob about instance status. It contains the most
commonly used and infrequently changed config options and box.info fields.
Value - {
MP_STR “is_ro”: MP_BOOL box.info.ro,
MP_STR “is_ro_cfg”: MP_BOOL box.cfg.read_only,
MP_STR “status”: MP_STR box.info.status,
}
3. box.election
Description - all the parts of box.info.election needed to find the most
recent writable leader.
Value - {
MP_STR “term”: MP_UINT box.info.election.term,
MP_STR “role”: MP_STR box.info.election.state,
MP_STR “is_ro”: MP_BOOL box.info.ro,
MP_STR “leader”: MP_UINT box.info.election.leader,
}
4. box.schema
Description - schema-related data. Currently it is only version.
Value - {
MP_STR “version”: MP_UINT schema_version,
}
Built-in events can't be overridden, meaning users can't call
box.broadcast('box.id', any_data) etc.
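The reserved-prefix rule above can be sketched in plain C. This is only an illustration under assumptions: `event_key_is_reserved` is a hypothetical helper name, not a function from the patch, which performs its own check inside `lbox_broadcast()`. The sketch takes an explicit key length so that keys shorter than the prefix are rejected without reading past their end.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper, not part of the patch: returns true when the
 * key of key_len bytes starts with the reserved "box." prefix.
 */
static bool
event_key_is_reserved(const char *key, size_t key_len)
{
	static const char prefix[] = "box.";
	const size_t prefix_len = sizeof(prefix) - 1;

	/* Keys shorter than the prefix can never match it. */
	return key_len >= prefix_len &&
	       memcmp(key, prefix, prefix_len) == 0;
}
```

A caller would reject the broadcast when the helper returns true for the user-supplied key, and proceed otherwise.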
The events are available from the very beginning as not MP_NIL. It will come
in handy when local subscriptions will be supported (not via the network).
Otherwise there is no way to detect whether an event is even supported at
all by this Tarantool version. If events are broadcast before box.cfg{},
then the following values will be available:
box.id = {}
box.schema = {}
box.status = {}
box.election = {}
This way the users will be able to distinguish an event being not supported
at all from box.cfg{} being not called yet. Otherwise they would need to
parse _TARANTOOL version string locally and peer_version in netbox.
Example usage:
* Client:
```lua
net_box = require('net.box')
conn = net_box.connect(URI)
-- Subscribe to updates of key 'box.id'
w = conn:watch('box.id', function(key, value)
assert(key == 'box.id')
-- do something with value
end)
-- or to updates of key 'box.status'
w = conn:watch('box.status', function(key, value)
assert(key == 'box.status')
-- do something with value
end)
-- or to updates of key 'box.election'
w = conn:watch('box.election', function(key, value)
assert(key == 'box.election')
-- do something with value
end)
-- or to updates of key 'box.schema'
w = conn:watch('box.schema', function(key, value)
assert(key == 'box.schema')
-- do something with value
end)
-- Unregister the watcher when it's no longer needed.
w:unregister()
```
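The three cases a subscriber may need to tell apart (event not supported, box.cfg{} not called yet, real data) can be sketched at the MessagePack level. This is a minimal illustration, not code from the patch: the enum and function names are hypothetical, and it assumes the server encodes the empty map in its shortest form, a single 0x80 byte. MP_NIL is the single byte 0xc0.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical names, used for this illustration only. */
enum sys_event_state {
	/* MP_NIL: the key was never broadcast by this server. */
	SYS_EVENT_UNSUPPORTED,
	/* Empty map: the event exists, box.cfg{} was not called yet. */
	SYS_EVENT_NOT_CONFIGURED,
	/* Anything else: a populated value. */
	SYS_EVENT_READY,
};

static enum sys_event_state
sys_event_classify(const uint8_t *data, size_t size)
{
	/* MessagePack encodes nil as the single byte 0xc0. */
	if (size == 0 || data[0] == 0xc0)
		return SYS_EVENT_UNSUPPORTED;
	/* A fixmap with zero entries is the single byte 0x80. */
	if (data[0] == 0x80)
		return SYS_EVENT_NOT_CONFIGURED;
	return SYS_EVENT_READY;
}
```

A Lua watcher callback gets the same information without decoding anything: the value arrives as nil, an empty table, or a table with fields.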
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v4
Hi, thanks for the review!
> Hi! Thanks for the fixes!
> In the design document it was also said:
>
> The events must be available from the very beginning as not MP_NIL.
> It will come in handy when local subscriptions will be supported (not
> via the network). Otherwise no way to detect whether an event is even
> supported at all by this Tarantool version. By design absence of a
> subscription key equals MP_NIL and no events are delivered ever except
> this MP_NIL itself. It happens both if it is not broadcast and if it
> simply does not exist in the code.
> And
>
> The builtin events should not allow userspace broadcast. Meaning, users
> should not be able to call box.event.broadcast(‘box.status’, any_data).
> In the current implementation the built-in events override is still
> allowed, and before box.cfg{} is called, all the events will return nil.
>
> For how to ban built-in events override - in the design doc there is a couple
> of proposals. A third option would be to check for "box." prefix in the public
> API of box.broadcast() and return an error if it matches. I would probably ask
> Vladimir D. for what he thinks is the best among these options. I think he will
> be the second reviewer anyway.
> For what to broadcast before box.cfg{} - I have no strong opinion here.
> Option 1 is to broadcast some invalid default values like `box.schema = {version = 0}`.
> Option 2 would be to broadcast an empty dictionary. For instance, before box.cfg is
> called, the following values are available:
> box.id = {}
> box.schema = {}
> box.status = {}
> box.election = {}
> This way the users will be able to distinguish an event being not supported
> at all from box.cfg{} being not called yet. Otherwise they would need to parse
> _TARANTOOL version string locally and peer_version in netbox.
>
> The rest looks good!
All fixed.
.../unreleased/gh-6260-add-builtin-events.md | 4 +
src/box/alter.cc | 21 +-
src/box/box.cc | 121 ++++++-
src/box/box.h | 18 +
src/box/lua/watcher.c | 2 +
src/box/raft.c | 1 +
src/box/replication.cc | 2 +
.../gh_6260_add_builtin_events_test.lua | 323 ++++++++++++++++++
8 files changed, 478 insertions(+), 14 deletions(-)
create mode 100644 changelogs/unreleased/gh-6260-add-builtin-events.md
create mode 100644 test/box-luatest/gh_6260_add_builtin_events_test.lua
diff --git a/changelogs/unreleased/gh-6260-add-builtin-events.md b/changelogs/unreleased/gh-6260-add-builtin-events.md
new file mode 100644
index 000000000..1d13e410f
--- /dev/null
+++ b/changelogs/unreleased/gh-6260-add-builtin-events.md
@@ -0,0 +1,4 @@
+## feature/core
+
+* Added predefined system events: `box.status`, `box.id`, `box.election`
+ and `box.schema` (gh-6260)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 45a6b7f41..342f893b8 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,9 +57,17 @@
#include "sequence.h"
#include "sql.h"
#include "constraint_id.h"
+#include "box.h"
/* {{{ Auxiliary functions and methods. */
+static void
+box_schema_version_bump(void)
+{
+ ++schema_version;
+ box_broadcast_schema(false);
+}
+
static int
access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid,
enum schema_object_type type, enum priv_type priv_type)
@@ -1702,7 +1710,7 @@ void
UpdateSchemaVersion::alter(struct alter_space *alter)
{
(void)alter;
- ++schema_version;
+ box_schema_version_bump();
}
/**
@@ -2277,7 +2285,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* AlterSpaceOps are registered in case of space
* create.
*/
- ++schema_version;
+ box_schema_version_bump();
/*
* So may happen that until the DDL change record
* is written to the WAL, the space is used for
@@ -2391,7 +2399,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* deleting the space from the space_cache, since no
* AlterSpaceOps are registered in case of space drop.
*/
- ++schema_version;
+ box_schema_version_bump();
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_space_commit, old_space);
if (on_commit == NULL)
@@ -4256,6 +4264,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
return -1;
REPLICASET_UUID = uu;
+ box_broadcast_id(false);
say_info("cluster uuid %s", tt_uuid_str(&uu));
} else if (strcmp(key, "version") == 0) {
if (new_tuple != NULL) {
@@ -5102,7 +5111,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5629,7 +5638,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
space_reset_fk_constraint_mask(child_space);
space_reset_fk_constraint_mask(parent_space);
}
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5885,7 +5894,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
if (trigger_run(&on_alter_space, space) != 0)
return -1;
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 5414f3d9d..9be1a5bbf 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -85,6 +85,7 @@
#include "audit.h"
#include "trivia/util.h"
#include "version.h"
+#include "mp_uuid.h"
static char status[64] = "unknown";
@@ -98,14 +99,6 @@ double txn_timeout_default;
struct rlist box_on_shutdown_trigger_list =
RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
-static void title(const char *new_status)
-{
- snprintf(status, sizeof(status), "%s", new_status);
- title_set_status(new_status);
- title_update();
- systemd_snotify("STATUS=%s", status);
-}
-
const struct vclock *box_vclock = &replicaset.vclock;
/**
@@ -161,6 +154,19 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+static void
+box_broadcast_status(bool is_outside_box_cfg);
+
+static void
+title(const char *new_status)
+{
+ snprintf(status, sizeof(status), "%s", new_status);
+ title_set_status(new_status);
+ title_update();
+ systemd_snotify("STATUS=%s", status);
+ box_broadcast_status(false);
+}
+
void
box_update_ro_summary(void)
{
@@ -174,6 +180,8 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
+ box_broadcast_status(false);
+ box_broadcast_election(false);
}
const char *
@@ -3359,6 +3367,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
else
tt_uuid_create(&INSTANCE_UUID);
+ box_broadcast_id(false);
say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
/*
@@ -3637,6 +3646,10 @@ box_init(void)
sequence_init();
box_raft_init();
box_watcher_init();
+ box_broadcast_id(true);
+ box_broadcast_status(true);
+ box_broadcast_election(true);
+ box_broadcast_schema(true);
}
bool
@@ -3937,3 +3950,95 @@ box_reset_stat(void)
engine_reset_stat();
space_foreach(box_reset_space_stat, NULL);
}
+
+void
+box_broadcast_id(bool is_outside_box_cfg)
+{
+ char buf[1024];
+ char *w = buf;
+
+ if (is_outside_box_cfg)
+ w = mp_encode_map(w, 0);
+ else {
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "id");
+ w = mp_encode_uint(w, instance_id);
+ w = mp_encode_str0(w, "instance_uuid");
+ w = mp_encode_uuid(w, &INSTANCE_UUID);
+ w = mp_encode_str0(w, "replicaset_uuid");
+ w = mp_encode_uuid(w, &REPLICASET_UUID);
+ }
+
+ box_broadcast("box.id", strlen("box.id"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+static void
+box_broadcast_status(bool is_outside_box_cfg)
+{
+ char buf[1024];
+ char *w = buf;
+
+ if (is_outside_box_cfg)
+ w = mp_encode_map(w, 0);
+ else {
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "is_ro_cfg");
+ w = mp_encode_bool(w, cfg_geti("read_only"));
+ w = mp_encode_str0(w, "status");
+ w = mp_encode_str0(w, box_status());
+ }
+
+ box_broadcast("box.status", strlen("box.status"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_election(bool is_outside_box_cfg)
+{
+ struct raft *raft = box_raft();
+
+ char buf[1024];
+ char *w = buf;
+
+ if (is_outside_box_cfg)
+ w = mp_encode_map(w, 0);
+ else {
+ w = mp_encode_map(w, 4);
+ w = mp_encode_str0(w, "term");
+ w = mp_encode_uint(w, raft->term);
+ w = mp_encode_str0(w, "role");
+ w = mp_encode_str0(w, raft_state_str(raft->state));
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "leader");
+ w = mp_encode_uint(w, raft->leader);
+ }
+
+ box_broadcast("box.election", strlen("box.election"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_schema(bool is_outside_box_cfg)
+{
+ char buf[1024];
+ char *w = buf;
+
+ if (is_outside_box_cfg)
+ w = mp_encode_map(w, 0);
+ else {
+ w = mp_encode_map(w, 1);
+ w = mp_encode_str0(w, "version");
+ w = mp_encode_uint(w, box_schema_version());
+ }
+
+ box_broadcast("box.schema", strlen("box.schema"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..41f60c0ea 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+/**
+ * Broadcast the identification of the instance
+ */
+void
+box_broadcast_id(bool is_outside_box_cfg);
+
+/**
+ * Broadcast the current election state of RAFT machinery
+ */
+void
+box_broadcast_election(bool is_outside_box_cfg);
+
+/**
+ * Broadcast the current schema version
+ */
+void
+box_broadcast_schema(bool is_outside_box_cfg);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/watcher.c b/src/box/lua/watcher.c
index 01a307f1d..dd843863e 100644
--- a/src/box/lua/watcher.c
+++ b/src/box/lua/watcher.c
@@ -175,6 +175,8 @@ lbox_broadcast(struct lua_State *L)
return luaL_error(L, "Usage: box.broadcast(key[, value])");
size_t key_len;
const char *key = luaL_checklstring(L, 1, &key_len);
+ if (strncmp(key, "box.", 4) == 0)
+ return luaL_error(L, "System event can't be override");
struct ibuf *ibuf = cord_ibuf_take();
const char *data = NULL;
const char *data_end = NULL;
diff --git a/src/box/raft.c b/src/box/raft.c
index be6009cc1..259be639e 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
+ box_broadcast_election(false);
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index bbf4b7417..732fe6211 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
+ box_broadcast_id(false);
} else if (replica->anon) {
/*
* Set replica gc on its transition from
@@ -287,6 +288,7 @@ replica_clear_id(struct replica *replica)
/* See replica_check_id(). */
assert(replicaset.is_joining);
instance_id = REPLICA_ID_NIL;
+ box_broadcast_id(false);
}
replica->id = REPLICA_ID_NIL;
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
new file mode 100644
index 000000000..4170ce420
--- /dev/null
+++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
@@ -0,0 +1,323 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local server = require('test.luatest_helpers.server')
+
+local g = t.group('gh_6260')
+
+g.test_subscriptions_outside_box_cfg = function()
+ local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
+
+ for i, val in ipairs(sys_events) do
+ local result = {}
+ local result_no = 0
+ local watcher = box.watch(val,
+ function(name, state)
+ assert(name == val)
+ result = state
+ result_no = result_no + 1
+ end)
+
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+ t.assert_equals(result, {})
+ watcher:unregister()
+ end
+end
+
+g.before_test('test_sys_events_no_override', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_sys_events_no_override', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_sys_events_no_override = function(cg)
+ local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
+
+ for i = 1, #sys_events do
+ t.assert_error_msg_content_equals("System event can't be override",
+ function()
+ cg.master:exec(function(sys_events, i)
+ box.broadcast(sys_events[i], 'any_data')
+ end, {sys_events, i})
+ end)
+ end
+end
+
+g.before_test('test_box_status', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_status', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_status = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+ local watcher = c:watch('box.status',
+ function(name, state)
+ assert(name == 'box.status')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- test orphan status appearance
+ cg.master:exec(function(repl)
+ box.cfg{
+ replication = repl,
+ replication_connect_timeout = 0.001,
+ replication_timeout = 0.001,
+ }
+ end, {{server.build_instance_uri('master'),
+ server.build_instance_uri('replica')}})
+ -- here we have 2 notifications: entering ro when can't connect
+ -- to master and the second one when going orphan
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 3) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'orphan'})
+
+ -- test ro_cfg appearance
+ cg.master:exec(function()
+ box.cfg{
+ replication = {},
+ read_only = true,
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 4) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = true, status = 'running'})
+
+ -- reset to rw
+ cg.master:exec(function()
+ box.cfg{
+ read_only = false,
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 5) end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- turning manual election mode puts into ro
+ cg.master:exec(function()
+ box.cfg{
+ election_mode = 'manual',
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 6) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+ -- promotion should turn rw
+ cg.master:exec(function() box.ctl.promote() end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 7) end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_election', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replication = {
+ server.build_instance_uri('instance_1'),
+ server.build_instance_uri('instance_2'),
+ server.build_instance_uri('instance_3'),
+ },
+ replication_connect_quorum = 0,
+ election_mode = 'off',
+ replication_synchro_quorum = 2,
+ replication_synchro_timeout = 1,
+ replication_timeout = 0.25,
+ election_timeout = 0.25,
+ }
+
+ cg.instance_1 = cg.cluster:build_server(
+ {alias = 'instance_1', box_cfg = box_cfg})
+
+ cg.instance_2 = cg.cluster:build_server(
+ {alias = 'instance_2', box_cfg = box_cfg})
+
+ cg.instance_3 = cg.cluster:build_server(
+ {alias = 'instance_3', box_cfg = box_cfg})
+
+ cg.cluster:add_server(cg.instance_1)
+ cg.cluster:add_server(cg.instance_2)
+ cg.cluster:add_server(cg.instance_3)
+ cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.after_test('test_box_election', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.test_box_election = function(cg)
+ local c = {}
+ c[1] = net.connect(cg.instance_1.net_box_uri)
+ c[2] = net.connect(cg.instance_2.net_box_uri)
+ c[3] = net.connect(cg.instance_3.net_box_uri)
+
+ local res = {}
+ local res_n = {0, 0, 0}
+
+ for i = 1, 3 do
+ c[i]:watch('box.election',
+ function(n, s)
+ t.assert_equals(n, 'box.election')
+ res[i] = s
+ res_n[i] = res_n[i] + 1
+ end)
+ end
+ t.helpers.retrying({}, function()
+ t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+ end)
+
+ -- verify all instances are in the same state
+ t.assert_equals(res[1], res[2])
+ t.assert_equals(res[1], res[3])
+
+ -- wait for elections to complete, verify leader is the instance_1
+ -- trying to avoid the exact number of term - it can vary
+ local instance1_id = cg.instance_1:instance_id()
+
+ cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
+ cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
+ cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
+
+ cg.instance_1:wait_election_leader_found()
+ cg.instance_2:wait_election_leader_found()
+ cg.instance_3:wait_election_leader_found()
+
+ t.assert_covers(res[1],
+ {leader = instance1_id, is_ro = false, role = 'leader'})
+ t.assert_covers(res[2],
+ {leader = instance1_id, is_ro = true, role = 'follower'})
+ t.assert_covers(res[3],
+ {leader = instance1_id, is_ro = true, role = 'follower'})
+
+ -- verify all terms are in the same state
+ t.assert_equals(res[1].term, res[2].term)
+ t.assert_equals(res[1].term, res[3].term)
+
+ -- check the stepping down is working
+ res_n = {0, 0, 0}
+ cg.instance_1:exec(function() box.cfg{election_mode='voter'} end)
+ t.helpers.retrying({}, function()
+ t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+ end)
+
+ local expected = {is_ro = true, role = 'follower', term = res[1].term,
+ leader = 0}
+ t.assert_covers(res, {expected, expected, expected})
+
+ c[1]:close()
+ c[2]:close()
+ c[3]:close()
+end
+
+g.before_test('test_box_schema', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_schema', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_schema = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+ local version = 0
+ local version_n = 0
+
+ local watcher = c:watch('box.schema',
+ function(n, s)
+ assert(n == 'box.schema')
+ version = s.version
+ version_n = version_n + 1
+ end)
+
+ cg.master:exec(function() box.schema.create_space('p') end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+ -- there'll be 2 changes - index and space
+ -- first version change, use it as initial value
+ local init_version = version
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:create_index('i') end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 1) end)
+ t.assert_equals(version, init_version + 1)
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:drop() end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+ -- there'll be 2 changes - index and space
+ t.assert_equals(version, init_version + 3)
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_id', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replicaset_uuid = t.helpers.uuid('ab', 1),
+ instance_uuid = t.helpers.uuid('1', '2', '3')
+ }
+
+ cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+ cg.cluster:add_server(cg.instance)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_id', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_id = function(cg)
+ local c = net.connect(cg.instance.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+
+ local watcher = c:watch('box.id',
+ function(name, state)
+ assert(name == 'box.id')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+ t.assert_equals(result, {id = 1,
+ instance_uuid = cg.instance.box_cfg.instance_uuid,
+ replicaset_uuid = cg.instance.box_cfg.replicaset_uuid})
+
+ watcher:unregister()
+ c:close()
+end
--
2.25.1
* Re: [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
2022-03-22 11:51 Yan Shtunder via Tarantool-patches
@ 2022-03-22 22:55 ` Vladislav Shpilevoy via Tarantool-patches
0 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2022-03-22 22:55 UTC (permalink / raw)
To: Yan Shtunder; +Cc: tarantool-patches
Hi! Good job on the fixes!
See 3 comments below, almost finished!
> Built-in events can't be override. Meaning, users can't be able to call
> box.broadcast(‘box.id’, any_data) etc.
>
> The events are available from the very beginning as not MP_NIL. It will come
> in handy when local subscriptions will be supported (not via the network).
1. 'when local subscriptions will be supported (not via the network).' - they
are already supported. I wrote this sentence before Vova managed to implement
`box.watch()`.
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 45a6b7f41..342f893b8 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -57,9 +57,17 @@
> #include "sequence.h"
> #include "sql.h"
> #include "constraint_id.h"
> +#include "box.h"
>
> /* {{{ Auxiliary functions and methods. */
>
> +static void
> +box_schema_version_bump(void)
> +{
> + ++schema_version;
> + box_broadcast_schema(false);
2. The main problem with adding flags as booleans in function arguments is
that when you call the function, you can't tell from the call what the
argument means. For instance, if I don't know/remember the function's signature,
I can't tell what 'false' stands for.
Normally such cases are ironed out by either splitting the function in 2
(inside they can call some third function with this flag as an argument), or
by just not using the flag.
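The first option, splitting the function in 2, could look roughly like the sketch below. All names here are hypothetical and the value is formatted as text instead of MsgPack to keep the example self-contained. The point is only that the boolean stays inside one file, and call sites use self-describing names.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical shared helper. The boolean flag is private to this
 * file, so no external call site has to pass a bare true/false.
 */
static int
schema_event_format(char *buf, size_t size, bool is_configured,
		    unsigned version)
{
	if (!is_configured)
		return snprintf(buf, size, "{}");
	return snprintf(buf, size, "{version = %u}", version);
}

/* Value published before box.cfg{}: an empty table. */
static int
schema_event_format_default(char *buf, size_t size)
{
	return schema_event_format(buf, size, false, 0);
}

/* Value published after box.cfg{}: the current schema version. */
static int
schema_event_format_current(char *buf, size_t size, unsigned version)
{
	return schema_event_format(buf, size, true, version);
}
```

A call such as `schema_event_format_default(buf, sizeof(buf))` reads unambiguously, while `schema_event_format(buf, sizeof(buf), false, 0)` has the same readability problem as `box_broadcast_schema(false)`.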
Here, for example, the following can be done - just use these functions only
after box.cfg. Before that, as a first step, you broadcast the sys events
explicitly, manually:
====================
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 342f893b8..a7598a88f 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -65,7 +65,7 @@ static void
box_schema_version_bump(void)
{
++schema_version;
- box_broadcast_schema(false);
+ box_broadcast_schema();
}
static int
@@ -4264,7 +4264,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
return -1;
REPLICASET_UUID = uu;
- box_broadcast_id(false);
+ box_broadcast_id();
say_info("cluster uuid %s", tt_uuid_str(&uu));
} else if (strcmp(key, "version") == 0) {
if (new_tuple != NULL) {
diff --git a/src/box/box.cc b/src/box/box.cc
index 9be1a5bbf..62693b3db 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -155,7 +155,7 @@ static struct fiber_pool tx_fiber_pool;
static struct cbus_endpoint tx_prio_endpoint;
static void
-box_broadcast_status(bool is_outside_box_cfg);
+box_broadcast_status(void);
static void
title(const char *new_status)
@@ -164,7 +164,7 @@ title(const char *new_status)
title_set_status(new_status);
title_update();
systemd_snotify("STATUS=%s", status);
- box_broadcast_status(false);
+ box_broadcast_status();
}
void
@@ -180,8 +180,8 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
- box_broadcast_status(false);
- box_broadcast_election(false);
+ box_broadcast_status();
+ box_broadcast_election();
}
const char *
@@ -3367,7 +3367,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
else
tt_uuid_create(&INSTANCE_UUID);
- box_broadcast_id(false);
+ box_broadcast_id();
say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
/*
@@ -3646,10 +3646,15 @@ box_init(void)
sequence_init();
box_raft_init();
box_watcher_init();
- box_broadcast_id(true);
- box_broadcast_status(true);
- box_broadcast_election(true);
- box_broadcast_schema(true);
+
+ /*
+ * Default built-in events to help users distinguish an event being not
+ * supported from box.cfg not being called yet.
+ */
+ box_broadcast_fmt("box.status", "{}");
+ box_broadcast_fmt("box.election", "{}");
+ box_broadcast_fmt("box.id", "{}");
+ box_broadcast_fmt("box.schema", "{}");
}
bool
@@ -3952,22 +3957,17 @@ box_reset_stat(void)
}
void
-box_broadcast_id(bool is_outside_box_cfg)
+box_broadcast_id(void)
{
char buf[1024];
char *w = buf;
-
- if (is_outside_box_cfg)
- w = mp_encode_map(w, 0);
- else {
- w = mp_encode_map(w, 3);
- w = mp_encode_str0(w, "id");
- w = mp_encode_uint(w, instance_id);
- w = mp_encode_str0(w, "instance_uuid");
- w = mp_encode_uuid(w, &INSTANCE_UUID);
- w = mp_encode_str0(w, "replicaset_uuid");
- w = mp_encode_uuid(w, &REPLICASET_UUID);
- }
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "id");
+ w = mp_encode_uint(w, instance_id);
+ w = mp_encode_str0(w, "instance_uuid");
+ w = mp_encode_uuid(w, &INSTANCE_UUID);
+ w = mp_encode_str0(w, "replicaset_uuid");
+ w = mp_encode_uuid(w, &REPLICASET_UUID);
box_broadcast("box.id", strlen("box.id"), buf, w);
@@ -3975,22 +3975,17 @@ box_broadcast_id(bool is_outside_box_cfg)
}
static void
-box_broadcast_status(bool is_outside_box_cfg)
+box_broadcast_status(void)
{
char buf[1024];
char *w = buf;
-
- if (is_outside_box_cfg)
- w = mp_encode_map(w, 0);
- else {
- w = mp_encode_map(w, 3);
- w = mp_encode_str0(w, "is_ro");
- w = mp_encode_bool(w, box_is_ro());
- w = mp_encode_str0(w, "is_ro_cfg");
- w = mp_encode_bool(w, cfg_geti("read_only"));
- w = mp_encode_str0(w, "status");
- w = mp_encode_str0(w, box_status());
- }
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "is_ro_cfg");
+ w = mp_encode_bool(w, cfg_geti("read_only"));
+ w = mp_encode_str0(w, "status");
+ w = mp_encode_str0(w, box_status());
box_broadcast("box.status", strlen("box.status"), buf, w);
@@ -3998,26 +3993,21 @@ box_broadcast_status(bool is_outside_box_cfg)
}
void
-box_broadcast_election(bool is_outside_box_cfg)
+box_broadcast_election(void)
{
struct raft *raft = box_raft();
char buf[1024];
char *w = buf;
-
- if (is_outside_box_cfg)
- w = mp_encode_map(w, 0);
- else {
- w = mp_encode_map(w, 4);
- w = mp_encode_str0(w, "term");
- w = mp_encode_uint(w, raft->term);
- w = mp_encode_str0(w, "role");
- w = mp_encode_str0(w, raft_state_str(raft->state));
- w = mp_encode_str0(w, "is_ro");
- w = mp_encode_bool(w, box_is_ro());
- w = mp_encode_str0(w, "leader");
- w = mp_encode_uint(w, raft->leader);
- }
+ w = mp_encode_map(w, 4);
+ w = mp_encode_str0(w, "term");
+ w = mp_encode_uint(w, raft->term);
+ w = mp_encode_str0(w, "role");
+ w = mp_encode_str0(w, raft_state_str(raft->state));
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "leader");
+ w = mp_encode_uint(w, raft->leader);
box_broadcast("box.election", strlen("box.election"), buf, w);
@@ -4025,18 +4015,13 @@ box_broadcast_election(bool is_outside_box_cfg)
}
void
-box_broadcast_schema(bool is_outside_box_cfg)
+box_broadcast_schema(void)
{
char buf[1024];
char *w = buf;
-
- if (is_outside_box_cfg)
- w = mp_encode_map(w, 0);
- else {
- w = mp_encode_map(w, 1);
- w = mp_encode_str0(w, "version");
- w = mp_encode_uint(w, box_schema_version());
- }
+ w = mp_encode_map(w, 1);
+ w = mp_encode_str0(w, "version");
+ w = mp_encode_uint(w, box_schema_version());
box_broadcast("box.schema", strlen("box.schema"), buf, w);
diff --git a/src/box/box.h b/src/box/box.h
index 41f60c0ea..766568368 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -567,19 +567,19 @@ boxk(int type, uint32_t space_id, const char *format, ...);
* Broadcast the identification of the instance
*/
void
-box_broadcast_id(bool is_outside_box_cfg);
+box_broadcast_id(void);
/**
* Broadcast the current election state of RAFT machinery
*/
void
-box_broadcast_election(bool is_outside_box_cfg);
+box_broadcast_election(void);
/**
* Broadcast the current schema version
*/
void
-box_broadcast_schema(bool is_outside_box_cfg);
+box_broadcast_schema(void);
#if defined(__cplusplus)
} /* extern "C" */
diff --git a/src/box/raft.c b/src/box/raft.c
index 259be639e..efc7e1038 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,7 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
- box_broadcast_election(false);
+ box_broadcast_election();
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 732fe6211..0da256721 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,7 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
- box_broadcast_id(false);
+ box_broadcast_id();
} else if (replica->anon) {
/*
* Set replica gc on its transition from
@@ -288,7 +288,7 @@ replica_clear_id(struct replica *replica)
/* See replica_check_id(). */
assert(replicaset.is_joining);
instance_id = REPLICA_ID_NIL;
- box_broadcast_id(false);
+ box_broadcast_id();
}
replica->id = REPLICA_ID_NIL;
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
====================
Now on each box_broadcast_<sys_event>() call you can tell that it
just broadcasts the event. No flags, no other parameters, simple to
read.
> diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> new file mode 100644
> index 000000000..4170ce420
> --- /dev/null
> +++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
<...>
> +g.test_sys_events_no_override = function(cg)
> + local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
> +
> + for i = 1, #sys_events do
> + t.assert_error_msg_content_equals("System event can't be override",
> + function()
> + cg.master:exec(function(sys_events, i)
> + box.broadcast(sys_events[i], 'any_data')
> + end, {sys_events, i})
> + end)
> + end
3. Lua supports range loops. You can do this:
====================
@@ -39,12 +39,12 @@ end)
g.test_sys_events_no_override = function(cg)
local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
- for i = 1, #sys_events do
+ for _, key in pairs(sys_events) do
t.assert_error_msg_content_equals("System event can't be override",
function()
- cg.master:exec(function(sys_events, i)
- box.broadcast(sys_events[i], 'any_data')
- end, {sys_events, i})
+ cg.master:exec(function(key)
+ box.broadcast(key, 'any_data')
+ end, {key})
end)
end
====================
Up to you.
Technically everything looks good except for the nits above. You should
also comb through the patch to make the linters happy: luacheck and checkpatch.
To run luacheck you need to install it first. How to do that depends on your
system. On Linux I think it would be something like
sudo apt install -y lua5.1 luarocks
sudo luarocks install luacheck
Or
tarantoolctl rocks install luacheck (I didn't validate this)
Then you run it with our config. This is how I do it in tarantool's root
dir (when I want to check 'test/' folder):
luacheck --codes --config .luacheckrc test/
Then for checkpatch you need to clone the repository:
https://github.com/tarantool/checkpatch
For instance, I cloned it into a folder next to my tarantool/
repo. Then from inside of tarantool/ I do this:
../checkpatch/checkpatch.pl --color=always --git head
to check the latest commit. I suppose you can also specify head~1 for a
previous one, head~2 for pre-previous and so on. Maybe it also supports
git hash ranges, I didn't check yet. On your commit this linter raises some
errors.
* [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
@ 2022-02-28 13:32 Yan Shtunder via Tarantool-patches
2022-03-01 22:44 ` Vladislav Shpilevoy via Tarantool-patches
0 siblings, 1 reply; 8+ messages in thread
From: Yan Shtunder via Tarantool-patches @ 2022-02-28 13:32 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches, Yan Shtunder
Added predefined system events: box.status, box.id, box.election and
box.schema.
Closes #6260
@TarantoolBot document
Title: Built-in events for pub/sub
Built-in events are needed, first of all, in order to learn who is the
master, unless it is defined in an application specific way. Knowing who
is the master is necessary to send changes to the correct instance, and
possibly to read the most up-to-date data if that is important. More
built-in events are also defined for other mutable properties such as leader
state changes, the instance's election role and election term, schema
version changes and instance state.
Built-in events have a special naming scheme - their names always start
with "box.". The prefix is reserved for built-in events. Creating new events
with this prefix is banned. Below is a list of all the events with their names
and values:
1. box.id
Description - identification of the instance. Changes are extremely rare. Some
values never change or change only once. For example, the instance UUID never
changes after the first box.cfg, but it is not known before box.cfg is called.
The replicaset UUID is unknown until the instance joins a replicaset or boots
a new one, but the events are supposed to start working before that - as soon
as the instance starts listening. The instance numeric ID is known only after
registration. On anonymous replicas it is 0 until they are registered officially.
Value - {
MP_STR “id”: MP_UINT box.info.id,
MP_STR “instance_uuid”: MP_UUID box.info.uuid,
MP_STR “replicaset_uuid”: MP_UUID box.info.cluster.uuid,
}
2. box.status
Description - generic blob about instance status. It contains the most commonly
used and infrequently changed config options and box.info fields.
Value - {
MP_STR “is_ro”: MP_BOOL box.info.ro,
MP_STR “is_ro_cfg”: MP_BOOL box.cfg.read_only,
MP_STR “status”: MP_STR box.info.status,
}
3. box.election
Description - all the parts of box.info.election needed to find who is
the most recent writable leader.
Value - {
MP_STR “term”: MP_UINT box.info.election.term,
MP_STR “role”: MP_STR box.info.election.state,
MP_STR “is_ro”: MP_BOOL box.info.ro,
MP_STR “leader”: MP_UINT box.info.election.leader,
}
4. box.schema
Description - schema-related data. Currently it contains only the version.
Value - {
MP_STR “version”: MP_UINT schema_version,
}
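As a rough illustration of how a client might use box.election values to
find the most recent writable leader, here is a minimal C sketch. It assumes
the values received from each instance were already decoded into a
hypothetical struct election_state; neither that struct nor
find_writable_leader() is part of Tarantool. It simply picks the instance
that reports role "leader" with is_ro == false and the highest term.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical decoded form of one box.election value. */
struct election_state {
	uint64_t term;    /* box.info.election.term */
	const char *role; /* box.info.election.state, e.g. "leader" */
	bool is_ro;       /* box.info.ro */
	uint32_t leader;  /* box.info.election.leader, 0 if unknown */
};

/*
 * Return the index of the instance that claims to be a writable leader
 * in the highest term, or -1 if no instance qualifies.
 */
int
find_writable_leader(const struct election_state *states, size_t count)
{
	assert(states != NULL || count == 0);
	int best = -1;
	for (size_t i = 0; i < count; i++) {
		const struct election_state *s = &states[i];
		/* Skip read-only instances and non-leaders. */
		if (s->is_ro || strcmp(s->role, "leader") != 0)
			continue;
		/* Prefer the claim made in the most recent term. */
		if (best < 0 || s->term > states[best].term)
			best = (int)i;
	}
	return best;
}
```

Comparing terms matters because an old leader that has not yet noticed a
newer term may still report itself as leader for a short while.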
Example usage:
* Client:
```lua
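-- URI is a placeholder for the address of the instance to watch
local net_box = require('net.box')
conn = net_box.connect(URI)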
-- Subscribe to updates of key 'box.id'
w = conn:watch('box.id', function(key, value)
assert(key == 'box.id')
-- do something with value
end)
-- or to updates of key 'box.status'
w = conn:watch('box.status', function(key, value)
assert(key == 'box.status')
-- do something with value
end)
-- or to updates of key 'box.election'
w = conn:watch('box.election', function(key, value)
assert(key == 'box.election')
-- do something with value
end)
-- or to updates of key 'box.schema'
w = conn:watch('box.schema', function(key, value)
assert(key == 'box.schema')
-- do something with value
end)
-- Unregister the watcher when it's no longer needed.
w:unregister()
```
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v4
.../unreleased/gh-6260-add-builtin-events.md | 4 +
src/box/alter.cc | 21 +-
src/box/box.cc | 101 ++++++-
src/box/box.h | 18 ++
src/box/raft.c | 1 +
src/box/replication.cc | 2 +
.../gh_6260_add_builtin_events_test.lua | 279 ++++++++++++++++++
7 files changed, 412 insertions(+), 14 deletions(-)
create mode 100644 changelogs/unreleased/gh-6260-add-builtin-events.md
create mode 100644 test/box-luatest/gh_6260_add_builtin_events_test.lua
diff --git a/changelogs/unreleased/gh-6260-add-builtin-events.md b/changelogs/unreleased/gh-6260-add-builtin-events.md
new file mode 100644
index 000000000..1d13e410f
--- /dev/null
+++ b/changelogs/unreleased/gh-6260-add-builtin-events.md
@@ -0,0 +1,4 @@
+## feature/core
+
+* Added predefined system events: `box.status`, `box.id`, `box.election`
+ and `box.schema` (gh-6260)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index b85d279e3..f043688fd 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,9 +57,17 @@
#include "sequence.h"
#include "sql.h"
#include "constraint_id.h"
+#include "box.h"
/* {{{ Auxiliary functions and methods. */
+static void
+box_schema_version_bump(void)
+{
+ ++schema_version;
+ box_broadcast_schema();
+}
+
static int
access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid,
enum schema_object_type type, enum priv_type priv_type)
@@ -1684,7 +1692,7 @@ void
UpdateSchemaVersion::alter(struct alter_space *alter)
{
(void)alter;
- ++schema_version;
+ box_schema_version_bump();
}
/**
@@ -2259,7 +2267,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* AlterSpaceOps are registered in case of space
* create.
*/
- ++schema_version;
+ box_schema_version_bump();
/*
* So may happen that until the DDL change record
* is written to the WAL, the space is used for
@@ -2373,7 +2381,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* deleting the space from the space_cache, since no
* AlterSpaceOps are registered in case of space drop.
*/
- ++schema_version;
+ box_schema_version_bump();
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_space_commit, old_space);
if (on_commit == NULL)
@@ -4238,6 +4246,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
return -1;
REPLICASET_UUID = uu;
+ box_broadcast_id();
say_info("cluster uuid %s", tt_uuid_str(&uu));
} else if (strcmp(key, "version") == 0) {
if (new_tuple != NULL) {
@@ -5084,7 +5093,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5611,7 +5620,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
space_reset_fk_constraint_mask(child_space);
space_reset_fk_constraint_mask(parent_space);
}
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5867,7 +5876,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
if (trigger_run(&on_alter_space, space) != 0)
return -1;
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 6a33203df..38a0f24e6 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -85,6 +85,7 @@
#include "audit.h"
#include "trivia/util.h"
#include "version.h"
+#include "mp_uuid.h"
static char status[64] = "unknown";
@@ -98,14 +99,6 @@ double txn_timeout_default;
struct rlist box_on_shutdown_trigger_list =
RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
-static void title(const char *new_status)
-{
- snprintf(status, sizeof(status), "%s", new_status);
- title_set_status(new_status);
- title_update();
- systemd_snotify("STATUS=%s", status);
-}
-
const struct vclock *box_vclock = &replicaset.vclock;
/**
@@ -161,6 +154,19 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+static void
+box_broadcast_status(void);
+
+static void
+title(const char *new_status)
+{
+ snprintf(status, sizeof(status), "%s", new_status);
+ title_set_status(new_status);
+ title_update();
+ systemd_snotify("STATUS=%s", status);
+ box_broadcast_status();
+}
+
void
box_update_ro_summary(void)
{
@@ -174,6 +180,8 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
+ box_broadcast_status();
+ box_broadcast_election();
}
const char *
@@ -3355,6 +3363,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
else
tt_uuid_create(&INSTANCE_UUID);
+ box_broadcast_id();
say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
/*
@@ -3933,3 +3942,79 @@ box_reset_stat(void)
engine_reset_stat();
space_foreach(box_reset_space_stat, NULL);
}
+
+void
+box_broadcast_id(void)
+{
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "id");
+ w = mp_encode_uint(w, instance_id);
+ w = mp_encode_str0(w, "instance_uuid");
+ w = mp_encode_uuid(w, &INSTANCE_UUID);
+ w = mp_encode_str0(w, "replicaset_uuid");
+ w = mp_encode_uuid(w, &REPLICASET_UUID);
+
+ box_broadcast("box.id", strlen("box.id"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+static void
+box_broadcast_status(void)
+{
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "is_ro_cfg");
+ w = mp_encode_bool(w, cfg_geti("read_only"));
+ w = mp_encode_str0(w, "status");
+ w = mp_encode_str0(w, box_status());
+
+ box_broadcast("box.status", strlen("box.status"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_election(void)
+{
+ struct raft *raft = box_raft();
+
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 4);
+ w = mp_encode_str0(w, "term");
+ w = mp_encode_uint(w, raft->term);
+ w = mp_encode_str0(w, "role");
+ w = mp_encode_str0(w, raft_state_str(raft->state));
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "leader");
+ w = mp_encode_uint(w, raft->leader);
+
+ box_broadcast("box.election", strlen("box.election"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_schema(void)
+{
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 1);
+ w = mp_encode_str0(w, "version");
+ w = mp_encode_uint(w, box_schema_version());
+
+ box_broadcast("box.schema", strlen("box.schema"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..766568368 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+/**
+ * Broadcast the identification of the instance
+ */
+void
+box_broadcast_id(void);
+
+/**
+ * Broadcast the current election state of RAFT machinery
+ */
+void
+box_broadcast_election(void);
+
+/**
+ * Broadcast the current schema version
+ */
+void
+box_broadcast_schema(void);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/raft.c b/src/box/raft.c
index be6009cc1..efc7e1038 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
+ box_broadcast_election();
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index bbf4b7417..0da256721 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
+ box_broadcast_id();
} else if (replica->anon) {
/*
* Set replica gc on its transition from
@@ -287,6 +288,7 @@ replica_clear_id(struct replica *replica)
/* See replica_check_id(). */
assert(replicaset.is_joining);
instance_id = REPLICA_ID_NIL;
+ box_broadcast_id();
}
replica->id = REPLICA_ID_NIL;
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
new file mode 100644
index 000000000..f34bc4681
--- /dev/null
+++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
@@ -0,0 +1,279 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local server = require('test.luatest_helpers.server')
+
+local g = t.group('gh_6260')
+
+g.before_test('test_box_status', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_status', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_status = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+ local watcher = c:watch('box.status',
+ function(name, state)
+ assert(name == 'box.status')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- test orphan status appearance
+ cg.master:exec(function(repl)
+ box.cfg{
+ replication = repl,
+ replication_connect_timeout = 0.001,
+ replication_timeout = 0.001,
+ }
+ end, {{server.build_instance_uri('master'),
+ server.build_instance_uri('replica')}})
+ -- here we have 2 notifications: entering ro when can't connect
+ -- to master and the second one when going orphan
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 3) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'orphan'})
+
+ -- test ro_cfg appearance
+ cg.master:exec(function()
+ box.cfg{
+ replication = {},
+ read_only = true,
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 4) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = true, status = 'running'})
+
+ -- reset to rw
+ cg.master:exec(function()
+ box.cfg{
+ read_only = false,
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 5) end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- turning manual election mode puts into ro
+ cg.master:exec(function()
+ box.cfg{
+ election_mode = 'manual',
+ }
+ end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 6) end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+ -- promotion should turn rw
+ cg.master:exec(function() box.ctl.promote() end)
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 7) end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_election', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replication = {
+ server.build_instance_uri('instance_1'),
+ server.build_instance_uri('instance_2'),
+ server.build_instance_uri('instance_3'),
+ },
+ replication_connect_quorum = 0,
+ election_mode = 'off',
+ replication_synchro_quorum = 2,
+ replication_synchro_timeout = 1,
+ replication_timeout = 0.25,
+ election_timeout = 0.25,
+ }
+
+ cg.instance_1 = cg.cluster:build_server(
+ {alias = 'instance_1', box_cfg = box_cfg})
+
+ cg.instance_2 = cg.cluster:build_server(
+ {alias = 'instance_2', box_cfg = box_cfg})
+
+ cg.instance_3 = cg.cluster:build_server(
+ {alias = 'instance_3', box_cfg = box_cfg})
+
+ cg.cluster:add_server(cg.instance_1)
+ cg.cluster:add_server(cg.instance_2)
+ cg.cluster:add_server(cg.instance_3)
+ cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.after_test('test_box_election', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.test_box_election = function(cg)
+ local c = {}
+ c[1] = net.connect(cg.instance_1.net_box_uri)
+ c[2] = net.connect(cg.instance_2.net_box_uri)
+ c[3] = net.connect(cg.instance_3.net_box_uri)
+
+ local res = {}
+ local res_n = {0, 0, 0}
+
+ for i = 1, 3 do
+ c[i]:watch('box.election',
+ function(n, s)
+ t.assert_equals(n, 'box.election')
+ res[i] = s
+ res_n[i] = res_n[i] + 1
+ end)
+ end
+ t.helpers.retrying({}, function()
+ t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+ end)
+
+ -- verify all instances are in the same state
+ t.assert_equals(res[1], res[2])
+ t.assert_equals(res[1], res[3])
+
+ -- wait for elections to complete, verify leader is the instance_1
+ -- trying to avoid the exact number of term - it can vary
+ local instance1_id = cg.instance_1:instance_id()
+
+ cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
+ cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
+ cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
+
+ cg.instance_1:wait_election_leader_found()
+ cg.instance_2:wait_election_leader_found()
+ cg.instance_3:wait_election_leader_found()
+
+ t.assert_covers(res[1],
+ {leader = instance1_id, is_ro = false, role = 'leader'})
+ t.assert_covers(res[2],
+ {leader = instance1_id, is_ro = true, role = 'follower'})
+ t.assert_covers(res[3],
+ {leader = instance1_id, is_ro = true, role = 'follower'})
+
+ -- verify all terms are in the same state
+ t.assert_equals(res[1].term, res[2].term)
+ t.assert_equals(res[1].term, res[3].term)
+
+ -- check the stepping down is working
+ res_n = {0, 0, 0}
+ cg.instance_1:exec(function() box.cfg{election_mode='voter'} end)
+ t.helpers.retrying({}, function()
+ t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+ end)
+
+ local expected = {is_ro = true, role = 'follower', term = res[1].term,
+ leader = 0}
+ t.assert_covers(res, {expected, expected, expected})
+
+ c[1]:close()
+ c[2]:close()
+ c[3]:close()
+end
+
+g.before_test('test_box_schema', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_schema', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_schema = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+ local version = 0
+ local version_n = 0
+
+ local watcher = c:watch('box.schema',
+ function(n, s)
+ assert(n == 'box.schema')
+ version = s.version
+ version_n = version_n + 1
+ end)
+
+ cg.master:exec(function() box.schema.create_space('p') end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+ -- there'll be 2 changes - index and space
+ -- first version change, use it as initial value
+ local init_version = version
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:create_index('i') end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 1) end)
+ t.assert_equals(version, init_version + 1)
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:drop() end)
+ t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+ -- there'll be 2 changes - index and space
+ t.assert_equals(version, init_version + 3)
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_id', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replicaset_uuid = t.helpers.uuid('ab', 1),
+ instance_uuid = t.helpers.uuid('1', '2', '3')
+ }
+
+ cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+ cg.cluster:add_server(cg.instance)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_id', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_id = function(cg)
+ local c = net.connect(cg.instance.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+
+ local watcher = c:watch('box.id',
+ function(name, state)
+ assert(name == 'box.id')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+ t.assert_equals(result, {id = 1,
+ instance_uuid = cg.instance.box_cfg.instance_uuid,
+ replicaset_uuid = cg.instance.box_cfg.replicaset_uuid})
+
+ watcher:unregister()
+ c:close()
+end
--
2.25.1
* Re: [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
2022-02-28 13:32 Yan Shtunder via Tarantool-patches
@ 2022-03-01 22:44 ` Vladislav Shpilevoy via Tarantool-patches
0 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2022-03-01 22:44 UTC (permalink / raw)
To: Yan Shtunder; +Cc: tarantool-patches
Hi! Thanks for the fixes!
In the design document it was also said:
The events must be available from the very beginning as not MP_NIL.
It will come in handy when local subscriptions will be supported (not
via the network). Otherwise no way to detect whether an event is even
supported at all by this Tarantool version. By design absence of a
subscription key equals MP_NIL and no events are delivered ever except
this MP_NIL itself. It happens both if it is not broadcast and if it
simply does not exist in the code.
And
The builtin events should not allow userspace broadcast. Meaning, users
should not be able to call box.event.broadcast('box.status', any_data).
In the current implementation overriding the built-in events is still
allowed, and before box.cfg{} is called, all the events return nil.
As for how to ban overriding the built-in events - the design doc has a couple
of proposals. A third option would be to check for the "box." prefix in the public
API of box.broadcast() and return an error if it matches. I would probably ask
Vladimir D. which of these options he thinks is best. I think he will
be the second reviewer anyway.
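To make the prefix-check option concrete, here is a minimal self-contained
C sketch. The function name event_key_is_reserved() is hypothetical and is
not taken from the patch or from Tarantool; where exactly such a check would
live in the public box.broadcast() path is left open.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper: return true if the key belongs to the reserved
 * "box." namespace and so must not be broadcast from userspace.
 * The key does not have to be NUL-terminated.
 */
bool
event_key_is_reserved(const char *key, size_t key_len)
{
	static const char prefix[] = "box.";
	const size_t prefix_len = sizeof(prefix) - 1;
	assert(key != NULL);
	return key_len >= prefix_len &&
	       memcmp(key, prefix, prefix_len) == 0;
}
```

Note that a check like this reserves the whole namespace, so a user key such
as "box.foo" would be rejected too, not only the four built-in names.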
As for what to broadcast before box.cfg{} - I have no strong opinion here.
Option 1 is to broadcast some invalid default values like `box.schema = {version = 0}`.
Option 2 would be to broadcast an empty dictionary. For instance, before box.cfg is
called, the following values would be available:
box.id = {}
box.schema = {}
box.status = {}
box.election = {}
This way the users will be able to distinguish an event not being supported
at all from box.cfg{} not having been called yet. Otherwise they would need to parse
the _TARANTOOL version string locally and peer_version in netbox.
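A minimal C sketch of how a subscriber could tell these cases apart from the
raw MsgPack value, assuming option 2 is chosen. The enum and
classify_event_value() are hypothetical names and only the map header is
inspected; a real client would rather use a MsgPack library such as msgpuck.
The type bytes are the standard MsgPack ones: 0xc0 is nil, 0x80-0x8f is a
fixmap, 0xde is map16 and 0xdf is map32.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum event_state {
	EVENT_UNSUPPORTED,    /* MP_NIL: the key was never broadcast */
	EVENT_NOT_CONFIGURED, /* empty map: box.cfg{} was not called yet */
	EVENT_READY,          /* non-empty map: real data is available */
	EVENT_UNEXPECTED,     /* anything else */
};

/* Classify a built-in event value by looking at its MsgPack header. */
enum event_state
classify_event_value(const unsigned char *data, size_t len)
{
	assert(data != NULL && len > 0);
	uint8_t b = data[0];
	if (b == 0xc0)
		return EVENT_UNSUPPORTED;
	if ((b & 0xf0) == 0x80) {
		/* fixmap: the low 4 bits hold the number of pairs. */
		return (b & 0x0f) == 0 ? EVENT_NOT_CONFIGURED : EVENT_READY;
	}
	if (b == 0xde && len >= 3) {
		/* map16: big-endian 16-bit pair count. */
		uint16_t n = (uint16_t)((data[1] << 8) | data[2]);
		return n == 0 ? EVENT_NOT_CONFIGURED : EVENT_READY;
	}
	if (b == 0xdf && len >= 5) {
		/* map32: big-endian 32-bit pair count. */
		uint32_t n = ((uint32_t)data[1] << 24) |
			     ((uint32_t)data[2] << 16) |
			     ((uint32_t)data[3] << 8) | data[4];
		return n == 0 ? EVENT_NOT_CONFIGURED : EVENT_READY;
	}
	return EVENT_UNEXPECTED;
}
```

With option 1 (invalid defaults such as version = 0) the same distinction
would need per-event knowledge of which values count as invalid.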
The rest looks good!
* [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
@ 2022-02-22 12:56 Yan Shtunder via Tarantool-patches
2022-02-23 22:44 ` Vladislav Shpilevoy via Tarantool-patches
0 siblings, 1 reply; 8+ messages in thread
From: Yan Shtunder via Tarantool-patches @ 2022-02-22 12:56 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches, Yan Shtunder
Added predefined system events: box.status, box.id, box.election and
box.schema.
Closes #6260
@TarantoolBot document
Title: Built-in events for pub/sub
Built-in events are needed, first of all, in order to learn who is the
master, unless it is defined in an application specific way. Knowing who
is the master is necessary to send changes to the correct instance, and
possibly to read the most up-to-date data if that is important. More
built-in events are also defined for other mutable properties such as the
replication list of the server, schema version changes and instance state.
Example usage:
* Client:
```lua
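-- URI is a placeholder for the address of the instance to watch
local net_box = require('net.box')
conn = net_box.connect(URI)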
-- Subscribe to updates of key 'box.id'
w = conn:watch('box.id', function(key, value)
assert(key == 'box.id')
-- do something with value
end)
-- or to updates of key 'box.status'
w = conn:watch('box.status', function(key, value)
assert(key == 'box.status')
-- do something with value
end)
-- or to updates of key 'box.election'
w = conn:watch('box.election', function(key, value)
assert(key == 'box.election')
-- do something with value
end)
-- or to updates of key 'box.schema'
w = conn:watch('box.schema', function(key, value)
assert(key == 'box.schema')
-- do something with value
end)
-- Unregister the watcher when it's no longer needed.
w:unregister()
```
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v4
.../unreleased/gh-6260-add-builtin-events.md | 4 +
src/box/alter.cc | 21 +-
src/box/box.cc | 101 +++++-
src/box/box.h | 18 ++
src/box/raft.c | 1 +
src/box/replication.cc | 2 +
.../gh_6260_add_builtin_events_test.lua | 294 ++++++++++++++++++
7 files changed, 427 insertions(+), 14 deletions(-)
create mode 100644 changelogs/unreleased/gh-6260-add-builtin-events.md
create mode 100644 test/box-luatest/gh_6260_add_builtin_events_test.lua
diff --git a/changelogs/unreleased/gh-6260-add-builtin-events.md b/changelogs/unreleased/gh-6260-add-builtin-events.md
new file mode 100644
index 000000000..1d13e410f
--- /dev/null
+++ b/changelogs/unreleased/gh-6260-add-builtin-events.md
@@ -0,0 +1,4 @@
+## feature/core
+
+* Added predefined system events: `box.status`, `box.id`, `box.election`
+ and `box.schema` (gh-6260)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index b85d279e3..cd48cd6aa 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,6 +57,14 @@
#include "sequence.h"
#include "sql.h"
#include "constraint_id.h"
+#include "box.h"
+
+static void
+box_schema_version_bump(void)
+{
+ ++schema_version;
+ box_broadcast_schema();
+}
/* {{{ Auxiliary functions and methods. */
@@ -1684,7 +1692,7 @@ void
UpdateSchemaVersion::alter(struct alter_space *alter)
{
(void)alter;
- ++schema_version;
+ box_schema_version_bump();
}
/**
@@ -2259,7 +2267,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* AlterSpaceOps are registered in case of space
* create.
*/
- ++schema_version;
+ box_schema_version_bump();
/*
* So may happen that until the DDL change record
* is written to the WAL, the space is used for
@@ -2373,7 +2381,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
* deleting the space from the space_cache, since no
* AlterSpaceOps are registered in case of space drop.
*/
- ++schema_version;
+ box_schema_version_bump();
struct trigger *on_commit =
txn_alter_trigger_new(on_drop_space_commit, old_space);
if (on_commit == NULL)
@@ -4238,6 +4246,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
return -1;
REPLICASET_UUID = uu;
+ box_broadcast_id();
say_info("cluster uuid %s", tt_uuid_str(&uu));
} else if (strcmp(key, "version") == 0) {
if (new_tuple != NULL) {
@@ -5084,7 +5093,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
txn_stmt_on_rollback(stmt, on_rollback);
txn_stmt_on_commit(stmt, on_commit);
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5611,7 +5620,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
space_reset_fk_constraint_mask(child_space);
space_reset_fk_constraint_mask(parent_space);
}
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
@@ -5867,7 +5876,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
if (trigger_run(&on_alter_space, space) != 0)
return -1;
- ++schema_version;
+ box_schema_version_bump();
return 0;
}
diff --git a/src/box/box.cc b/src/box/box.cc
index 6a33203df..38a0f24e6 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -85,6 +85,7 @@
#include "audit.h"
#include "trivia/util.h"
#include "version.h"
+#include "mp_uuid.h"
static char status[64] = "unknown";
@@ -98,14 +99,6 @@ double txn_timeout_default;
struct rlist box_on_shutdown_trigger_list =
RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list);
-static void title(const char *new_status)
-{
- snprintf(status, sizeof(status), "%s", new_status);
- title_set_status(new_status);
- title_update();
- systemd_snotify("STATUS=%s", status);
-}
-
const struct vclock *box_vclock = &replicaset.vclock;
/**
@@ -161,6 +154,19 @@ static struct fiber_pool tx_fiber_pool;
*/
static struct cbus_endpoint tx_prio_endpoint;
+static void
+box_broadcast_status(void);
+
+static void
+title(const char *new_status)
+{
+ snprintf(status, sizeof(status), "%s", new_status);
+ title_set_status(new_status);
+ title_update();
+ systemd_snotify("STATUS=%s", status);
+ box_broadcast_status();
+}
+
void
box_update_ro_summary(void)
{
@@ -174,6 +180,8 @@ box_update_ro_summary(void)
if (is_ro_summary)
engine_switch_to_ro();
fiber_cond_broadcast(&ro_cond);
+ box_broadcast_status();
+ box_broadcast_election();
}
const char *
@@ -3355,6 +3363,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
else
tt_uuid_create(&INSTANCE_UUID);
+ box_broadcast_id();
say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID));
/*
@@ -3933,3 +3942,79 @@ box_reset_stat(void)
engine_reset_stat();
space_foreach(box_reset_space_stat, NULL);
}
+
+void
+box_broadcast_id(void)
+{
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "id");
+ w = mp_encode_uint(w, instance_id);
+ w = mp_encode_str0(w, "instance_uuid");
+ w = mp_encode_uuid(w, &INSTANCE_UUID);
+ w = mp_encode_str0(w, "replicaset_uuid");
+ w = mp_encode_uuid(w, &REPLICASET_UUID);
+
+ box_broadcast("box.id", strlen("box.id"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+static void
+box_broadcast_status(void)
+{
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 3);
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "is_ro_cfg");
+ w = mp_encode_bool(w, cfg_geti("read_only"));
+ w = mp_encode_str0(w, "status");
+ w = mp_encode_str0(w, box_status());
+
+ box_broadcast("box.status", strlen("box.status"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_election(void)
+{
+ struct raft *raft = box_raft();
+
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 4);
+ w = mp_encode_str0(w, "term");
+ w = mp_encode_uint(w, raft->term);
+ w = mp_encode_str0(w, "role");
+ w = mp_encode_str0(w, raft_state_str(raft->state));
+ w = mp_encode_str0(w, "is_ro");
+ w = mp_encode_bool(w, box_is_ro());
+ w = mp_encode_str0(w, "leader");
+ w = mp_encode_uint(w, raft->leader);
+
+ box_broadcast("box.election", strlen("box.election"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
+
+void
+box_broadcast_schema(void)
+{
+ char buf[1024];
+ char *w = buf;
+
+ w = mp_encode_map(w, 1);
+ w = mp_encode_str0(w, "version");
+ w = mp_encode_uint(w, box_schema_version());
+
+ box_broadcast("box.schema", strlen("box.schema"), buf, w);
+
+ assert((size_t)(w - buf) < 1024);
+}
diff --git a/src/box/box.h b/src/box/box.h
index b594f6646..766568368 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space,
int
boxk(int type, uint32_t space_id, const char *format, ...);
+/**
+ * Broadcast the identification of the instance
+ */
+void
+box_broadcast_id(void);
+
+/**
+ * Broadcast the current election state of RAFT machinery
+ */
+void
+box_broadcast_election(void);
+
+/**
+ * Broadcast the current schema version
+ */
+void
+box_broadcast_schema(void);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/raft.c b/src/box/raft.c
index be6009cc1..efc7e1038 100644
--- a/src/box/raft.c
+++ b/src/box/raft.c
@@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event)
* writable only after it clears its synchro queue.
*/
box_update_ro_summary();
+ box_broadcast_election();
if (raft->state != RAFT_STATE_LEADER)
return 0;
/*
diff --git a/src/box/replication.cc b/src/box/replication.cc
index bbf4b7417..0da256721 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id)
/* Assign local replica id */
assert(instance_id == REPLICA_ID_NIL);
instance_id = replica_id;
+ box_broadcast_id();
} else if (replica->anon) {
/*
* Set replica gc on its transition from
@@ -287,6 +288,7 @@ replica_clear_id(struct replica *replica)
/* See replica_check_id(). */
assert(replicaset.is_joining);
instance_id = REPLICA_ID_NIL;
+ box_broadcast_id();
}
replica->id = REPLICA_ID_NIL;
say_info("removed replica %s", tt_uuid_str(&replica->uuid));
diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
new file mode 100644
index 000000000..c11d79e30
--- /dev/null
+++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
@@ -0,0 +1,294 @@
+local t = require('luatest')
+local net = require('net.box')
+local cluster = require('test.luatest_helpers.cluster')
+local server = require('test.luatest_helpers.server')
+local fiber = require('fiber')
+
+local g = t.group('gh_6260')
+
+g.before_test('test_box_status', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_status', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_status = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+ local watcher = c:watch('box.status',
+ function(name, state)
+ assert(name == 'box.status')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function()
+ while result_no < 1 do fiber.sleep(0.000001) end
+ end)
+
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- test orphan status appearance
+ cg.master:exec(function(repl)
+ box.cfg{
+ replication = repl,
+ replication_connect_timeout = 0.001,
+ replication_timeout = 0.001,
+ }
+ end, {{server.build_instance_uri('master'), server.build_instance_uri('replica')}})
+ -- here we have 2 notifications: entering ro when can't connect
+ -- to master and the second one when going orphan
+ t.helpers.retrying({}, function()
+ while result_no < 3 do fiber.sleep(0.000001) end
+ end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'orphan'})
+
+ -- test ro_cfg appearance
+ cg.master:exec(function()
+ box.cfg{
+ replication = {},
+ read_only = true,
+ }
+ end)
+ t.helpers.retrying({}, function()
+ while result_no < 4 do fiber.sleep(0.000001) end
+ end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = true, status = 'running'})
+
+ -- reset to rw
+ cg.master:exec(function()
+ box.cfg{
+ read_only = false,
+ }
+ end)
+ t.helpers.retrying({}, function()
+ while result_no < 5 do fiber.sleep(0.000001) end
+ end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ -- turning manual election mode puts into ro
+ cg.master:exec(function()
+ box.cfg{
+ election_mode = 'manual',
+ }
+ end)
+ t.helpers.retrying({}, function()
+ while result_no < 6 do fiber.sleep(0.000001) end
+ end)
+ t.assert_equals(result,
+ {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+ -- promotion should turn rw
+ cg.master:exec(function() box.ctl.promote() end)
+ t.helpers.retrying({}, function()
+ while result_no < 7 do fiber.sleep(0.000001) end
+ end)
+ t.assert_equals(result,
+ {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_election', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replication = {
+ server.build_instance_uri('instance_1'),
+ server.build_instance_uri('instance_2'),
+ server.build_instance_uri('instance_3'),
+ },
+ replication_connect_quorum = 0,
+ election_mode = 'off',
+ replication_synchro_quorum = 2,
+ replication_synchro_timeout = 1,
+ replication_timeout = 0.25,
+ election_timeout = 0.25,
+ }
+
+ cg.instance_1 = cg.cluster:build_server(
+ {alias = 'instance_1', box_cfg = box_cfg})
+
+ cg.instance_2 = cg.cluster:build_server(
+ {alias = 'instance_2', box_cfg = box_cfg})
+
+ cg.instance_3 = cg.cluster:build_server(
+ {alias = 'instance_3', box_cfg = box_cfg})
+
+ cg.cluster:add_server(cg.instance_1)
+ cg.cluster:add_server(cg.instance_2)
+ cg.cluster:add_server(cg.instance_3)
+ cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.after_test('test_box_election', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.test_box_election = function(cg)
+ local c = {}
+ c[1] = net.connect(cg.instance_1.net_box_uri)
+ c[2] = net.connect(cg.instance_2.net_box_uri)
+ c[3] = net.connect(cg.instance_3.net_box_uri)
+
+ local res = {}
+ local res_n = {0, 0, 0}
+
+ for i = 1, 3 do
+ c[i]:watch('box.election',
+ function(n, s)
+ t.assert_equals(n, 'box.election')
+ res[i] = s
+ res_n[i] = res_n[i] + 1
+ end)
+ end
+ t.helpers.retrying({}, function()
+ while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
+ end)
+
+ -- verify all instances are in the same state
+ t.assert_equals(res[1], res[2])
+ t.assert_equals(res[1], res[3])
+
+ -- wait for elections to complete, verify leader is the instance_1
+ -- trying to avoid the exact number of term - it can vary
+ local instance1_id = cg.instance_1:instance_id()
+
+ cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
+ cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
+ cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
+
+ cg.instance_1:wait_election_leader_found()
+ cg.instance_2:wait_election_leader_found()
+ cg.instance_3:wait_election_leader_found()
+
+ t.assert_covers(res,
+ {
+ {leader = instance1_id, is_ro = false, role = 'leader', term = 2},
+ {leader = instance1_id, is_ro = true, role = 'follower', term = 2},
+ {leader = instance1_id, is_ro = true, role = 'follower', term = 2}
+ })
+
+ -- check the stepping down is working
+ res_n = {0, 0, 0}
+ cg.instance_1:exec(function() box.cfg{election_mode='voter'} end)
+ t.helpers.retrying({}, function()
+ while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
+ end)
+
+ local expected = {is_ro = true, role = 'follower', term = res[1].term,
+ leader = 0}
+ t.assert_covers(res, {expected, expected, expected})
+
+ c[1]:close()
+ c[2]:close()
+ c[3]:close()
+end
+
+g.before_test('test_box_schema', function(cg)
+ cg.cluster = cluster:new({})
+ cg.master = cg.cluster:build_server({alias = 'master'})
+ cg.cluster:add_server(cg.master)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_schema', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_schema = function(cg)
+ local c = net.connect(cg.master.net_box_uri)
+ local version = 0
+ local version_n = 0
+
+ local watcher = c:watch('box.schema',
+ function(n, s)
+ assert(n == 'box.schema')
+ version = s.version
+ version_n = version_n + 1
+ end)
+
+ cg.master:exec(function() box.schema.create_space('p') end)
+ t.helpers.retrying({}, function()
+ while version_n < 1 do fiber.sleep(0.00001) end
+ end)
+ -- first version change, use it as initial value
+ local init_version = version
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:create_index('i') end)
+ t.helpers.retrying({}, function()
+ while version_n < 1 do fiber.sleep(0.00001) end
+ end)
+ t.assert_equals(version, init_version + 1)
+
+ version_n = 0
+ cg.master:exec(function() box.space.p:drop() end)
+ t.helpers.retrying({}, function()
+ while version_n < 1 do fiber.sleep(0.00001) end
+ end)
+ -- there'll be 2 changes - index and space
+ t.assert_equals(version, init_version + 3)
+
+ watcher:unregister()
+ c:close()
+end
+
+g.before_test('test_box_id', function(cg)
+ cg.cluster = cluster:new({})
+
+ local box_cfg = {
+ replicaset_uuid = t.helpers.uuid('ab', 1),
+ instance_uuid = t.helpers.uuid('1', '2', '3')
+ }
+
+ cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+ cg.cluster:add_server(cg.instance)
+ cg.cluster:start()
+end)
+
+g.after_test('test_box_id', function(cg)
+ cg.cluster.servers = nil
+ cg.cluster:drop()
+end)
+
+g.test_box_id = function(cg)
+ local c = net.connect(cg.instance.net_box_uri)
+
+ local result = {}
+ local result_no = 0
+
+ local watcher = c:watch('box.id',
+ function(name, state)
+ assert(name == 'box.id')
+ result = state
+ result_no = result_no + 1
+ end)
+
+ -- initial state should arrive
+ t.helpers.retrying({}, function()
+ while result_no < 1 do fiber.sleep(0.00001) end
+ end)
+ t.assert_equals(result, {id = 1,
+ instance_uuid = cg.instance.box_cfg.instance_uuid,
+ replicaset_uuid = cg.instance.box_cfg.replicaset_uuid})
+
+ watcher:unregister()
+ c:close()
+end
--
2.25.1
Hi! Thank you for the review!
> Hi! Thanks for the patch, good job!
>
> See some new comments below!
>
> On 12.02.2022 02:18, Yan Shtunder via Tarantool-patches wrote:
> > Adding predefined system events: box.status, box.id, box.election
> > and box.schema.
> >
> > Closes #6260
> >
> > NO_CHANGELOG=test stuff
> > NO_DOC=testing stuff
>
> 1. This is a new public API. Lets add both a docbot request and a changelog.
1. Done.
> @TarantoolBot document
> Title: Built-in events for pub/sub
>
> Built-in events are needed, first of all, in order to learn who is the
> master, unless it is defined in an application specific way. Knowing who
> is the master is necessary to send changes to a correct instance, and
> probably make reads of the most actual data if it is important. Also
> defined more built-in events for other mutable properties like replication
> list of the server, schema version change and instance state.
>
> Example usage:
>
> * Client:
> ```lua
> net_box = require('net.box')
> conn = net_box.connect(URI)
> -- Subscribe to updates of key 'box.id'
> w = conn:watch('box.id', function(key, value)
> assert(key == 'box.id')
> -- do something with value
> end)
> -- or to updates of key 'box.status'
> w = conn:watch('box.status', function(key, value)
> assert(key == 'box.status')
> -- do something with value
> end)
> -- or to updates of key 'box.election'
> w = conn:watch('box.election', function(key, value)
> assert(key == 'box.election')
> -- do something with value
> end)
> -- or to updates of key 'box.schema'
> w = conn:watch('box.schema', function(key, value)
> assert(key == 'box.schema')
> -- do something with value
> end)
> -- Unregister the watcher when it's no longer needed.
> w:unregister()
> ```
>
> diff --git a/changelogs/unreleased/gh-6260-add-builtin-events.md b/changelogs/unreleased/gh-6260-add-builtin-events.md
> new file mode 100644
> index 000000000..1d13e410f
> --- /dev/null
> +++ b/changelogs/unreleased/gh-6260-add-builtin-events.md
> @@ -0,0 +1,4 @@
> +## feature/core
> +
> +* Added predefined system events: `box.status`, `box.id`, `box.election`
> + and `box.schema` (gh-6260)
> > ---
> > Issue: https://github.com/tarantool/tarantool/issues/6260
> > Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3
> >
> > src/box/alter.cc | 9 +
> > src/box/box.cc | 103 +++++-
> > src/box/box.h | 18 +
> > src/box/mp_error.cc | 6 -
> > src/box/raft.c | 1 +
> > src/box/replication.cc | 16 +
> > src/lib/msgpuck | 2 +-
> > src/lib/raft/raft.c | 1 +
> > .../gh_6260_add_builtin_events_test.lua | 320 ++++++++++++++++++
> > test/unit/mp_error.cc | 6 -
> > 10 files changed, 462 insertions(+), 20 deletions(-)
> > create mode 100644 test/app-luatest/gh_6260_add_builtin_events_test.lua
> >
> > diff --git a/src/box/alter.cc b/src/box/alter.cc
> > index 65c1cb952..e9899b968 100644
> > --- a/src/box/alter.cc
> > +++ b/src/box/alter.cc
> > @@ -1685,6 +1686,7 @@ UpdateSchemaVersion::alter(struct alter_space *alter)
> > {
> > (void)alter;
> > ++schema_version;
> > + box_broadcast_schema();
>
> 2. The schema version increment + its broadcast are duplicated 6 times. Lets
> wrap it into a function like
>
> box_schema_version_bump()
2.
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -57,6 +57,14 @@
> #include "sequence.h"
> #include "sql.h"
> #include "constraint_id.h"
> +#include "box.h"
> +
> +static void
> +box_schema_version_bump(void)
> +{
> + ++schema_version;
> + box_broadcast_schema();
> +}
>
> /* {{{ Auxiliary functions and methods. */
>
> @@ -1684,7 +1692,7 @@ void
> UpdateSchemaVersion::alter(struct alter_space *alter)
> {
> (void)alter;
> - ++schema_version;
> + box_schema_version_bump();
> }
>
> /**
> @@ -2259,7 +2267,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
> * AlterSpaceOps are registered in case of space
> * create.
> */
> - ++schema_version;
> + box_schema_version_bump();
> /*
> * So may happen that until the DDL change record
> * is written to the WAL, the space is used for
> @@ -2373,7 +2381,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
> * deleting the space from the space_cache, since no
> * AlterSpaceOps are registered in case of space drop.
> */
> - ++schema_version;
> + box_schema_version_bump();
> struct trigger *on_commit =
> txn_alter_trigger_new(on_drop_space_commit, old_space);
> if (on_commit == NULL)
> @@ -4238,6 +4246,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event)
> if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0)
> return -1;
> REPLICASET_UUID = uu;
> + box_broadcast_id();
> say_info("cluster uuid %s", tt_uuid_str(&uu));
> } else if (strcmp(key, "version") == 0) {
> if (new_tuple != NULL) {
> @@ -5084,7 +5093,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event)
>
> txn_stmt_on_rollback(stmt, on_rollback);
> txn_stmt_on_commit(stmt, on_commit);
> - ++schema_version;
> + box_schema_version_bump();
> return 0;
> }
>
> @@ -5611,7 +5620,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
> space_reset_fk_constraint_mask(child_space);
> space_reset_fk_constraint_mask(parent_space);
> }
> - ++schema_version;
> + box_schema_version_bump();
> return 0;
> }
>
> @@ -5867,7 +5876,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
>
> if (trigger_run(&on_alter_space, space) != 0)
> return -1;
> - ++schema_version;
> + box_schema_version_bump();
> return 0;
> }
> 3. A response to Vova's comment about box_broadcast_fmt() - while I am not
> against that, I am just afraid it might not work with UUIDs. AFAIR we don't
> have a formatter for UUID or any other MP_EXT type. You can still try to use
> box_broadcast_fmt() for events having only MessagePack-native types though.
> Such as schema version and election event.
3. I didn't do anything.
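For context, a minimal sketch of why a UUID does not fit a formatter that only knows MessagePack-native types: it is sent as an extension value, not as a plain string or integer. The sketch assumes extension type 2 for UUID inside a fixext 16 container. `sketch_encode_uuid()` and `sketch_sizeof_uuid()` are hypothetical stand-ins written for illustration; they are not the real `mp_encode_uuid()`.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Assumption: extension type id 2 is used for UUID values. */
enum { SKETCH_MP_UUID = 2, SKETCH_UUID_LEN = 16 };

/*
 * Hypothetical stand-in for mp_encode_uuid(): writes a fixext 16
 * header byte (0xd8), the extension type byte, then the 16 raw UUID
 * bytes. Returns the position right after the encoded value.
 */
static char *
sketch_encode_uuid(char *w, const unsigned char raw[SKETCH_UUID_LEN])
{
	*w++ = (char)0xd8;
	*w++ = (char)SKETCH_MP_UUID;
	memcpy(w, raw, SKETCH_UUID_LEN);
	return w + SKETCH_UUID_LEN;
}

/* Encoded size of one UUID value: 2 header bytes + 16 payload bytes. */
static size_t
sketch_sizeof_uuid(void)
{
	return 2 + SKETCH_UUID_LEN;
}
```

A printf-like format string has no native specifier that produces this header, which is why the events carrying UUIDs keep the manual `mp_encode_*` calls.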
> 4. There is a CI failure:
> https://github.com/tarantool/tarantool/runs/5164254300?check_suite_focus=true.
> Msgpuck submodule was bumped, but the new version doesn't update its test's
> .result file.
4. Done.
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 6a33203df..d72bd3dad 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -161,6 +155,20 @@ static struct fiber_pool tx_fiber_pool;
> > */
> > static struct cbus_endpoint tx_prio_endpoint;
> >
> > +static void
> > +box_broadcast_status(void);
> > +
> > +static void
> > +title(const char *new_status)
> > +{
> > + snprintf(status, sizeof(status), "%s", new_status);
> > + title_set_status(new_status);
> > + title_update();
> > + systemd_snotify("STATUS=%s", status);
> > + /* Checking box.info.status change */
>
> 5. The comment narrates the code, doesn't tell anything new. I would
> suggest to drop it. The same in other broadcast calls for all events.
5. Done.
> > + box_broadcast_status();
> > +}
> > +
> > void
> > box_update_ro_summary(void)
> > {
> > @@ -3933,3 +3943,82 @@ box_reset_stat(void)
> > engine_reset_stat();
> > space_foreach(box_reset_space_stat, NULL);
> > }
> > +
> > +void
> > +box_broadcast_id(void)
> > +{
> > + char buf[1024];
> > + char *w = buf;
> > +
> > + struct replica *replica = replica_by_uuid(&INSTANCE_UUID);
> > + uint32_t id = (replica == NULL) ? 0 : replica->id;
>
> 6. You don't need to get the replica. You can use the global variable
> 'instance_id'.
6. I fixed this.
> +void
> +box_broadcast_id(void)
> +{
> + char buf[1024];
> + char *w = buf;
> +
> + w = mp_encode_map(w, 3);
> + w = mp_encode_str0(w, "id");
> + w = mp_encode_uint(w, instance_id);
> + w = mp_encode_str0(w, "instance_uuid");
> + w = mp_encode_uuid(w, &INSTANCE_UUID);
> + w = mp_encode_str0(w, "replicaset_uuid");
> + w = mp_encode_uuid(w, &REPLICASET_UUID);
> +
> + box_broadcast("box.id", strlen("box.id"), buf, w);
> +
> + assert((size_t)(w - buf) < 1024);
> +}
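A rough sizing sketch for the 1024-byte buffer above, assuming standard MessagePack sizes (fixmap, fixstr, value-dependent unsigned integers) and an 18-byte encoded UUID. All `est_*` helpers are hypothetical and exist only for this estimate; they are not part of msgpuck.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Size of a MessagePack map header with fewer than 16 pairs (fixmap). */
static size_t
est_sizeof_fixmap(void)
{
	return 1;
}

/* Size of a MessagePack string shorter than 32 bytes (fixstr). */
static size_t
est_sizeof_str0(const char *s)
{
	size_t len = strlen(s);
	assert(len < 32);
	return 1 + len;
}

/* Size of a MessagePack unsigned integer, which depends on its value. */
static size_t
est_sizeof_uint(uint64_t v)
{
	if (v <= 0x7f)
		return 1;
	if (v <= UINT8_MAX)
		return 2;
	if (v <= UINT16_MAX)
		return 3;
	if (v <= UINT32_MAX)
		return 5;
	return 9;
}

/* Assumed size of an encoded UUID: fixext 16 header (2) + 16 bytes. */
static size_t
est_sizeof_uuid(void)
{
	return 18;
}

/* Estimated size of the box.id payload for a given 32-bit instance id. */
static size_t
est_sizeof_box_id(uint32_t instance_id)
{
	return est_sizeof_fixmap() +
	       est_sizeof_str0("id") + est_sizeof_uint(instance_id) +
	       est_sizeof_str0("instance_uuid") + est_sizeof_uuid() +
	       est_sizeof_str0("replicaset_uuid") + est_sizeof_uuid();
}
```

Under these assumptions `est_sizeof_box_id(UINT32_MAX)` gives 75 bytes, so the payload stays far below the buffer size even for the largest id.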
> > -
> > static char *
> > mp_encode_error_one(char *data, const struct error *error)
> > {
> > diff --git a/src/box/replication.cc b/src/box/replication.cc
> > index bbf4b7417..05d5ffb58 100644
> > --- a/src/box/replication.cc
> > +++ b/src/box/replication.cc
> > @@ -183,8 +183,12 @@ replica_new(void)
> > diag_raise();
> > }
> > replica->id = 0;
> > + /* Checking box.info.id change */
> > + box_broadcast_id();
> > replica->anon = false;
> > replica->uuid = uuid_nil;
> > + /* Checking box.info.uuid change */
> > + box_broadcast_id();
>
> 8. You don't need to broadcast on every line where a change happens.
> Anyway nothing is sent until a next yield. You only need to broadcast
> when all the assignments are done and you are leaving the context.
>
>
> 9. Here not necessarily the instance's own data is changed. It
> is a generic replica constructor. It is called for every record in
> _cluster space. You only need to broadcast the places where own IDs
> of the current instance are changed. The global variables 'instance_id',
> 'INSTANCE_UUID', and 'REPLICASET_UUID'.
7. I fixed this.
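A toy model of the point in comments 8 and 9, assuming that a notification is delivered only at the next yield and carries the latest value. `struct toy_key`, `toy_broadcast()` and `toy_yield()` are hypothetical names for illustration only and do not mirror the real watcher implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy model of one watched key; all names here are hypothetical. */
struct toy_key {
	/* Latest broadcast value. */
	uint32_t value;
	/* Set when the value changed since the last delivery. */
	bool is_dirty;
	/* How many notifications the watcher has received. */
	int delivered;
	/* The value the watcher saw in the last notification. */
	uint32_t seen;
};

/* Record a new value; nothing is delivered yet. */
static void
toy_broadcast(struct toy_key *key, uint32_t value)
{
	key->value = value;
	key->is_dirty = true;
}

/* Models the next yield: deliver the latest value at most once. */
static void
toy_yield(struct toy_key *key)
{
	if (!key->is_dirty)
		return;
	key->seen = key->value;
	key->delivered++;
	key->is_dirty = false;
}
```

In this model two `toy_broadcast()` calls followed by one `toy_yield()` produce a single notification with the second value, so one broadcast after all the assignments is enough.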
> > diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
> > index 89cfb3c17..489df235b 100644
> > --- a/src/lib/raft/raft.c
> > +++ b/src/lib/raft/raft.c
> > @@ -494,6 +494,7 @@ raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source)
> > * it manually.
> > */
> > raft->leader = 0;
> > + raft_schedule_broadcast(raft);
>
> 10. This should be a separate commit with its own unit test. Sergos already made
> a PR, but it lacks a test. I suggest to finish it and rebase this patchset on
> top of that commit.
8. It is done.
> > if (raft->is_candidate)
> > raft_sm_schedule_new_election(raft);
> > }
> > diff --git a/test/app-luatest/gh_6260_add_builtin_events_test.lua b/test/app-luatest/gh_6260_add_builtin_events_test.lua
> > new file mode 100644
> > index 000000000..3c363fc9a
> > --- /dev/null
> > +++ b/test/app-luatest/gh_6260_add_builtin_events_test.lua
>
> 11. The test should be in box-luatest, not app-luatest.
9. Done.
> diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> new file mode 100644
> index 000000000..6c3731449
> --- /dev/null
> +++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> > @@ -0,0 +1,320 @@
> > +local t = require('luatest')
> > +local net = require('net.box')
> > +local cluster = require('test.luatest_helpers.cluster')
> > +local helpers = require('test.luatest_helpers')
> > +local fiber = require('fiber')
> > +
> > +local g = t.group('gh_6260')
> > +
> > +
> > +g.before_test('test_box_status', function(cg)
> > + cg.cluster = cluster:new({})
> > +
> > + local box_cfg = {
> > + read_only = false
>
> 12. It is false by default anyway. You don't need this config.
10. Done.
> +local t = require('luatest')
> +local net = require('net.box')
> +local cluster = require('test.luatest_helpers.cluster')
> +local server = require('test.luatest_helpers.server')
> +local fiber = require('fiber')
> +
> +local g = t.group('gh_6260')
> +
> +g.before_test('test_box_status', function(cg)
> + cg.cluster = cluster:new({})
> + cg.master = cg.cluster:build_server({alias = 'master'})
> + cg.cluster:add_server(cg.master)
> + cg.cluster:start()
> +end)
> > + }
> > +
> > + cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
> > + cg.cluster:add_server(cg.master)
> > + cg.cluster:start()
> > +end)
> > +
> > +
> > +g.after_test('test_box_status', function(cg)
> > + cg.cluster.servers = nil
> > + cg.cluster:drop()
> > +end)
> > +
> > +
> > +g.test_box_status = function(cg)
> > + local c = net.connect(cg.master.net_box_uri)
> > +
> > + local result = {}
> > + local result_no = 0
> > + local watcher = c:watch('box.status',
> > + function(name, state)
> > + assert(name == 'box.status')
> > + result = state
> > + result_no = result_no + 1
> > + end)
> > +
> > + -- initial state should arrive
> > + while result_no < 1 do fiber.sleep(0.001) end
>
> 13. Lets better use t.helpers.retrying(). In other places too. See how it is used
> in existing tests for examples.
11.
> + t.helpers.retrying({}, function()
> + while result_no < 1 do fiber.sleep(0.000001) end
> + end)
> > + t.assert_equals(result,
> > + {is_ro = false, is_ro_cfg = false, status = 'running'})
> > +
> > + -- test orphan status appearance
> > + cg.master:eval(([[
> > + box.cfg{
> > + replication = {
> > + "%s",
> > + "%s"
> > + },
> > + replication_connect_timeout = 0.001,
> > + replication_timeout = 0.001,
> > + }
> > + ]]):format(helpers.instance_uri('master'),
> > + helpers.instance_uri('replica')))
>
> 14. Well, while this way of doing box.cfg certainly works, you might
> want to consider an alternative via :exec(). Probably you didn't find how
> to pass parameters into it? Here is how:
>
> cg.master:exec(function(repl)
> box.cfg{
> replication = repl,
> ...
> }
> end, {{helpers.instance_uri('master'),
> helpers.instance_uri('replica')}})
>
> > + -- here we have 2 notifications: entering ro when can't connect
> > + -- to master and the second one when going orphan
> > + while result_no < 3 do fiber.sleep(0.000001) end
> > + t.assert_equals(result,
> > + {is_ro = true, is_ro_cfg = false, status = 'orphan'})
> > +
> > + -- test ro_cfg appearance
> > + cg.master:eval([[
> > + box.cfg{
> > + replication = {},
> > + read_only = true,
> > + }
> > + ]])
>
> 15. Lets better use :exec().
12. Done.
> + -- test orphan status appearance
> + cg.master:exec(function(repl)
> + box.cfg{
> + replication = repl,
> + replication_connect_timeout = 0.001,
> + replication_timeout = 0.001,
> + }
> + end, {{server.build_instance_uri('master'), server.build_instance_uri('replica')}})
> + -- here we have 2 notifications: entering ro when can't connect
> + -- to master and the second one when going orphan
> + t.helpers.retrying({}, function()
> + while result_no < 3 do fiber.sleep(0.000001) end
> + end)
> + t.assert_equals(result,
> + {is_ro = true, is_ro_cfg = false, status = 'orphan'})
> +
> + -- test ro_cfg appearance
> + cg.master:exec(function()
> + box.cfg{
> + replication = {},
> + read_only = true,
> + }
> + end)
> > + while result_no < 4 do fiber.sleep(0.000001) end
> > + t.assert_equals(result,
> > + {is_ro = true, is_ro_cfg = true, status = 'running'})
> > +
> > + -- reset to rw
> > + cg.master:eval([[
> > + box.cfg{
> > + read_only = false,
> > + }
> > + ]])
> > + while result_no < 5 do fiber.sleep(0.000001) end
> > + t.assert_equals(result,
> > + {is_ro = false, is_ro_cfg = false, status = 'running'})
> > +
> > + -- turning manual election mode puts into ro
> > + cg.master:eval([[
> > + box.cfg{
> > + election_mode = 'manual',
> > + }
> > + ]])
> > + while result_no < 6 do fiber.sleep(0.000001) end
> > + t.assert_equals(result,
> > + {is_ro = true, is_ro_cfg = false, status = 'running'})
> > +
> > + -- promotion should turn rw
> > + cg.master:eval([[
> > + box.ctl.promote()
> > + ]])
> > + while result_no < 7 do fiber.sleep(0.000001) end
> > + t.assert_equals(result,
> > + {is_ro = false, is_ro_cfg = false, status = 'running'})
> > +
> > + watcher:unregister()
> > + c:close()
> > +end
> > +
> > +
>
> 16. Extra empty line. In some other places too. Please, lets remove them.
13. Deleted extra empty line.
> > +g.before_test('test_box_election', function(cg)
> > +
> > + cg.cluster = cluster:new({})
> > +
> > + local box_cfg = {
> > + replication = {
> > + helpers.instance_uri('instance_', 1);
> > + helpers.instance_uri('instance_', 2);
> > + helpers.instance_uri('instance_', 3);
>
> 17. Too long indentation. Also you could write simpler:
>
> helpers.instance_uri('instance_1')
> helpers.instance_uri('instance_2')
> ...
>
> No need to pass the numbers separately.
14.
> +g.before_test('test_box_election', function(cg)
> + cg.cluster = cluster:new({})
> +
> + local box_cfg = {
> + replication = {
> + server.build_instance_uri('instance_1'),
> + server.build_instance_uri('instance_2'),
> + server.build_instance_uri('instance_3'),
> > + },
> > + replication_connect_quorum = 0,
> > + election_mode = 'off',
> > + replication_synchro_quorum = 2,
> > + replication_synchro_timeout = 1,
> > + replication_timeout = 0.25,
> > + election_timeout = 0.25,
> > + }
> > +
> > + cg.instance_1 = cg.cluster:build_server(
> > + {alias = 'instance_1', box_cfg = box_cfg})
> > +
> > + cg.instance_2 = cg.cluster:build_server(
> > + {alias = 'instance_2', box_cfg = box_cfg})
> > +
> > + cg.instance_3 = cg.cluster:build_server(
> > + {alias = 'instance_3', box_cfg = box_cfg})
> > +
> > + cg.cluster:add_server(cg.instance_1)
> > + cg.cluster:add_server(cg.instance_2)
> > + cg.cluster:add_server(cg.instance_3)
> > + cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
> > +end)
> > +
> > +
> > +g.after_test('test_box_election', function(cg)
> > + cg.cluster.servers = nil
> > + cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
> > +end)
> > +
> > +
> > +g.test_box_election = function(cg)
> > + local c = {}
> > + c[1] = net.connect(cg.instance_1.net_box_uri)
> > + c[2] = net.connect(cg.instance_2.net_box_uri)
> > + c[3] = net.connect(cg.instance_3.net_box_uri)
> > +
> > + local res = {}
> > + local res_n = {0, 0, 0}
> > +
> > + for i = 1, 3 do
> > + c[i]:watch('box.election',
> > + function(n, s)
> > + t.assert_equals(n, 'box.election')
> > + res[i] = s
> > + res_n[i] = res_n[i] + 1
> > + end)
> > + end
> > + while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> > +
> > + -- verify all instances are in the same state
> > + t.assert_equals(res[1], res[2])
> > + t.assert_equals(res[1], res[3])
> > +
> > + -- wait for elections to complete, verify leader is the instance_1
> > + -- trying to avoid the exact number of term - it can vary
> > + local instance1_id = cg.instance_1:eval("return box.info.id")
>
> 18. You can do cg.instance_1:instance_id().
15.
> + -- wait for elections to complete, verify leader is the instance_1
> + -- trying to avoid the exact number of term - it can vary
> + local instance1_id = cg.instance_1:instance_id()
> > +
> > + cg.instance_1:eval("box.cfg{election_mode='candidate'}")
> > + cg.instance_2:eval("box.cfg{election_mode='voter'}")
> > + cg.instance_3:eval("box.cfg{election_mode='voter'}")
> > +
> > + cg.instance_1:wait_election_leader_found()
> > + cg.instance_2:wait_election_leader_found()
> > + cg.instance_3:wait_election_leader_found()
> > +
> > + t.assert_equals(res[1].leader, instance1_id)
> > + t.assert_equals(res[1].is_ro, false)
> > + t.assert_equals(res[1].role, 'leader')
>
> 19. You can use t.assert_covers() if you want not to check a
> specific term number.
>
> > + t.assert_equals(res[2].leader, instance1_id)
> > + t.assert_equals(res[2].is_ro, true)
> > + t.assert_equals(res[2].role, 'follower')
> > + t.assert_equals(res[3].leader, instance1_id)
> > + t.assert_equals(res[3].is_ro, true)
> > + t.assert_equals(res[3].role, 'follower')
> > + t.assert_equals(res[1].term, res[2].term)
> > + t.assert_equals(res[1].term, res[3].term)
> > +
> > + -- check the stepping down is working
> > + res_n = {0, 0, 0}
> > + cg.instance_1:eval("box.cfg{election_mode='voter'}")
> > + while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> > +
> > + local expected = {is_ro = true, role = 'follower', term = res[1].term,
> > + leader = 0}
> > + t.assert_equals(res[1], expected)
> > + t.assert_equals(res[2], expected)
> > + t.assert_equals(res[3], expected)
> > +
> > + c[1]:close()
> > + c[2]:close()
> > + c[3]:close()
> > +end
>
> <...>
16.
> + cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
> + cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
> + cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
> +
> + cg.instance_1:wait_election_leader_found()
> + cg.instance_2:wait_election_leader_found()
> + cg.instance_3:wait_election_leader_found()
> +
> + t.assert_covers(res,
> + {
> + {leader = instance1_id, is_ro = false, role = 'leader', term = 2},
> + {leader = instance1_id, is_ro = true, role = 'follower', term = 2},
> + {leader = instance1_id, is_ro = true, role = 'follower', term = 2}
> + })
> +
> + -- check the stepping down is working
> + res_n = {0, 0, 0}
> + cg.instance_1:exec(function() box.cfg{election_mode='voter'} end)
> + t.helpers.retrying({}, function()
> + while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> + end)
> +
> + local expected = {is_ro = true, role = 'follower', term = res[1].term,
> + leader = 0}
> + t.assert_covers(res, {expected, expected, expected})
> +
> + c[1]:close()
> + c[2]:close()
> + c[3]:close()
> +end
> > +g.test_box_id = function(cg)
> > + local c = net.connect(cg.instance.net_box_uri)
> > +
> > + local result = {}
> > + local result_no = 0
> > +
> > + local watcher = c:watch('box.id',
> > + function(name, state)
> > + assert(name == 'box.id')
> > + result = state
> > + result_no = result_no + 1
> > + end)
> > +
> > + -- initial state should arrive
> > + while result_no < 1 do fiber.sleep(0.001) end
> > + t.assert_equals(result, {id = 1,
> > + instance_uuid = '11111111-2222-0000-0000-333333333333',
> > + replicaset_uuid = 'abababab-0000-0000-0000-000000000001'})
>
> 20. Lets better use cg.instance.box_cfg.instance_uuid and
> cg.instance.box_cfg.replicaset_uuid instead of the hardcoded values.
17.
> +g.test_box_id = function(cg)
> + local c = net.connect(cg.instance.net_box_uri)
> +
> + local result = {}
> + local result_no = 0
> +
> + local watcher = c:watch('box.id',
> + function(name, state)
> + assert(name == 'box.id')
> + result = state
> + result_no = result_no + 1
> + end)
> +
> + -- initial state should arrive
> + t.helpers.retrying({}, function()
> + while result_no < 1 do fiber.sleep(0.00001) end
> + end)
> + t.assert_equals(result, {id = 1,
> + instance_uuid = cg.instance.box_cfg.instance_uuid,
> + replicaset_uuid = cg.instance.box_cfg.replicaset_uuid})
> +
> + watcher:unregister()
> + c:close()
> +end
> > +
> > + -- error when instance_uuid set dynamically
> > + t.assert_error_msg_content_equals("Can't set option 'instance_uuid' dynamically",
> > + function()
> > + cg.instance:eval(([[
> > + box.cfg{
> > + instance_uuid = "%s"
> > + }
> > + ]]):format(t.helpers.uuid('1', '2', '4')))
> > + end)
>
> 21. This and the case below don't seem related to your patch. This was
> banned beforehand and is already tested. I would suggest to drop these.
18. I deleted these parts of code.
> > +
> > + w = mp_encode_map(w, 3);
> > + w = mp_encode_str0(w, "id");
> > + w = mp_encode_uint(w, id);
> > + w = mp_encode_str0(w, "instance_uuid");
> > + w = mp_encode_uuid(w, &INSTANCE_UUID);
> > + w = mp_encode_str0(w, "replicaset_uuid");
> > + w = mp_encode_uuid(w, &REPLICASET_UUID);
> > +
> > + box_broadcast("box.id", strlen("box.id"), buf, w);
> > +
> > + assert((size_t)(w - buf) < 1024);
> > +}
> > diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc
> > index 3c4176e59..93109562d 100644
> > --- a/src/box/mp_error.cc
> > +++ b/src/box/mp_error.cc
> > @@ -169,12 +169,6 @@ mp_sizeof_error_one(const struct error *error)
> > return data_size;
> > }
> >
> > -static inline char *
> > -mp_encode_str0(char *data, const char *str)
> > -{
> > - return mp_encode_str(data, str, strlen(str));
> > -}
>
> 7. This and the msgpuck submodule update better be done in a separate
> preparatory commit.
>
>
> > +
> > + -- error when replicaset_uuid set dynamically
> > + t.assert_error_msg_content_equals("Can't set option 'replicaset_uuid' dynamically",
> > + function()
> > + cg.instance:eval(([[
> > + box.cfg{
> > + replicaset_uuid = "%s"
> > + }
> > + ]]):format(t.helpers.uuid('ab', 2)))
> > + end)
> > +
> > + watcher:unregister()
> > + c:close()
> > +end
> > diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc
> > index 777c68dff..6afb73b22 100644
> > --- a/test/unit/mp_error.cc
> > +++ b/test/unit/mp_error.cc
> > @@ -94,12 +94,6 @@ enum {
> > sizeof(standard_errors) / sizeof(standard_errors[0]),
> > };
> >
> > -static inline char *
> > -mp_encode_str0(char *data, const char *str)
> > -{
> > - return mp_encode_str(data, str, strlen(str));
> > -}
>
> 22. Should be in the separate commit which bumps msgpuck submodule.
19. Done.
https://github.com/tarantool/tarantool/commit/99d6b8d0660fdaa14a91e0e42f67de6b51997f02
* Re: [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub
2022-02-22 12:56 Yan Shtunder via Tarantool-patches
@ 2022-02-23 22:44 ` Vladislav Shpilevoy via Tarantool-patches
0 siblings, 0 replies; 8+ messages in thread
From: Vladislav Shpilevoy via Tarantool-patches @ 2022-02-23 22:44 UTC (permalink / raw)
To: Yan Shtunder; +Cc: tarantool-patches
Hi! Thanks for the patch, good job!
See some comments below.
On 22.02.2022 13:56, Yan Shtunder wrote:
> Added predefined system events: box.status, box.id, box.election and
> box.schema.
>
> Closes #6260
>
> @TarantoolBot document
> Title: Built-in events for pub/sub
>
> Built-in events are needed, first of all, in order to learn who is the
> master, unless it is defined in an application specific way. Knowing who
> is the master is necessary to send changes to a correct instance, and
> probably make reads of the most actual data if it is important. Also
> defined more built-in events for other mutable properties like replication
> list of the server, schema version change and instance state.
1. There is no event about the replication list of the server.
Also, you need to provide more details. Which events are introduced? You
didn't give their names. Which fields does each of them have?
There are names in the example below, but it does not reveal the events'
content.
> Example usage:
>
> * Client:
> ```lua
> conn = net.box.connect(URI)
> -- Subscribe to updates of key 'box.id'
> w = conn:watch('box.id', function(key, value)
> assert(key == 'box.id')
> -- do something with value
> end)
> -- or to updates of key 'box.status'
> w = conn:watch('box.status', function(key, value)
> assert(key == 'box.status')
> -- do something with value
> end)
> -- or to updates of key 'box.election'
> w = conn:watch('box.election', function(key, value)
> assert(key == 'box.election')
> -- do something with value
> end)
> -- or to updates of key 'box.schema'
> w = conn:watch('box.schema', function(key, value)
> assert(key == 'box.schema')
> -- do something with value
> end)
> -- Unregister the watcher when it's no longer needed.
> w:unregister()
> ```
> ---
> Issue: https://github.com/tarantool/tarantool/issues/6260
> Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v4
>
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index b85d279e3..cd48cd6aa 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -57,6 +57,14 @@
> #include "sequence.h"
> #include "sql.h"
> #include "constraint_id.h"
> +#include "box.h"
> +
> +static void
> +box_schema_version_bump(void)
> +{
> + ++schema_version;
> + box_broadcast_schema();
> +}
2. Please, move it below the comment on the next line. To keep
the functions grouped.
> /* {{{ Auxiliary functions and methods. */
>
> diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> new file mode 100644
> index 000000000..c11d79e30
> --- /dev/null
> +++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> @@ -0,0 +1,294 @@
> +local t = require('luatest')
> +local net = require('net.box')
> +local cluster = require('test.luatest_helpers.cluster')
> +local server = require('test.luatest_helpers.server')
> +local fiber = require('fiber')
> +
> +local g = t.group('gh_6260')
> +
> +g.before_test('test_box_status', function(cg)
> + cg.cluster = cluster:new({})
> + cg.master = cg.cluster:build_server({alias = 'master'})
> + cg.cluster:add_server(cg.master)
> + cg.cluster:start()
> +end)
> +
> +g.after_test('test_box_status', function(cg)
> + cg.cluster.servers = nil
> + cg.cluster:drop()
> +end)
> +
> +g.test_box_status = function(cg)
> + local c = net.connect(cg.master.net_box_uri)
> +
> + local result = {}
> + local result_no = 0
> + local watcher = c:watch('box.status',
> + function(name, state)
> + assert(name == 'box.status')
> + result = state
> + result_no = result_no + 1
> + end)
> +
> + -- initial state should arrive
> + t.helpers.retrying({}, function()
> + while result_no < 1 do fiber.sleep(0.000001) end
> + end)
3. With retrying() you are not supposed to have a 'while' loop. The retry
is done inside of the function for a limited time. The problem with custom
'while' loops is that people usually do not add any timeout and do not
handle exceptions. Please, take another look at other usages of retrying()
for a reference. The same applies to the similar places below.
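A minimal sketch of that pattern, in plain Lua so it runs outside Tarantool.
The local retrying() here is a hypothetical stand-in written for
illustration, not luatest's implementation; the default timeout and the
poll() function that simulates incoming notifications are assumptions too.
The point is only that the callback is a plain check without a loop of its
own, and the retry loop with its timeout lives in the helper:

```lua
-- Hypothetical stand-in for a retrying() helper: call fn until it stops
-- raising or the timeout expires, then re-raise the last error.
local function retrying(opts, fn)
    local timeout = opts.timeout or 5 -- assumed default, for the sketch only
    local deadline = os.clock() + timeout
    while true do
        local ok, err = pcall(fn)
        if ok then
            return
        end
        if os.clock() >= deadline then
            error(err, 0)
        end
        -- A fiber-based helper would yield here instead of spinning.
    end
end

-- Simulated watcher state: every poll() acts as if one more notification
-- arrived between two attempts.
local result_no = 0
local function poll()
    result_no = result_no + 1
end

-- The callback only states the condition. No 'while' loop inside.
retrying({timeout = 1}, function()
    poll()
    assert(result_no >= 3, 'not enough notifications yet')
end)
```

If the condition never becomes true, the helper raises the last error once
the timeout is over, so the test fails instead of hanging forever.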
> +
> + t.assert_equals(result,
> + {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> + -- test orphan status appearance
> + cg.master:exec(function(repl)
> + box.cfg{
> + replication = repl,
> + replication_connect_timeout = 0.001,
4. Indentation is broken.
> + replication_timeout = 0.001,
> + }
> + end, {{server.build_instance_uri('master'), server.build_instance_uri('replica')}})
5. This line is longer than 80 symbols.
> + -- here we have 2 notifications: entering ro when can't connect
> + -- to master and the second one when going orphan
> + t.helpers.retrying({}, function()
> + while result_no < 3 do fiber.sleep(0.000001) end
> + end)
> + t.assert_equals(result,
> + {is_ro = true, is_ro_cfg = false, status = 'orphan'})
<...>
> +g.test_box_election = function(cg)
> + local c = {}
> + c[1] = net.connect(cg.instance_1.net_box_uri)
> + c[2] = net.connect(cg.instance_2.net_box_uri)
> + c[3] = net.connect(cg.instance_3.net_box_uri)
> +
> + local res = {}
> + local res_n = {0, 0, 0}
> +
> + for i = 1, 3 do
> + c[i]:watch('box.election',
> + function(n, s)
> + t.assert_equals(n, 'box.election')
> + res[i] = s
> + res_n[i] = res_n[i] + 1
> + end)
> + end
> + t.helpers.retrying({}, function()
> + while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> + end)
> +
> + -- verify all instances are in the same state
> + t.assert_equals(res[1], res[2])
> + t.assert_equals(res[1], res[3])
> +
> + -- wait for elections to complete, verify leader is the instance_1
> + -- trying to avoid the exact number of term - it can vary
6. You just said "trying to avoid the exact number of term" and then used
an exact term number a few lines below. Let's rather not rely on an
exact term number. It is enough to test that it exists and is the same in
all 3 events.
t.assert_covers() allows omitting certain fields, because it is not
'equal', it is 'covers'. A subset of fields is enough. The others, like
term, can be checked in a special way.
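A rough sketch of that check in plain Lua, so it runs without luatest. The
covers() helper is hypothetical and only mimics the subset idea for a flat
map; it is not luatest's implementation, and it is applied to each event
separately here. The sample payloads are made up for illustration:

```lua
-- Hypothetical helper: true when every key of 'expected' has the same
-- value in 'actual'. Extra keys in 'actual' are ignored.
local function covers(actual, expected)
    for key, value in pairs(expected) do
        if actual[key] ~= value then
            return false
        end
    end
    return true
end

-- Made-up events shaped like the box.election payloads in the test.
local res = {
    {leader = 1, is_ro = false, role = 'leader', term = 7},
    {leader = 1, is_ro = true, role = 'follower', term = 7},
    {leader = 1, is_ro = true, role = 'follower', term = 7},
}
-- The expectations deliberately say nothing about the term.
local expected = {
    {leader = 1, is_ro = false, role = 'leader'},
    {leader = 1, is_ro = true, role = 'follower'},
    {leader = 1, is_ro = true, role = 'follower'},
}

local all_match = true
for i = 1, 3 do
    -- The listed fields must match exactly.
    all_match = all_match and covers(res[i], expected[i])
    -- The term only has to exist and be the same in all 3 events.
    all_match = all_match and type(res[i].term) == 'number'
    all_match = all_match and res[i].term == res[1].term
end
```

This way the test keeps passing when the cluster happens to need one more
election round and the term ends up being 3 instead of 2.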
> + local instance1_id = cg.instance_1:instance_id()
> +
> + cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
> + cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
> + cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
> +
> + cg.instance_1:wait_election_leader_found()
> + cg.instance_2:wait_election_leader_found()
> + cg.instance_3:wait_election_leader_found()
> +
> + t.assert_covers(res,
> + {
> + {leader = instance1_id, is_ro = false, role = 'leader', term = 2},
> + {leader = instance1_id, is_ro = true, role = 'follower', term = 2},
> + {leader = instance1_id, is_ro = true, role = 'follower', term = 2}
> + })
<...>
> Hi! Thank you for the review!
Next time, let's put the comment responses before the new patch. I noticed
this part of the email only after I had finished the new review.
end of thread, other threads:[~2022-03-25 23:44 UTC | newest]
Thread overview: 8+ messages (download: mbox.gz / follow: Atom feed)
2022-03-24 14:01 [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub Yan Shtunder via Tarantool-patches
2022-03-25 23:44 ` Vladislav Shpilevoy via Tarantool-patches
-- strict thread matches above, loose matches on Subject: below --
2022-03-22 11:51 Yan Shtunder via Tarantool-patches
2022-03-22 22:55 ` Vladislav Shpilevoy via Tarantool-patches
2022-02-28 13:32 Yan Shtunder via Tarantool-patches
2022-03-01 22:44 ` Vladislav Shpilevoy via Tarantool-patches
2022-02-22 12:56 Yan Shtunder via Tarantool-patches
2022-02-23 22:44 ` Vladislav Shpilevoy via Tarantool-patches