From: Yan Shtunder via Tarantool-patches
Reply-To: Yan Shtunder
To: v.shpilevoy@tarantool.org
Cc: tarantool-patches@dev.tarantool.org, Yan Shtunder
Date: Thu, 24 Mar 2022 17:01:58 +0300
Message-Id: <20220324140158.110418-1-ya.shtunder@gmail.com>
Subject: [Tarantool-patches] [PATCH v4 net.box] Add predefined system events for pub/sub

Added predefined system events: box.status, box.id, box.election
and box.schema.

Closes #6260

@TarantoolBot document
Title: Built-in events for pub/sub

Built-in events are needed, first of all, in order to learn who is the
master, unless it is defined in an application-specific way. Knowing who
is the master is necessary to send changes to the correct instance, and
possibly to read the most up-to-date data if that is important. More
built-in events are also defined for other mutable properties, such as
the leader state change, its election role and election term, the schema
version change and the instance state.

Built-in events have a special naming scheme - their names always start
with the box. prefix (including the dot).
The prefix is reserved for built-in events. Creating new events with this
prefix is banned. Below is a list of all the events + their names and
values:

1. box.id
Description - identification of the instance. Changes are extremely rare.
Some values never change or change only once. For example, the instance
UUID never changes after the first box.cfg, but it is not known before
box.cfg is called. The replicaset UUID is unknown until the instance joins
a replicaset or boots a new one, but the events are supposed to start
working before that - right at listen launch. The instance numeric ID is
known only after registration. On anonymous replicas it is 0 until they
are registered officially.
Value - {
    MP_STR "id": MP_UINT box.info.id,
    MP_STR "instance_uuid": MP_UUID box.info.uuid,
    MP_STR "replicaset_uuid": MP_UUID box.info.cluster.uuid,
}

2. box.status
Description - generic blob about the instance status. It contains the most
commonly used and infrequently changed config options and box.info fields.
Value - {
    MP_STR "is_ro": MP_BOOL box.info.ro,
    MP_STR "is_ro_cfg": MP_BOOL box.cfg.read_only,
    MP_STR "status": MP_STR box.info.status,
}

3. box.election
Description - all the parts of box.info.election needed to find the most
recent writable leader.
Value - {
    MP_STR "term": MP_UINT box.info.election.term,
    MP_STR "role": MP_STR box.info.election.state,
    MP_STR "is_ro": MP_BOOL box.info.ro,
    MP_STR "leader": MP_UINT box.info.election.leader,
}

4. box.schema
Description - schema-related data. Currently it is only the version.
Value - {
    MP_STR "version": MP_UINT schema_version,
}

Built-in events can't be overridden. That is, users are not able to call
box.broadcast('box.id', any_data) etc.

The events are available from the very beginning as not MP_NIL. This is
necessary for local subscriptions, which are supported. Otherwise there is
no way to detect whether an event is even supported at all by this
Tarantool version.
If events are broadcast before box.cfg{}, then the following values will
be available:
    box.id = {}
    box.schema = {}
    box.status = {}
    box.election = {}

This way the users will be able to distinguish an event not being
supported at all from box.cfg{} not having been called yet. Otherwise they
would need to parse the _TARANTOOL version string locally and peer_version
in netbox.

Example usage:

 * Client:
   ```lua
   local net_box = require('net.box')
   -- URI is a placeholder for the server address.
   local conn = net_box.connect(URI)
   -- Subscribe to updates of key 'box.id'
   local w = conn:watch('box.id', function(key, value)
       assert(key == 'box.id')
       -- do something with value
   end)
   -- or to updates of key 'box.status'
   w = conn:watch('box.status', function(key, value)
       assert(key == 'box.status')
       -- do something with value
   end)
   -- or to updates of key 'box.election'
   w = conn:watch('box.election', function(key, value)
       assert(key == 'box.election')
       -- do something with value
   end)
   -- or to updates of key 'box.schema'
   w = conn:watch('box.schema', function(key, value)
       assert(key == 'box.schema')
       -- do something with value
   end)
   -- Unregister the watcher when it's no longer needed.
   w:unregister()
   ```
---
Issue: https://github.com/tarantool/tarantool/issues/6260
Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v4

Hi, thanks for the useful review!

> Hi! Good job on the fixes!
>
> See 3 comments below, almost finished!
>
> > Built-in events can't be override. Meaning, users can't be able to call
> > box.broadcast(‘box.id’, any_data) etc.
> >
> > The events are available from the very beginning as not MP_NIL. It will come
> > in handy when local subscriptions will be supported (not via the network).
>
> 1. 'when local subscriptions will be supported (not via the network).' - they
> are already supported. I wrote this sentence before Vova managed to implement
> `box.watch()`.
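
The three cases the commit message distinguishes (event not supported,
box.cfg{} not called yet, real data available) can be sketched on the
receiving side as below. This is only an illustration, not code from the
patch: the enum and event_state_classify() are hypothetical names, and the
sketch assumes that a key which was never broadcast arrives as MP_NIL. It
inspects the raw MessagePack header bytes directly instead of using any
msgpuck helper.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical classification of a built-in event value. */
enum event_state {
	/* MP_NIL: assumed to mean the key was never broadcast. */
	EVENT_UNSUPPORTED,
	/* Empty map: the event exists, box.cfg{} was not called yet. */
	EVENT_NOT_CONFIGURED,
	/* Non-empty map: real data is available. */
	EVENT_READY,
	/* Truncated input or any other MessagePack type. */
	EVENT_MALFORMED,
};

/*
 * Look only at the MessagePack header of the value:
 * 0xc0 is nil, 0x80..0x8f is fixmap, 0xde is map16, 0xdf is map32.
 */
static enum event_state
event_state_classify(const unsigned char *data, size_t len)
{
	assert(data != NULL);
	if (len == 0)
		return EVENT_MALFORMED;
	uint8_t c = data[0];
	if (c == 0xc0)
		return EVENT_UNSUPPORTED;
	uint32_t size;
	if ((c & 0xf0) == 0x80) {
		size = c & 0x0f;
	} else if (c == 0xde && len >= 3) {
		size = ((uint32_t)data[1] << 8) | data[2];
	} else if (c == 0xdf && len >= 5) {
		size = ((uint32_t)data[1] << 24) |
		       ((uint32_t)data[2] << 16) |
		       ((uint32_t)data[3] << 8) | data[4];
	} else {
		return EVENT_MALFORMED;
	}
	return size == 0 ? EVENT_NOT_CONFIGURED : EVENT_READY;
}
```

With such a check a client would not need to parse version strings: an
empty map already says that the event is known to the server.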
> > diff --git a/src/box/alter.cc b/src/box/alter.cc > > index 45a6b7f41..342f893b8 100644 > > --- a/src/box/alter.cc > > +++ b/src/box/alter.cc > > @@ -57,9 +57,17 @@ > > #include "sequence.h" > > #include "sql.h" > > #include "constraint_id.h" > > +#include "box.h" > > > > /* {{{ Auxiliary functions and methods. */ > > > > +static void > > +box_schema_version_bump(void) > > +{ > > + ++schema_version; > > + box_broadcast_schema(false); > > 2. The main problem with adding flags as booleans in function arguments is > that when you call the function, you can't tell from the call what does the > argument mean. For instance, if I don't know/remember the function's signature, > I can't tell what 'false' stands for. > > Normally such cases are ironed out by either splitting the function in 2 > (inside they can call some third function with this flag as an argument), or > by just not using the flag. > > Here, for example, the following can be done - just use these functions only > after box.cfg. 
Before that, as a first step, you broadcast the sys events > explicitly, manually: > > ==================== > diff --git a/src/box/alter.cc b/src/box/alter.cc > index 342f893b8..a7598a88f 100644 > --- a/src/box/alter.cc > +++ b/src/box/alter.cc > @@ -65,7 +65,7 @@ static void > box_schema_version_bump(void) > { > ++schema_version; > - box_broadcast_schema(false); > + box_broadcast_schema(); > } > > static int > @@ -4264,7 +4264,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event) > if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0) > return -1; > REPLICASET_UUID = uu; > - box_broadcast_id(false); > + box_broadcast_id(); > say_info("cluster uuid %s", tt_uuid_str(&uu)); > } else if (strcmp(key, "version") == 0) { > if (new_tuple != NULL) { > diff --git a/src/box/box.cc b/src/box/box.cc > index 9be1a5bbf..62693b3db 100644 > --- a/src/box/box.cc > +++ b/src/box/box.cc > @@ -155,7 +155,7 @@ static struct fiber_pool tx_fiber_pool; > static struct cbus_endpoint tx_prio_endpoint; > > static void > -box_broadcast_status(bool is_outside_box_cfg); > +box_broadcast_status(void); > > static void > title(const char *new_status) > @@ -164,7 +164,7 @@ title(const char *new_status) > title_set_status(new_status); > title_update(); > systemd_snotify("STATUS=%s", status); > - box_broadcast_status(false); > + box_broadcast_status(); > } > > void > @@ -180,8 +180,8 @@ box_update_ro_summary(void) > if (is_ro_summary) > engine_switch_to_ro(); > fiber_cond_broadcast(&ro_cond); > - box_broadcast_status(false); > - box_broadcast_election(false); > + box_broadcast_status(); > + box_broadcast_election(); > } > > const char * > @@ -3367,7 +3367,7 @@ bootstrap(const struct tt_uuid *instance_uuid, > else > tt_uuid_create(&INSTANCE_UUID); > > - box_broadcast_id(false); > + box_broadcast_id(); > say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID)); > > /* > @@ -3646,10 +3646,15 @@ box_init(void) > sequence_init(); > box_raft_init(); > box_watcher_init(); > - 
box_broadcast_id(true); > - box_broadcast_status(true); > - box_broadcast_election(true); > - box_broadcast_schema(true); > + > + /* > + * Default built-in events to help users distinguish an event being not > + * supported from box.cfg not being called yet. > + */ > + box_broadcast_fmt("box.status", "{}"); > + box_broadcast_fmt("box.election", "{}"); > + box_broadcast_fmt("box.id", "{}"); > + box_broadcast_fmt("box.schema", "{}"); > } > > bool > @@ -3952,22 +3957,17 @@ box_reset_stat(void) > } > > void > -box_broadcast_id(bool is_outside_box_cfg) > +box_broadcast_id(void) > { > char buf[1024]; > char *w = buf; > - > - if (is_outside_box_cfg) > - w = mp_encode_map(w, 0); > - else { > - w = mp_encode_map(w, 3); > - w = mp_encode_str0(w, "id"); > - w = mp_encode_uint(w, instance_id); > - w = mp_encode_str0(w, "instance_uuid"); > - w = mp_encode_uuid(w, &INSTANCE_UUID); > - w = mp_encode_str0(w, "replicaset_uuid"); > - w = mp_encode_uuid(w, &REPLICASET_UUID); > - } > + w = mp_encode_map(w, 3); > + w = mp_encode_str0(w, "id"); > + w = mp_encode_uint(w, instance_id); > + w = mp_encode_str0(w, "instance_uuid"); > + w = mp_encode_uuid(w, &INSTANCE_UUID); > + w = mp_encode_str0(w, "replicaset_uuid"); + w = mp_encode_uuid(w, &REPLICASET_UUID); > > box_broadcast("box.id", strlen("box.id"), buf, w); > > @@ -3975,22 +3975,17 @@ box_broadcast_id(bool is_outside_box_cfg) > } > > static void > -box_broadcast_status(bool is_outside_box_cfg) > +box_broadcast_status(void) > { > char buf[1024]; > char *w = buf; > - > - if (is_outside_box_cfg) > - w = mp_encode_map(w, 0); > - else { > - w = mp_encode_map(w, 3); > - w = mp_encode_str0(w, "is_ro"); > - w = mp_encode_bool(w, box_is_ro()); > - w = mp_encode_str0(w, "is_ro_cfg"); > - w = mp_encode_bool(w, cfg_geti("read_only")); > - w = mp_encode_str0(w, "status"); > - w = mp_encode_str0(w, box_status()); > - } > + w = mp_encode_map(w, 3); > + w = mp_encode_str0(w, "is_ro"); > + w = mp_encode_bool(w, box_is_ro()); > + w = mp_encode_str0(w, 
"is_ro_cfg"); > + w = mp_encode_bool(w, cfg_geti("read_only")); > + w = mp_encode_str0(w, "status"); > + w = mp_encode_str0(w, box_status()); > > box_broadcast("box.status", strlen("box.status"), buf, w); > > @@ -3998,26 +3993,21 @@ box_broadcast_status(bool is_outside_box_cfg) > } > > void > -box_broadcast_election(bool is_outside_box_cfg) > +box_broadcast_election(void) > { > struct raft *raft = box_raft(); > > char buf[1024]; > char *w = buf; > - > - if (is_outside_box_cfg) > - w = mp_encode_map(w, 0); > - else { > - w = mp_encode_map(w, 4); > - w = mp_encode_str0(w, "term"); > - w = mp_encode_uint(w, raft->term); > - w = mp_encode_str0(w, "role"); > - w = mp_encode_str0(w, raft_state_str(raft->state)); > - w = mp_encode_str0(w, "is_ro"); > - w = mp_encode_bool(w, box_is_ro()); > - w = mp_encode_str0(w, "leader"); > - w = mp_encode_uint(w, raft->leader); > - } > + w = mp_encode_map(w, 4); > + w = mp_encode_str0(w, "term"); > + w = mp_encode_uint(w, raft->term); > + w = mp_encode_str0(w, "role"); > + w = mp_encode_str0(w, raft_state_str(raft->state)); > + w = mp_encode_str0(w, "is_ro"); > + w = mp_encode_bool(w, box_is_ro()); > + w = mp_encode_str0(w, "leader"); > + w = mp_encode_uint(w, raft->leader); > > box_broadcast("box.election", strlen("box.election"), buf, w); > > @@ -4025,18 +4015,13 @@ box_broadcast_election(bool is_outside_box_cfg) > } > > void > -box_broadcast_schema(bool is_outside_box_cfg) > +box_broadcast_schema(void) > { > char buf[1024]; > char *w = buf; > - > - if (is_outside_box_cfg) > - w = mp_encode_map(w, 0); > - else { > - w = mp_encode_map(w, 1); > - w = mp_encode_str0(w, "version"); > - w = mp_encode_uint(w, box_schema_version()); > - } > + w = mp_encode_map(w, 1); > + w = mp_encode_str0(w, "version"); > + w = mp_encode_uint(w, box_schema_version()); > > box_broadcast("box.schema", strlen("box.schema"), buf, w); > > diff --git a/src/box/box.h b/src/box/box.h > index 41f60c0ea..766568368 100644 > --- a/src/box/box.h > +++ b/src/box/box.h > 
@@ -567,19 +567,19 @@ boxk(int type, uint32_t space_id, const char *format, ...); > * Broadcast the identification of the instance > */ > void > -box_broadcast_id(bool is_outside_box_cfg); > +box_broadcast_id(void); > > /** > * Broadcast the current election state of RAFT machinery > */ > void > -box_broadcast_election(bool is_outside_box_cfg); > +box_broadcast_election(void); > > /** > * Broadcast the current schema version > */ > void > -box_broadcast_schema(bool is_outside_box_cfg); > +box_broadcast_schema(void); > > #if defined(__cplusplus) > } /* extern "C" */ > diff --git a/src/box/raft.c b/src/box/raft.c > index 259be639e..efc7e1038 100644 > --- a/src/box/raft.c > +++ b/src/box/raft.c > @@ -170,7 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event) > * writable only after it clears its synchro queue. > */ > box_update_ro_summary(); > - box_broadcast_election(false); > + box_broadcast_election(); > if (raft->state != RAFT_STATE_LEADER) > return 0; > /* > diff --git a/src/box/replication.cc b/src/box/replication.cc > index 732fe6211..0da256721 100644 > --- a/src/box/replication.cc > +++ b/src/box/replication.cc > @@ -247,7 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id) > /* Assign local replica id */ > assert(instance_id == REPLICA_ID_NIL); > instance_id = replica_id; > - box_broadcast_id(false); > + box_broadcast_id(); > } else if (replica->anon) { > /* > * Set replica gc on its transition from > @@ -288,7 +288,7 @@ replica_clear_id(struct replica *replica) > /* See replica_check_id(). */ > assert(replicaset.is_joining); > instance_id = REPLICA_ID_NIL; > - box_broadcast_id(false); > + box_broadcast_id(); > } > replica->id = REPLICA_ID_NIL; > say_info("removed replica %s", tt_uuid_str(&replica->uuid)); > ==================== > > Now on each box_broadcast_() call you can tell that it > just broadcasts the event. No flags, not other parameters, simple to > read. 
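
For reference, the first alternative mentioned in comment 2 (split the
function in two, with a third one taking the flag) can be sketched as
below. It is a generic illustration with hypothetical names
(event_format, event_format_default, event_format_current) and a made-up
text format; it is not the code of this patch, which drops the flag
entirely and broadcasts the "{}" defaults explicitly in box_init().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical shared worker. The boolean flag exists only here, so
 * no outside call site ever passes a bare true/false.
 */
static int
event_format(char *buf, size_t size, const char *key, int is_configured,
	     unsigned version)
{
	assert(buf != NULL && key != NULL);
	if (!is_configured)
		return snprintf(buf, size, "%s = {}", key);
	return snprintf(buf, size, "%s = {version = %u}", key, version);
}

/* Placeholder value used before the configuration is known. */
static int
event_format_default(char *buf, size_t size, const char *key)
{
	return event_format(buf, size, key, 0, 0);
}

/* Real value used once the state is known. */
static int
event_format_current(char *buf, size_t size, const char *key,
		     unsigned version)
{
	return event_format(buf, size, key, 1, version);
}
```

A call such as event_format_default(buf, sizeof(buf), "box.schema") can be
read without looking up the signature, which is the point of the comment.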
> > diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua
> > new file mode 100644
> > index 000000000..4170ce420
> > --- /dev/null
> > +++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua
>
> <...>
>
> > +g.test_sys_events_no_override = function(cg)
> > +    local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
> > +
> > +    for i = 1, #sys_events do
> > +        t.assert_error_msg_content_equals("System event can't be override",
> > +        function()
> > +            cg.master:exec(function(sys_events, i)
> > +                box.broadcast(sys_events[i], 'any_data')
> > +            end, {sys_events, i})
> > +        end)
> > +    end
>
> 3. Lua supports range loops. You can do this:
>
> ====================
> @@ -39,12 +39,12 @@ end)
>  g.test_sys_events_no_override = function(cg)
>      local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'}
>
> -    for i = 1, #sys_events do
> +    for _, key in pairs(sys_events) do
>          t.assert_error_msg_content_equals("System event can't be override",
>          function()
> -            cg.master:exec(function(sys_events, i)
> -                box.broadcast(sys_events[i], 'any_data')
> -            end, {sys_events, i})
> +            cg.master:exec(function(key)
> +                box.broadcast(key, 'any_data')
> +            end, {key})
>          end)
>      end
> ====================
>
> Up to you.
>
>
> Technically everything looks good except for the nits above. Now you also
> should comb the patch to make the linters happy: luacheck and checkpatch.
>
> To run luacheck you need to install it. How - depends on your system. In
> Linux I think it would be something like
>
>     sudo apt install -y lua5.1 luarocks
>     sudo luarocks install luacheck
> Or
>
>     tarantoolctl rocks install luacheck (I didn't validate this)
>
> Then you run it with our config. This is how I do it in tarantool's root
> dir (when I want to check 'test/' folder):
>
>     luacheck --codes --config .luacheckrc test/
>
> Then for checkpatch you need to clone the repository:
>
>     https://github.com/tarantool/checkpatch
>
> For instance, I cloned it just in a neighbour folder near my tarantool/
> repo. Then from inside of tarantool/ I do this:
>
>     ../checkpatch/checkpatch.pl --color=always --git head
>
> to check the latest commit. I suppose you can also specify head~1 for a
> previous one, head~2 for pre-previous and so on. Maybe it also supports
> git hash ranges, I didn't check yet. On your commit this linter raises some
> errors.

1. I fixed the issues following your comments. Now both checkpatch and
luacheck report no errors.

====

 .../unreleased/gh-6260-add-builtin-events.md  |   4 +
 src/box/alter.cc                              |  21 +-
 src/box/box.cc                                | 121 ++++++-
 src/box/box.h                                 |  18 +
 src/box/lua/watcher.c                         |   2 +
 src/box/raft.c                                |   1 +
 src/box/replication.cc                        |   2 +
 .../gh_6260_add_builtin_events_test.lua       | 323 ++++++++++++++++++
 8 files changed, 478 insertions(+), 14 deletions(-)
 create mode 100644 changelogs/unreleased/gh-6260-add-builtin-events.md
 create mode 100644 test/box-luatest/gh_6260_add_builtin_events_test.lua

diff --git a/changelogs/unreleased/gh-6260-add-builtin-events.md b/changelogs/unreleased/gh-6260-add-builtin-events.md
new file mode 100644
index 000000000..1d13e410f
--- /dev/null
+++ b/changelogs/unreleased/gh-6260-add-builtin-events.md
@@ -0,0 +1,4 @@
+## feature/core
+
+* Added predefined system events: `box.status`, `box.id`, `box.election`
+  and `box.schema` (gh-6260)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 45a6b7f41..a7598a88f 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -57,9 +57,17 @@
 #include "sequence.h"
 #include "sql.h"
 #include "constraint_id.h"
+#include "box.h"

 /* {{{ Auxiliary functions and methods.
*/ +static void +box_schema_version_bump(void) +{ + ++schema_version; + box_broadcast_schema(); +} + static int access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid, enum schema_object_type type, enum priv_type priv_type) @@ -1702,7 +1710,7 @@ void UpdateSchemaVersion::alter(struct alter_space *alter) { (void)alter; - ++schema_version; + box_schema_version_bump(); } /** @@ -2277,7 +2285,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) * AlterSpaceOps are registered in case of space * create. */ - ++schema_version; + box_schema_version_bump(); /* * So may happen that until the DDL change record * is written to the WAL, the space is used for @@ -2391,7 +2399,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) * deleting the space from the space_cache, since no * AlterSpaceOps are registered in case of space drop. */ - ++schema_version; + box_schema_version_bump(); struct trigger *on_commit = txn_alter_trigger_new(on_drop_space_commit, old_space); if (on_commit == NULL) @@ -4256,6 +4264,7 @@ on_replace_dd_schema(struct trigger * /* trigger */, void *event) if (tuple_field_uuid(new_tuple, BOX_CLUSTER_FIELD_UUID, &uu) != 0) return -1; REPLICASET_UUID = uu; + box_broadcast_id(); say_info("cluster uuid %s", tt_uuid_str(&uu)); } else if (strcmp(key, "version") == 0) { if (new_tuple != NULL) { @@ -5102,7 +5111,7 @@ on_replace_dd_trigger(struct trigger * /* trigger */, void *event) txn_stmt_on_rollback(stmt, on_rollback); txn_stmt_on_commit(stmt, on_commit); - ++schema_version; + box_schema_version_bump(); return 0; } @@ -5629,7 +5638,7 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event) space_reset_fk_constraint_mask(child_space); space_reset_fk_constraint_mask(parent_space); } - ++schema_version; + box_schema_version_bump(); return 0; } @@ -5885,7 +5894,7 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event) if (trigger_run(&on_alter_space, space) != 0) return -1; - 
++schema_version; + box_schema_version_bump(); return 0; } diff --git a/src/box/box.cc b/src/box/box.cc index 5414f3d9d..7f622c587 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -85,6 +85,7 @@ #include "audit.h" #include "trivia/util.h" #include "version.h" +#include "mp_uuid.h" static char status[64] = "unknown"; @@ -98,14 +99,6 @@ double txn_timeout_default; struct rlist box_on_shutdown_trigger_list = RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list); -static void title(const char *new_status) -{ - snprintf(status, sizeof(status), "%s", new_status); - title_set_status(new_status); - title_update(); - systemd_snotify("STATUS=%s", status); -} - const struct vclock *box_vclock = &replicaset.vclock; /** @@ -161,6 +154,28 @@ static struct fiber_pool tx_fiber_pool; */ static struct cbus_endpoint tx_prio_endpoint; +static void +builtin_events_init(void); + +/** + * Broadcast the current instance status + */ +static void +box_broadcast_status(void); + +/** + * Generate and update the instance status title + */ +static void +title(const char *new_status) +{ + snprintf(status, sizeof(status), "%s", new_status); + title_set_status(new_status); + title_update(); + systemd_snotify("STATUS=%s", status); + box_broadcast_status(); +} + void box_update_ro_summary(void) { @@ -174,6 +189,8 @@ box_update_ro_summary(void) if (is_ro_summary) engine_switch_to_ro(); fiber_cond_broadcast(&ro_cond); + box_broadcast_status(); + box_broadcast_election(); } const char * @@ -3359,6 +3376,7 @@ bootstrap(const struct tt_uuid *instance_uuid, else tt_uuid_create(&INSTANCE_UUID); + box_broadcast_id(); say_info("instance uuid %s", tt_uuid_str(&INSTANCE_UUID)); /* @@ -3637,6 +3655,12 @@ box_init(void) sequence_init(); box_raft_init(); box_watcher_init(); + + /* + * Default built-in events to help users distinguish an event + * being not supported from box.cfg not being called yet. 
+ */ + builtin_events_init(); } bool @@ -3937,3 +3961,84 @@ box_reset_stat(void) engine_reset_stat(); space_foreach(box_reset_space_stat, NULL); } + +static void +builtin_events_init(void) +{ + box_broadcast_fmt("box.id", "{}"); + box_broadcast_fmt("box.schema", "{}"); + box_broadcast_fmt("box.status", "{}"); + box_broadcast_fmt("box.election", "{}"); +} + +void +box_broadcast_id(void) +{ + char buf[1024]; + char *w = buf; + w = mp_encode_map(w, 3); + w = mp_encode_str0(w, "id"); + w = mp_encode_uint(w, instance_id); + w = mp_encode_str0(w, "instance_uuid"); + w = mp_encode_uuid(w, &INSTANCE_UUID); + w = mp_encode_str0(w, "replicaset_uuid"); + w = mp_encode_uuid(w, &REPLICASET_UUID); + + box_broadcast("box.id", strlen("box.id"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +static void +box_broadcast_status(void) +{ + char buf[1024]; + char *w = buf; + w = mp_encode_map(w, 3); + w = mp_encode_str0(w, "is_ro"); + w = mp_encode_bool(w, box_is_ro()); + w = mp_encode_str0(w, "is_ro_cfg"); + w = mp_encode_bool(w, cfg_geti("read_only")); + w = mp_encode_str0(w, "status"); + w = mp_encode_str0(w, box_status()); + + box_broadcast("box.status", strlen("box.status"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +void +box_broadcast_election(void) +{ + struct raft *raft = box_raft(); + + char buf[1024]; + char *w = buf; + w = mp_encode_map(w, 4); + w = mp_encode_str0(w, "term"); + w = mp_encode_uint(w, raft->term); + w = mp_encode_str0(w, "role"); + w = mp_encode_str0(w, raft_state_str(raft->state)); + w = mp_encode_str0(w, "is_ro"); + w = mp_encode_bool(w, box_is_ro()); + w = mp_encode_str0(w, "leader"); + w = mp_encode_uint(w, raft->leader); + + box_broadcast("box.election", strlen("box.election"), buf, w); + + assert((size_t)(w - buf) < 1024); +} + +void +box_broadcast_schema(void) +{ + char buf[1024]; + char *w = buf; + w = mp_encode_map(w, 1); + w = mp_encode_str0(w, "version"); + w = mp_encode_uint(w, box_schema_version()); + + box_broadcast("box.schema", 
strlen("box.schema"), buf, w); + + assert((size_t)(w - buf) < 1024); +} diff --git a/src/box/box.h b/src/box/box.h index b594f6646..766568368 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -563,6 +563,24 @@ box_process_rw(struct request *request, struct space *space, int boxk(int type, uint32_t space_id, const char *format, ...); +/** + * Broadcast the identification of the instance + */ +void +box_broadcast_id(void); + +/** + * Broadcast the current election state of RAFT machinery + */ +void +box_broadcast_election(void); + +/** + * Broadcast the current schema version + */ +void +box_broadcast_schema(void); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/lua/watcher.c b/src/box/lua/watcher.c index 01a307f1d..dd843863e 100644 --- a/src/box/lua/watcher.c +++ b/src/box/lua/watcher.c @@ -175,6 +175,8 @@ lbox_broadcast(struct lua_State *L) return luaL_error(L, "Usage: box.broadcast(key[, value])"); size_t key_len; const char *key = luaL_checklstring(L, 1, &key_len); + if (strncmp(key, "box.", 4) == 0) + return luaL_error(L, "System event can't be override"); struct ibuf *ibuf = cord_ibuf_take(); const char *data = NULL; const char *data_end = NULL; diff --git a/src/box/raft.c b/src/box/raft.c index be6009cc1..efc7e1038 100644 --- a/src/box/raft.c +++ b/src/box/raft.c @@ -170,6 +170,7 @@ box_raft_on_update_f(struct trigger *trigger, void *event) * writable only after it clears its synchro queue. 
*/ box_update_ro_summary(); + box_broadcast_election(); if (raft->state != RAFT_STATE_LEADER) return 0; /* diff --git a/src/box/replication.cc b/src/box/replication.cc index bbf4b7417..0da256721 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -247,6 +247,7 @@ replica_set_id(struct replica *replica, uint32_t replica_id) /* Assign local replica id */ assert(instance_id == REPLICA_ID_NIL); instance_id = replica_id; + box_broadcast_id(); } else if (replica->anon) { /* * Set replica gc on its transition from @@ -287,6 +288,7 @@ replica_clear_id(struct replica *replica) /* See replica_check_id(). */ assert(replicaset.is_joining); instance_id = REPLICA_ID_NIL; + box_broadcast_id(); } replica->id = REPLICA_ID_NIL; say_info("removed replica %s", tt_uuid_str(&replica->uuid)); diff --git a/test/box-luatest/gh_6260_add_builtin_events_test.lua b/test/box-luatest/gh_6260_add_builtin_events_test.lua new file mode 100644 index 000000000..b3a67ee56 --- /dev/null +++ b/test/box-luatest/gh_6260_add_builtin_events_test.lua @@ -0,0 +1,323 @@ +local t = require('luatest') +local net = require('net.box') +local cluster = require('test.luatest_helpers.cluster') +local server = require('test.luatest_helpers.server') + +local g = t.group('gh_6260') + +g.test_subscriptions_outside_box_cfg = function() + local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'} + + for _, val in ipairs(sys_events) do + local result = {} + local result_no = 0 + local watcher = box.watch(val, + function(name, state) + assert(name == val) + result = state + result_no = result_no + 1 + end) + + t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end) + t.assert_equals(result, {}) + watcher:unregister() + end +end + +g.before_test('test_sys_events_no_override', function(cg) + cg.cluster = cluster:new({}) + cg.master = cg.cluster:build_server({alias = 'master'}) + cg.cluster:add_server(cg.master) + cg.cluster:start() +end) + 
+g.after_test('test_sys_events_no_override', function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + +g.test_sys_events_no_override = function(cg) + local sys_events = {'box.id', 'box.status', 'box.election', 'box.schema'} + + for _, val in ipairs(sys_events) do + t.assert_error_msg_content_equals("System event can't be override", + function() + cg.master:exec(function(key) + box.broadcast(key, 'any_data') + end, {val}) + end) + end +end + +g.before_test('test_box_status', function(cg) + cg.cluster = cluster:new({}) + cg.master = cg.cluster:build_server({alias = 'master'}) + cg.cluster:add_server(cg.master) + cg.cluster:start() +end) + +g.after_test('test_box_status', function(cg) + cg.cluster.servers = nil + cg.cluster:drop() +end) + +g.test_box_status = function(cg) + local c = net.connect(cg.master.net_box_uri) + + local result = {} + local result_no = 0 + local watcher = c:watch('box.status', + function(name, state) + assert(name == 'box.status') + result = state + result_no = result_no + 1 + end) + + -- initial state should arrive + t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end) + + t.assert_equals(result, + {is_ro = false, is_ro_cfg = false, status = 'running'}) + + -- test orphan status appearance + cg.master:exec(function(repl) + box.cfg{ + replication = repl, + replication_connect_timeout = 0.001, + replication_timeout = 0.001, + } + end, {{server.build_instance_uri('master'), + server.build_instance_uri('replica')}}) + -- here we have 2 notifications: entering ro when can't connect + -- to master and the second one when going orphan + t.helpers.retrying({}, function() t.assert_equals(result_no, 3) end) + t.assert_equals(result, + {is_ro = true, is_ro_cfg = false, status = 'orphan'}) + + -- test ro_cfg appearance + cg.master:exec(function() + box.cfg{ + replication = {}, + read_only = true, + } + end) + t.helpers.retrying({}, function() t.assert_equals(result_no, 4) end) + t.assert_equals(result, + {is_ro = true, is_ro_cfg 
= true, status = 'running'})
+
+    -- reset to rw
+    cg.master:exec(function()
+        box.cfg{
+            read_only = false,
+        }
+    end)
+    t.helpers.retrying({}, function() t.assert_equals(result_no, 5) end)
+    t.assert_equals(result,
+        {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    -- turning on manual election mode puts the instance into ro
+    cg.master:exec(function()
+        box.cfg{
+            election_mode = 'manual',
+        }
+    end)
+    t.helpers.retrying({}, function() t.assert_equals(result_no, 6) end)
+    t.assert_equals(result,
+        {is_ro = true, is_ro_cfg = false, status = 'running'})
+
+    -- promotion should turn rw
+    cg.master:exec(function() box.ctl.promote() end)
+    t.helpers.retrying({}, function() t.assert_equals(result_no, 7) end)
+    t.assert_equals(result,
+        {is_ro = false, is_ro_cfg = false, status = 'running'})
+
+    watcher:unregister()
+    c:close()
+end
+
+g.before_test('test_box_election', function(cg)
+    cg.cluster = cluster:new({})
+
+    local box_cfg = {
+        replication = {
+            server.build_instance_uri('instance_1'),
+            server.build_instance_uri('instance_2'),
+            server.build_instance_uri('instance_3'),
+        },
+        replication_connect_quorum = 0,
+        election_mode = 'off',
+        replication_synchro_quorum = 2,
+        replication_synchro_timeout = 1,
+        replication_timeout = 0.25,
+        election_timeout = 0.25,
+    }
+
+    cg.instance_1 = cg.cluster:build_server(
+        {alias = 'instance_1', box_cfg = box_cfg})
+
+    cg.instance_2 = cg.cluster:build_server(
+        {alias = 'instance_2', box_cfg = box_cfg})
+
+    cg.instance_3 = cg.cluster:build_server(
+        {alias = 'instance_3', box_cfg = box_cfg})
+
+    cg.cluster:add_server(cg.instance_1)
+    cg.cluster:add_server(cg.instance_2)
+    cg.cluster:add_server(cg.instance_3)
+    cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.after_test('test_box_election', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
+end)
+
+g.test_box_election = function(cg)
+    local c = {}
+    c[1] = net.connect(cg.instance_1.net_box_uri)
+    c[2] = net.connect(cg.instance_2.net_box_uri)
+    c[3] = net.connect(cg.instance_3.net_box_uri)
+
+    local res = {}
+    local res_n = {0, 0, 0}
+
+    for i = 1, 3 do
+        c[i]:watch('box.election',
+            function(n, s)
+                t.assert_equals(n, 'box.election')
+                res[i] = s
+                res_n[i] = res_n[i] + 1
+            end)
+    end
+    t.helpers.retrying({}, function()
+        t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+    end)
+
+    -- verify all instances are in the same state
+    t.assert_equals(res[1], res[2])
+    t.assert_equals(res[1], res[3])
+
+    -- wait for elections to complete, verify the leader is instance_1
+    -- avoid checking the exact term number - it can vary
+    local instance1_id = cg.instance_1:instance_id()
+
+    cg.instance_1:exec(function() box.cfg{election_mode='candidate'} end)
+    cg.instance_2:exec(function() box.cfg{election_mode='voter'} end)
+    cg.instance_3:exec(function() box.cfg{election_mode='voter'} end)
+
+    cg.instance_1:wait_election_leader_found()
+    cg.instance_2:wait_election_leader_found()
+    cg.instance_3:wait_election_leader_found()
+
+    t.assert_covers(res[1],
+        {leader = instance1_id, is_ro = false, role = 'leader'})
+    t.assert_covers(res[2],
+        {leader = instance1_id, is_ro = true, role = 'follower'})
+    t.assert_covers(res[3],
+        {leader = instance1_id, is_ro = true, role = 'follower'})
+
+    -- verify all instances report the same term
+    t.assert_equals(res[1].term, res[2].term)
+    t.assert_equals(res[1].term, res[3].term)
+
+    -- check that stepping down works
+    res_n = {0, 0, 0}
+    cg.instance_1:exec(function() box.cfg{election_mode='voter'} end)
+    t.helpers.retrying({}, function()
+        t.assert_equals(res_n[1] + res_n[2] + res_n[3], 3)
+    end)
+
+    local expected = {is_ro = true, role = 'follower', term = res[1].term,
+                      leader = 0}
+    t.assert_covers(res, {expected, expected, expected})
+
+    c[1]:close()
+    c[2]:close()
+    c[3]:close()
+end
+
+g.before_test('test_box_schema', function(cg)
+    cg.cluster = cluster:new({})
+    cg.master = cg.cluster:build_server({alias = 'master'})
+    cg.cluster:add_server(cg.master)
+    cg.cluster:start()
+end)
+
+g.after_test('test_box_schema', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+g.test_box_schema = function(cg)
+    local c = net.connect(cg.master.net_box_uri)
+    local version = 0
+    local version_n = 0
+
+    local watcher = c:watch('box.schema',
+        function(n, s)
+            assert(n == 'box.schema')
+            version = s.version
+            version_n = version_n + 1
+        end)
+
+    cg.master:exec(function() box.schema.create_space('p') end)
+    t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+    -- there'll be 2 changes - index and space
+    -- first version change, use it as initial value
+    local init_version = version
+
+    version_n = 0
+    cg.master:exec(function() box.space.p:create_index('i') end)
+    t.helpers.retrying({}, function() t.assert_equals(version_n, 1) end)
+    t.assert_equals(version, init_version + 1)
+
+    version_n = 0
+    cg.master:exec(function() box.space.p:drop() end)
+    t.helpers.retrying({}, function() t.assert_equals(version_n, 2) end)
+    -- there'll be 2 changes - index and space
+    t.assert_equals(version, init_version + 3)
+
+    watcher:unregister()
+    c:close()
+end
+
+g.before_test('test_box_id', function(cg)
+    cg.cluster = cluster:new({})
+
+    local box_cfg = {
+        replicaset_uuid = t.helpers.uuid('ab', 1),
+        instance_uuid = t.helpers.uuid('1', '2', '3')
+    }
+
+    cg.instance = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
+    cg.cluster:add_server(cg.instance)
+    cg.cluster:start()
+end)
+
+g.after_test('test_box_id', function(cg)
+    cg.cluster.servers = nil
+    cg.cluster:drop()
+end)
+
+g.test_box_id = function(cg)
+    local c = net.connect(cg.instance.net_box_uri)
+
+    local result = {}
+    local result_no = 0
+
+    local watcher = c:watch('box.id',
+        function(name, state)
+            assert(name == 'box.id')
+            result = state
+            result_no = result_no + 1
+        end)
+
+    -- initial state should arrive
+    t.helpers.retrying({}, function() t.assert_equals(result_no, 1) end)
+    t.assert_equals(result, {id = 1,
+        instance_uuid = cg.instance.box_cfg.instance_uuid,
+        replicaset_uuid = cg.instance.box_cfg.replicaset_uuid})
+
+    watcher:unregister()
+    c:close()
+end
-- 
2.25.1