[Tarantool-patches] [PATCH v3] net.box: add predefined system events for pub/sub

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Feb 15 00:35:37 MSK 2022


Hi! Thanks for the patch, good job!

See some new comments below!

On 12.02.2022 02:18, Yan Shtunder via Tarantool-patches wrote:
> Adding predefined system events: box.status, box.id, box.election
> and box.schema.
> 
> Closes #6260
> 
> NO_CHANGELOG=test stuff
> NO_DOC=testing stuff

1. This is a new public API. Let's add both a docbot request and a changelog.

> ---
> Issue: https://github.com/tarantool/tarantool/issues/6260
> Patch: https://github.com/tarantool/tarantool/tree/yshtunder/gh-6260-events-v3
> 
>  src/box/alter.cc                              |   9 +
>  src/box/box.cc                                | 103 +++++-
>  src/box/box.h                                 |  18 +
>  src/box/mp_error.cc                           |   6 -
>  src/box/raft.c                                |   1 +
>  src/box/replication.cc                        |  16 +
>  src/lib/msgpuck                               |   2 +-
>  src/lib/raft/raft.c                           |   1 +
>  .../gh_6260_add_builtin_events_test.lua       | 320 ++++++++++++++++++
>  test/unit/mp_error.cc                         |   6 -
>  10 files changed, 462 insertions(+), 20 deletions(-)
>  create mode 100644 test/app-luatest/gh_6260_add_builtin_events_test.lua
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 65c1cb952..e9899b968 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -1685,6 +1686,7 @@ UpdateSchemaVersion::alter(struct alter_space *alter)
>  {
>      (void)alter;
>      ++schema_version;
> +    box_broadcast_schema();

2. The schema version increment + its broadcast are duplicated 6 times. Let's
wrap them into a function like

	box_schema_version_bump()
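
A rough sketch of what I mean. The counter and box_broadcast_schema() below
are stand-ins so that the snippet compiles on its own; the bookkeeping
variables (last_broadcast_version, broadcast_count) are purely illustrative
and are not part of Tarantool:

```c
#include <assert.h>
#include <stdint.h>

/* Stand-in for the real schema version counter. */
static uint32_t schema_version;

/* Illustrative bookkeeping only, not part of Tarantool. */
static uint32_t last_broadcast_version;
static int broadcast_count;

/*
 * Stand-in for the patch's box_broadcast_schema(). The real one
 * would encode the version and hand it to box_broadcast().
 */
static void
box_broadcast_schema(void)
{
	/* Every broadcast after the first must announce a newer version. */
	assert(broadcast_count == 0 ||
	       schema_version > last_broadcast_version);
	last_broadcast_version = schema_version;
	broadcast_count++;
}

/* Single place which both bumps the version and notifies watchers. */
static void
box_schema_version_bump(void)
{
	++schema_version;
	box_broadcast_schema();
}
```

Each of the 6 call sites would then shrink to one box_schema_version_bump()
call, and it becomes impossible to bump the version without broadcasting it.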


3. A response to Vova's comment about box_broadcast_fmt() - while I am not
against that, I am just afraid it might not work with UUIDs. AFAIR we don't
have a formatter for UUID or any other MP_EXT type. You can still try to use
box_broadcast_fmt() for events having only MessagePack-native types though,
such as the schema version and the election event.


4. There is a CI failure:
https://github.com/tarantool/tarantool/runs/5164254300?check_suite_focus=true.
Msgpuck submodule was bumped, but the new version doesn't update its test's
.result file.

> diff --git a/src/box/box.cc b/src/box/box.cc
> index 6a33203df..d72bd3dad 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -161,6 +155,20 @@ static struct fiber_pool tx_fiber_pool;
>   */
>  static struct cbus_endpoint tx_prio_endpoint;
> 
> +static void
> +box_broadcast_status(void);
> +
> +static void
> +title(const char *new_status)
> +{
> +	snprintf(status, sizeof(status), "%s", new_status);
> +	title_set_status(new_status);
> +	title_update();
> +	systemd_snotify("STATUS=%s", status);
> +	/* Checking box.info.status change */

5. The comment narrates the code and doesn't tell anything new. I would
suggest dropping it. The same applies to the other broadcast calls for all events.

> +	box_broadcast_status();
> +}
> +
>  void
>  box_update_ro_summary(void)
>  {
> @@ -3933,3 +3943,82 @@ box_reset_stat(void)
>  	engine_reset_stat();
>  	space_foreach(box_reset_space_stat, NULL);
>  }
> +
> +void
> +box_broadcast_id(void)
> +{
> +	char buf[1024];
> +	char *w = buf;
> +
> +	struct replica *replica = replica_by_uuid(&INSTANCE_UUID);
> +	uint32_t id = (replica == NULL) ? 0 : replica->id;

6. You don't need to get the replica. You can use the global variable
'instance_id'.

> +
> +	w = mp_encode_map(w, 3);
> +	w = mp_encode_str0(w, "id");
> +	w = mp_encode_uint(w, id);
> +	w = mp_encode_str0(w, "instance_uuid");
> +	w = mp_encode_uuid(w, &INSTANCE_UUID);
> +	w = mp_encode_str0(w, "replicaset_uuid");
> +	w = mp_encode_uuid(w, &REPLICASET_UUID);
> +
> +	box_broadcast("box.id", strlen("box.id"), buf, w);
> +
> +	assert((size_t)(w - buf) < 1024);
> +}
> diff --git a/src/box/mp_error.cc b/src/box/mp_error.cc
> index 3c4176e59..93109562d 100644
> --- a/src/box/mp_error.cc
> +++ b/src/box/mp_error.cc
> @@ -169,12 +169,6 @@ mp_sizeof_error_one(const struct error *error)
>  	return data_size;
>  }
> 
> -static inline char *
> -mp_encode_str0(char *data, const char *str)
> -{
> -	return mp_encode_str(data, str, strlen(str));
> -}

7. This and the msgpuck submodule update would be better done in a separate
preparatory commit.

> -
>  static char *
>  mp_encode_error_one(char *data, const struct error *error)
>  {
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index bbf4b7417..05d5ffb58 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -183,8 +183,12 @@ replica_new(void)
>  		diag_raise();
>  	}
>  	replica->id = 0;
> +	/* Checking box.info.id change */
> +	box_broadcast_id();
>  	replica->anon = false;
>  	replica->uuid = uuid_nil;
> +	/* Checking box.info.uuid change */
> +	box_broadcast_id();

8. You don't need to broadcast on every line where a change happens.
Nothing is sent until the next yield anyway. You only need to broadcast
once all the assignments are done and you are leaving the context.


9. The data changed here is not necessarily the instance's own. This
is a generic replica constructor, called for every record in the
_cluster space. You only need to broadcast in the places where the
current instance's own IDs change, i.e. the global variables 'instance_id',
'INSTANCE_UUID', and 'REPLICASET_UUID'.
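
To illustrate points 8 and 9 together, here is a minimal sketch. The setter
instance_identity_set() and struct uuid_stub are hypothetical names I made up
for the example, and the globals and box_broadcast_id() are stand-ins so that
it compiles on its own. The real assignments are spread over several places
in the code, so take this only as the shape of the idea:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Stand-in for the real UUID type. */
struct uuid_stub {
	uint8_t bytes[16];
};

/* Stand-ins for the instance's own identity globals. */
static uint32_t instance_id;
static struct uuid_stub INSTANCE_UUID;

/* Illustrative counter only, not part of Tarantool. */
static int broadcast_count;

/* Stand-in for the patch's box_broadcast_id(). */
static void
box_broadcast_id(void)
{
	broadcast_count++;
}

/*
 * Hypothetical setter: touches only the instance's own identity and
 * broadcasts once, after all the assignments are done.
 */
static void
instance_identity_set(uint32_t id, const struct uuid_stub *uuid)
{
	assert(uuid != NULL);
	instance_id = id;
	INSTANCE_UUID = *uuid;
	box_broadcast_id();
}
```

A generic constructor such as replica_new() would not call the broadcast at
all, since it never writes to these globals.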

> diff --git a/src/lib/raft/raft.c b/src/lib/raft/raft.c
> index 89cfb3c17..489df235b 100644
> --- a/src/lib/raft/raft.c
> +++ b/src/lib/raft/raft.c
> @@ -494,6 +494,7 @@ raft_process_msg(struct raft *raft, const struct raft_msg *req, uint32_t source)
>  			 * it manually.
>  			 */
>  			raft->leader = 0;
> +			raft_schedule_broadcast(raft);

10. This should be a separate commit with its own unit test. Sergos already made
a PR, but it lacks a test. I suggest finishing it and rebasing this patchset on
top of that commit.

>  			if (raft->is_candidate)
>  				raft_sm_schedule_new_election(raft);
>  		}
> diff --git a/test/app-luatest/gh_6260_add_builtin_events_test.lua b/test/app-luatest/gh_6260_add_builtin_events_test.lua
> new file mode 100644
> index 000000000..3c363fc9a
> --- /dev/null
> +++ b/test/app-luatest/gh_6260_add_builtin_events_test.lua

11. The test should be in box-luatest, not app-luatest.

> @@ -0,0 +1,320 @@
> +local t = require('luatest')
> +local net = require('net.box')
> +local cluster = require('test.luatest_helpers.cluster')
> +local helpers = require('test.luatest_helpers')
> +local fiber = require('fiber')
> +
> +local g = t.group('gh_6260')
> +
> +
> +g.before_test('test_box_status', function(cg)
> +    cg.cluster = cluster:new({})
> +
> +    local box_cfg = {
> +        read_only = false

12. It is false by default anyway. You don't need this config.

> +    }
> +
> +    cg.master = cg.cluster:build_server({alias = 'master', box_cfg = box_cfg})
> +    cg.cluster:add_server(cg.master)
> +    cg.cluster:start()
> +end)
> +
> +
> +g.after_test('test_box_status', function(cg)
> +    cg.cluster.servers = nil
> +    cg.cluster:drop()
> +end)
> +
> +
> +g.test_box_status = function(cg)
> +    local c = net.connect(cg.master.net_box_uri)
> +
> +    local result = {}
> +    local result_no = 0
> +    local watcher = c:watch('box.status',
> +                            function(name, state)
> +                                assert(name == 'box.status')
> +                                result = state
> +                                result_no = result_no + 1
> +                            end)
> +
> +    -- initial state should arrive
> +    while result_no < 1 do fiber.sleep(0.001) end

13. Let's use t.helpers.retrying() instead. In other places too. See how it is
used in existing tests for examples.

> +    t.assert_equals(result,
> +                    {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> +    -- test orphan status appearance
> +    cg.master:eval(([[
> +        box.cfg{
> +            replication = {
> +                "%s",
> +                "%s"
> +            },
> +            replication_connect_timeout = 0.001,
> +            replication_timeout = 0.001,
> +        }
> +    ]]):format(helpers.instance_uri('master'),
> +               helpers.instance_uri('replica')))

14. Well, while this way of doing box.cfg certainly works, you might
want to consider an alternative via :exec(). Probably you didn't find how
to pass parameters into it? Here is how:

	cg.master:exec(function(repl)
		box.cfg{
			replication = repl,
			...
		}
	end, {{helpers.instance_uri('master'),
		helpers.instance_uri('replica')}})

> +    -- here we have 2 notifications: entering ro when can't connect
> +    -- to master and the second one when going orphan
> +    while result_no < 3 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = true, is_ro_cfg = false, status = 'orphan'})
> +
> +    -- test ro_cfg appearance
> +    cg.master:eval([[
> +        box.cfg{
> +            replication = {},
> +            read_only = true,
> +        }
> +    ]])

15. Let's use :exec() here as well.

> +    while result_no < 4 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = true, is_ro_cfg = true, status = 'running'})
> +
> +    -- reset to rw
> +    cg.master:eval([[
> +        box.cfg{
> +            read_only = false,
> +        }
> +    ]])
> +    while result_no < 5 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> +    -- turning manual election mode puts into ro
> +    cg.master:eval([[
> +        box.cfg{
> +            election_mode = 'manual',
> +        }
> +    ]])
> +    while result_no < 6 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = true, is_ro_cfg = false, status = 'running'})
> +
> +    -- promotion should turn rm
> +    cg.master:eval([[
> +        box.ctl.promote()
> +    ]])
> +    while result_no < 7 do fiber.sleep(0.000001) end
> +    t.assert_equals(result,
> +                    {is_ro = false, is_ro_cfg = false, status = 'running'})
> +
> +    watcher:unregister()
> +    c:close()
> +end
> +
> +

16. Extra empty line. In some other places too. Please, let's remove them.

> +g.before_test('test_box_election', function(cg)
> +
> +    cg.cluster = cluster:new({})
> +
> +    local box_cfg = {
> +        replication = {
> +                        helpers.instance_uri('instance_', 1);
> +                        helpers.instance_uri('instance_', 2);
> +                        helpers.instance_uri('instance_', 3);

17. The indentation is too deep. You could also write it more simply:

	helpers.instance_uri('instance_1')
	helpers.instance_uri('instance_2')
	...

No need to pass the numbers separately.

> +        },
> +        replication_connect_quorum = 0,
> +        election_mode = 'off',
> +        replication_synchro_quorum = 2,
> +        replication_synchro_timeout = 1,
> +        replication_timeout = 0.25,
> +        election_timeout = 0.25,
> +    }
> +
> +    cg.instance_1 = cg.cluster:build_server(
> +        {alias = 'instance_1', box_cfg = box_cfg})
> +
> +    cg.instance_2 = cg.cluster:build_server(
> +        {alias = 'instance_2', box_cfg = box_cfg})
> +
> +    cg.instance_3 = cg.cluster:build_server(
> +        {alias = 'instance_3', box_cfg = box_cfg})
> +
> +    cg.cluster:add_server(cg.instance_1)
> +    cg.cluster:add_server(cg.instance_2)
> +    cg.cluster:add_server(cg.instance_3)
> +    cg.cluster:start({cg.instance_1, cg.instance_2, cg.instance_3})
> +end)
> +
> +
> +g.after_test('test_box_election', function(cg)
> +    cg.cluster.servers = nil
> +    cg.cluster:drop({cg.instance_1, cg.instance_2, cg.instance_3})
> +end)
> +
> +
> +g.test_box_election = function(cg)
> +    local c = {}
> +    c[1] = net.connect(cg.instance_1.net_box_uri)
> +    c[2] = net.connect(cg.instance_2.net_box_uri)
> +    c[3] = net.connect(cg.instance_3.net_box_uri)
> +
> +    local res = {}
> +    local res_n = {0, 0, 0}
> +
> +    for i = 1, 3 do
> +        c[i]:watch('box.election',
> +                   function(n, s)
> +                       t.assert_equals(n, 'box.election')
> +                       res[i] = s
> +                       res_n[i] = res_n[i] + 1
> +                   end)
> +    end
> +    while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> +
> +    -- verify all instances are in the same state
> +    t.assert_equals(res[1], res[2])
> +    t.assert_equals(res[1], res[3])
> +
> +    -- wait for elections to complete, verify leader is the instance_1
> +    -- trying to avoid the exact number of term - it can vary
> +    local instance1_id = cg.instance_1:eval("return box.info.id")

18. You can do cg.instance_1:instance_id().

> +
> +    cg.instance_1:eval("box.cfg{election_mode='candidate'}")
> +    cg.instance_2:eval("box.cfg{election_mode='voter'}")
> +    cg.instance_3:eval("box.cfg{election_mode='voter'}")
> +
> +    cg.instance_1:wait_election_leader_found()
> +    cg.instance_2:wait_election_leader_found()
> +    cg.instance_3:wait_election_leader_found()
> +
> +    t.assert_equals(res[1].leader, instance1_id)
> +    t.assert_equals(res[1].is_ro, false)
> +    t.assert_equals(res[1].role, 'leader')

19. You can use t.assert_covers() if you want not to check a
specific term number.

> +    t.assert_equals(res[2].leader, instance1_id)
> +    t.assert_equals(res[2].is_ro, true)
> +    t.assert_equals(res[2].role, 'follower')
> +    t.assert_equals(res[3].leader, instance1_id)
> +    t.assert_equals(res[3].is_ro, true)
> +    t.assert_equals(res[3].role, 'follower')
> +    t.assert_equals(res[1].term, res[2].term)
> +    t.assert_equals(res[1].term, res[3].term)
> +
> +    -- check the stepping down is working
> +    res_n = {0, 0, 0}
> +    cg.instance_1:eval("box.cfg{election_mode='voter'}")
> +    while res_n[1] + res_n[2] + res_n[3] < 3 do fiber.sleep(0.00001) end
> +
> +    local expected = {is_ro = true, role = 'follower', term = res[1].term,
> +                      leader = 0}
> +    t.assert_equals(res[1], expected)
> +    t.assert_equals(res[2], expected)
> +    t.assert_equals(res[3], expected)
> +
> +    c[1]:close()
> +    c[2]:close()
> +    c[3]:close()
> +end

<...>

> +g.test_box_id = function(cg)
> +    local c = net.connect(cg.instance.net_box_uri)
> +
> +    local result = {}
> +    local result_no = 0
> +
> +    local watcher = c:watch('box.id',
> +                            function(name, state)
> +                                assert(name == 'box.id')
> +                                result = state
> +                                result_no = result_no + 1
> +                            end)
> +
> +    -- initial state should arrive
> +    while result_no < 1 do fiber.sleep(0.001) end
> +    t.assert_equals(result, {id = 1,
> +                    instance_uuid = '11111111-2222-0000-0000-333333333333',
> +                    replicaset_uuid = 'abababab-0000-0000-0000-000000000001'})

20. Let's use cg.instance.box_cfg.instance_uuid and
cg.instance.box_cfg.replicaset_uuid instead of the hardcoded values.

> +
> +    -- error when instance_uuid set dynamically
> +    t.assert_error_msg_content_equals("Can't set option 'instance_uuid' dynamically",
> +                                function()
> +                                    cg.instance:eval(([[
> +                                        box.cfg{
> +                                            instance_uuid = "%s"
> +                                        }
> +                                    ]]):format(t.helpers.uuid('1', '2', '4')))
> +                                end)

21. This and the case below don't seem related to your patch. This was
banned beforehand and is already tested. I would suggest dropping these.

> +
> +    -- error when replicaset_uuid set dynamically
> +    t.assert_error_msg_content_equals("Can't set option 'replicaset_uuid' dynamically",
> +                                function()
> +                                    cg.instance:eval(([[
> +                                        box.cfg{
> +                                            replicaset_uuid = "%s"
> +                                        }
> +                                    ]]):format(t.helpers.uuid('ab', 2)))
> +                                end)
> +
> +    watcher:unregister()
> +    c:close()
> +end
> diff --git a/test/unit/mp_error.cc b/test/unit/mp_error.cc
> index 777c68dff..6afb73b22 100644
> --- a/test/unit/mp_error.cc
> +++ b/test/unit/mp_error.cc
> @@ -94,12 +94,6 @@ enum {
>  		sizeof(standard_errors) / sizeof(standard_errors[0]),
>  };
> 
> -static inline char *
> -mp_encode_str0(char *data, const char *str)
> -{
> -	return mp_encode_str(data, str, strlen(str));
> -}

22. This should be in the separate commit that bumps the msgpuck submodule.

