From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 507FA2960D for ; Tue, 28 Aug 2018 12:19:17 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id FoQm87oeQfgS for ; Tue, 28 Aug 2018 12:19:17 -0400 (EDT) Received: from smtp38.i.mail.ru (smtp38.i.mail.ru [94.100.177.98]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id CE6D929506 for ; Tue, 28 Aug 2018 12:19:16 -0400 (EDT) From: Georgy Kirichenko Subject: [tarantool-patches] [PATCH 2/2] On ctl event trigger Date: Tue, 28 Aug 2018 19:19:13 +0300 Message-Id: <7d9ccb894ef2915181454db39ead93497f9ff9aa.1535472838.git.georgy@tarantool.org> In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: Georgy Kirichenko Introduce a ctl event trigger that fires on bootstrap/recovery status changes, space create/alter/drop actions, applier state changes and shutdown. The trigger can be set with box.ctl_event.on_ctl_event() even before the first box.cfg invocation, which makes it possible to control recovery and bootstrap behavior. 
Event constants are accessible via box.ctl_event.const(). The event types are: - RECOVERY - SPACE - SHUTDOWN - APPLIER A recovery event carries one of the following statuses: * RECOVERY_SNAPSHOT_START * RECOVERY_SNAPSHOT_DONE * RECOVERY_HOT_STANDBY_START * RECOVERY_HOT_STANDBY_DONE * RECOVERY_XLOGS_DONE * RECOVERY_BOOTSTRAP_START * RECOVERY_BOOTSTRAP_DONE * RECOVERY_INITIAL_JOIN_START * RECOVERY_INITIAL_JOIN_DONE * RECOVERY_FINAL_JOIN_DONE A space event carries the space identifier and one of the following actions: * SPACE_CREATE * SPACE_ALTER * SPACE_DELETE An applier event carries the peer UUID and one of the following states: * APPLIER_OFF * APPLIER_CONNECT * APPLIER_CONNECTED * APPLIER_AUTH * APPLIER_READY * APPLIER_INITIAL_JOIN * APPLIER_FINAL_JOIN * APPLIER_JOINED * APPLIER_SYNC * APPLIER_FOLLOW * APPLIER_STOPPED * APPLIER_DISCONNECTED * APPLIER_LOADING A shutdown event carries no additional fields. Fixes: #3159 --- src/box/CMakeLists.txt | 2 + src/box/alter.cc | 16 ++ src/box/applier.cc | 6 + src/box/box.cc | 42 ++++ src/box/lua/init.c | 2 + src/box/lua/load_cfg.lua | 1 + test/box/ctl_event.result | 364 ++++++++++++++++++++++++++++++++++ test/box/ctl_event.test.lua | 76 +++++++ test/box/lua/trig_master.lua | 8 + test/box/lua/trig_replica.lua | 46 +++++ test/box/misc.result | 1 + 11 files changed, 564 insertions(+) create mode 100644 test/box/ctl_event.result create mode 100644 test/box/ctl_event.test.lua create mode 100644 test/box/lua/trig_master.lua create mode 100644 test/box/lua/trig_replica.lua diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index cab6a2276..ee495a037 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -113,6 +113,7 @@ add_library(box STATIC journal.c wal.c call.c + ctl_event.c ${lua_sources} lua/init.c lua/call.c @@ -131,6 +132,7 @@ add_library(box STATIC lua/session.c lua/net_box.c lua/xlog.c + lua/ctl_event.c ${bin_sources}) target_link_libraries(box box_error tuple stat xrow xlog vclock crc32 scramble diff --git a/src/box/alter.cc b/src/box/alter.cc index b2758a4d9..8e44ada9b 100644 --- a/src/box/alter.cc +++ 
b/src/box/alter.cc @@ -52,6 +52,7 @@ #include "identifier.h" #include "version.h" #include "sequence.h" +#include "ctl_event.h" /** * chap-sha1 of empty string, i.e. @@ -1620,6 +1621,11 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) txn_alter_trigger_new(on_create_space_rollback, space); txn_on_rollback(txn, on_rollback); trigger_run_xc(&on_alter_space, space); + struct on_ctl_event ctl_event; /* notify ctl event triggers: space created */ + ctl_event.type = CTL_SPACE; + ctl_event.space.action = CTL_SPACE_CREATE; + ctl_event.space.space_id = old_id; + trigger_run(&on_ctl_trigger, &ctl_event); } else if (new_tuple == NULL) { /* DELETE */ access_check_ddl(old_space->def->name, old_space->def->id, old_space->def->uid, SC_SPACE, PRIV_D, true); @@ -1663,6 +1669,11 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) txn_alter_trigger_new(on_drop_space_rollback, space); txn_on_rollback(txn, on_rollback); trigger_run_xc(&on_alter_space, old_space); + struct on_ctl_event ctl_event; /* notify ctl event triggers: space dropped */ + ctl_event.type = CTL_SPACE; + ctl_event.space.action = CTL_SPACE_DELETE; + ctl_event.space.space_id = old_id; + trigger_run(&on_ctl_trigger, &ctl_event); } else { /* UPDATE, REPLACE */ assert(old_space != NULL && new_tuple != NULL); struct space_def *def = @@ -1720,6 +1731,11 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) alter_space_do(txn, alter); alter_guard.is_active = false; trigger_run_xc(&on_alter_space, alter->new_space); + struct on_ctl_event ctl_event; /* notify ctl event triggers: space altered */ + ctl_event.type = CTL_SPACE; + ctl_event.space.action = CTL_SPACE_ALTER; + ctl_event.space.space_id = old_id; + trigger_run(&on_ctl_trigger, &ctl_event); } } diff --git a/src/box/applier.cc b/src/box/applier.cc index 28df8f7ca..4a42c6b43 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -48,6 +48,7 @@ #include "error.h" #include "session.h" #include "cfg.h" +#include "ctl_event.h" STRS(applier_state, applier_STATE); @@ -58,6 +59,11 @@ applier_set_state(struct applier *applier, enum applier_state state) say_debug("=> %s", applier_state_strs[state] + strlen("APPLIER_")); 
trigger_run_xc(&applier->on_state, applier); + struct on_ctl_event ctl_event; + ctl_event.type = CTL_APPLIER; + ctl_event.applier.replica_uuid = applier->uuid; + ctl_event.applier.status = state; + trigger_run(&on_ctl_trigger, &ctl_event); /* NOTE(review): the return value of trigger_run() is ignored here, unlike the trigger_run_xc() call above -- confirm that ignoring ctl trigger failures is intended. */ } /** diff --git a/src/box/box.cc b/src/box/box.cc index 0004140e9..01e512cc2 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -73,6 +73,7 @@ #include "call.h" #include "func.h" #include "sequence.h" +#include "ctl_event.h" static char status[64] = "unknown"; @@ -1602,6 +1603,9 @@ box_free(void) * initialized */ if (is_box_configured) { + struct on_ctl_event ctl_event; + ctl_event.type = CTL_SHUTDOWN; /* shutdown event: no payload, fired only when box was configured */ + trigger_run(&on_ctl_trigger, &ctl_event); #if 0 session_free(); user_cache_free(); @@ -1670,6 +1674,11 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid) if (boxk(IPROTO_DELETE, BOX_CLUSTER_ID, "[%u]", 1) != 0) diag_raise(); + struct on_ctl_event ctl_event; /* reused below for CTL_RECOVERY_BOOTSTRAP_DONE */ + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_BOOTSTRAP_START; + trigger_run(&on_ctl_trigger, &ctl_event); + /* Register the first replica in the replica set */ box_register_replica(replica_id, &INSTANCE_UUID); assert(replica_by_uuid(&INSTANCE_UUID)->id == 1); @@ -1690,6 +1699,9 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid) if (engine_begin_checkpoint() || engine_commit_checkpoint(&replicaset.vclock)) panic("failed to create a checkpoint"); + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_BOOTSTRAP_DONE; + trigger_run(&on_ctl_trigger, &ctl_event); } /** @@ -1703,6 +1715,11 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid) static void bootstrap_from_master(struct replica *master) { + struct on_ctl_event ctl_event; /* reused below for INITIAL_JOIN_DONE and FINAL_JOIN_DONE */ + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_INITIAL_JOIN_START; + trigger_run(&on_ctl_trigger, &ctl_event); + struct applier *applier = master->applier; assert(applier != NULL); applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY); @@ 
-1725,6 +1742,9 @@ bootstrap_from_master(struct replica *master) */ engine_begin_initial_recovery_xc(NULL); applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY); + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_INITIAL_JOIN_DONE; + trigger_run(&on_ctl_trigger, &ctl_event); /* * Process final data (WALs). @@ -1735,6 +1755,9 @@ bootstrap_from_master(struct replica *master) journal_set(&journal.base); applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY); + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_FINAL_JOIN_DONE; + trigger_run(&on_ctl_trigger, &ctl_event); /* Clear the pointer to journal before it goes out of scope */ journal_set(NULL); @@ -1883,13 +1906,22 @@ local_recovery(const struct tt_uuid *instance_uuid, */ memtx_engine_recover_snapshot_xc(memtx, checkpoint_vclock); + struct on_ctl_event ctl_event; /* reused below for the hot standby and XLOGS_DONE events */ + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_SNAPSHOT_DONE; + trigger_run(&on_ctl_trigger, &ctl_event); + engine_begin_final_recovery_xc(); recover_remaining_wals(recovery, &wal_stream.base, NULL, false); /* * Leave hot standby mode, if any, only after * acquiring the lock. */ + if (wal_dir_lock < 0) { + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_HOT_STANDBY_START; + trigger_run(&on_ctl_trigger, &ctl_event); title("hot_standby"); say_info("Entering hot standby mode"); recovery_follow_local(recovery, &wal_stream.base, "hot_standby", @@ -1908,11 +1940,17 @@ local_recovery(const struct tt_uuid *instance_uuid, * applied in hot standby mode. 
*/ vclock_copy(&replicaset.vclock, &recovery->vclock); + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_HOT_STANDBY_DONE; + trigger_run(&on_ctl_trigger, &ctl_event); box_listen(); box_sync_replication(false); } recovery_finalize(recovery); engine_end_recovery_xc(); + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_XLOGS_DONE; + trigger_run(&on_ctl_trigger, &ctl_event); /* Check replica set UUID. */ if (!tt_uuid_is_nil(replicaset_uuid) && @@ -2021,6 +2059,10 @@ box_cfg_xc(void) bool is_bootstrap_leader = false; if (last_checkpoint_lsn >= 0) { /* Recover the instance from the local directory */ + struct on_ctl_event ctl_event; + ctl_event.type = CTL_RECOVERY; + ctl_event.recovery.status = CTL_RECOVERY_SNAPSHOT_START; /* fired before local_recovery(), which reports SNAPSHOT_DONE and the later statuses */ + trigger_run(&on_ctl_trigger, &ctl_event); local_recovery(&instance_uuid, &replicaset_uuid, &last_checkpoint_vclock); } else { diff --git a/src/box/lua/init.c b/src/box/lua/init.c index 694b5bfd3..140803384 100644 --- a/src/box/lua/init.c +++ b/src/box/lua/init.c @@ -52,6 +52,7 @@ #include "box/lua/stat.h" #include "box/lua/info.h" #include "box/lua/ctl.h" +#include "box/lua/ctl_event.h" #include "box/lua/session.h" #include "box/lua/net_box.h" #include "box/lua/cfg.h" @@ -306,6 +307,7 @@ box_lua_init(struct lua_State *L) box_lua_info_init(L); box_lua_stat_init(L); box_lua_ctl_init(L); + box_lua_ctl_event_init(L); box_lua_session_init(L); box_lua_xlog_init(L); luaopen_net_box(L); diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index c68a3583f..1935143a6 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -397,6 +397,7 @@ local box_cfg_guard_whitelist = { tuple = true; runtime = true; NULL = true; + ctl_event = true; --[[ presumably keeps box.ctl_event reachable before the first box.cfg call, so triggers can be installed early ]] }; local box = require('box') diff --git a/test/box/ctl_event.result 
b/test/box/ctl_event.result new file mode 100644 index 000000000..5da8cf40d --- /dev/null +++ b/test/box/ctl_event.result @@ -0,0 +1,364 @@ +env = require('test_run') +--- +... 
+test_run = env.new() +--- +... +-- create master instance +test_run:cmd("create server trig_master with script='box/lua/trig_master.lua'") +--- +- true +... +test_run:cmd("start server trig_master") +--- +- true +... +test_run:cmd("switch trig_master") +--- +- true +... +env = require('test_run') +--- +... +test_run = env.new() +--- +... +-- simple ctl_event_case +ctl_const = box.ctl_event.const() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_replace(old, new) + return box.tuple.new({new[1], new[2], 'M'}) +end; +--- +... +function on_ctl_trig(event) + if event.type == ctl_const.SPACE and + event.action == ctl_const.SPACE_CREATE then + local space = box.space[event.space_id] + space:before_replace(on_replace) + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +active_trig = box.ctl_event.on_ctl_event(on_ctl_trig) +--- +... +t1 = box.schema.space.create('trig1') +--- +... +_ = t1:create_index('pk') +--- +... +t1:replace({1, 2}) +--- +- [1, 2, 'M'] +... +t1:select() +--- +- - [1, 2, 'M'] +... +-- clear the trigger +box.ctl_event.on_ctl_event(nil, active_trig) +--- +... +t2 = box.schema.space.create('trig2') +--- +... +_ = t2:create_index('pk') +--- +... +t2:replace({1, 2}) +--- +- [1, 2] +... +t2:select() +--- +- - [1, 2] +... +test_run:cmd("push filter 'replica: [-0-9a-f]+' to 'server: '") +--- +- true +... +test_run:cmd("push filter 'space_id: [0-9]+' to 'space_id: '") +--- +- true +... +-- create replica and test bootstrap events +box.schema.user.grant('guest', 'replication', nil, nil, {if_not_exists = true}) +--- +... +test_run:cmd("create server trig_replica with rpl_master=trig_master, script='box/lua/trig_replica.lua'") +--- +- true +... +test_run:cmd("start server trig_replica") +--- +- true +... +t1:replace({2, 3}) +--- +- [2, 3, 'M'] +... +t1:select() +--- +- - [1, 2, 'M'] + - [2, 3, 'M'] +... +test_run:cmd("switch trig_replica") +--- +- true +... +env = require('test_run') +--- +... 
+test_run = env.new() +--- +... +-- list all events +events +--- +- - type: 4 + server: + status: 1 + - type: 1 + status: 8 + - type: 4 + server: + status: 2 + - type: 4 + server: + status: 4 + - type: 4 + server: + status: 5 + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 1 + status: 9 + - type: 4 + server: + status: 6 + - type: 2 + action: 1 + space_id: + - type: 2 + action: 1 + space_id: + - type: 1 + status: 10 + - type: 4 + server: + status: 7 + - type: 4 + server: + status: 4 + - type: 4 + server: + status: 8 + - type: 4 + server: + status: 9 +... +-- check before replace trigger +box.space.trig1:select() +--- +- - [1, 2, 'M', 'R'] + - [2, 3, 'M', 'R'] +... +test_run:cmd("switch trig_master") +--- +- true +... +test_run:cmd("stop server trig_replica") +--- +- true +... +t1:replace({3, 4}) +--- +- [3, 4, 'M'] +... +test_run:cmd("start server trig_replica") +--- +- true +... +test_run:cmd("switch trig_replica") +--- +- true +... +env = require('test_run') +--- +... +test_run = env.new() +--- +... 
+-- list all events +events +--- +- - type: 1 + status: 1 + - type: 4 + server: + status: 1 + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 2 + space_id: + - type: 2 + action: 1 + space_id: + - type: 2 + action: 1 + space_id: + - type: 1 + status: 2 + - type: 1 + status: 5 + - type: 4 + server: + status: 2 + - type: 4 + server: + status: 4 + - type: 4 + server: + status: 8 + - type: 4 + server: + status: 9 +... +-- check tuples changed only one time +box.space.trig1:select() +--- +- - [1, 2, 'M', 'R'] + - [2, 3, 'M', 'R'] + - [3, 4, 'M', 'R'] +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server trig_replica") +--- +- true +... +test_run:cmd("stop server trig_master") +--- +- true +... 
diff --git a/test/box/ctl_event.test.lua b/test/box/ctl_event.test.lua new file mode 100644 index 000000000..8cb97d80f --- /dev/null +++ b/test/box/ctl_event.test.lua @@ -0,0 +1,76 @@ +env = require('test_run') +test_run = env.new() + +-- create master instance +test_run:cmd("create server trig_master with script='box/lua/trig_master.lua'") +test_run:cmd("start server trig_master") +test_run:cmd("switch trig_master") +env = require('test_run') +test_run = env.new() + +-- simple ctl_event_case +ctl_const = box.ctl_event.const() +test_run:cmd("setopt delimiter ';'") +function on_replace(old, new) + return box.tuple.new({new[1], new[2], 'M'}) +end; +function on_ctl_trig(event) + if event.type == ctl_const.SPACE and + event.action == ctl_const.SPACE_CREATE then + local space = box.space[event.space_id] + space:before_replace(on_replace) + end +end; +test_run:cmd("setopt delimiter ''"); +active_trig = box.ctl_event.on_ctl_event(on_ctl_trig) +t1 = box.schema.space.create('trig1') +_ = t1:create_index('pk') +t1:replace({1, 2}) +t1:select() + +-- clear the trigger +box.ctl_event.on_ctl_event(nil, active_trig) +t2 = box.schema.space.create('trig2') +_ = t2:create_index('pk') +t2:replace({1, 2}) +t2:select() + +test_run:cmd("push filter 'replica: [-0-9a-f]+' to 'server: '") +test_run:cmd("push filter 'space_id: [0-9]+' to 'space_id: '") + +-- create replica and test bootstrap events +box.schema.user.grant('guest', 'replication', nil, nil, {if_not_exists = true}) +test_run:cmd("create server trig_replica with rpl_master=trig_master, script='box/lua/trig_replica.lua'") +test_run:cmd("start server trig_replica") +t1:replace({2, 3}) +t1:select() + +test_run:cmd("switch trig_replica") +env = require('test_run') +test_run = env.new() + +-- list all events +events + +-- check before replace trigger +box.space.trig1:select() + +test_run:cmd("switch trig_master") +test_run:cmd("stop server trig_replica") + +t1:replace({3, 4}) + +test_run:cmd("start server trig_replica") + 
+test_run:cmd("switch trig_replica") +env = require('test_run') +test_run = env.new() + +-- list all events +events + +-- check tuples changed only one time +box.space.trig1:select() +test_run:cmd("switch default") +test_run:cmd("stop server trig_replica") +test_run:cmd("stop server trig_master") diff --git a/test/box/lua/trig_master.lua b/test/box/lua/trig_master.lua new file mode 100644 index 000000000..fa253ba4f --- /dev/null +++ b/test/box/lua/trig_master.lua @@ -0,0 +1,8 @@ +#!/usr/bin/env tarantool +require('console').listen(os.getenv('ADMIN')) + +box.cfg({ + listen = os.getenv("LISTEN"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, +}) diff --git a/test/box/lua/trig_replica.lua b/test/box/lua/trig_replica.lua new file mode 100644 index 000000000..581bdfdb7 --- /dev/null +++ b/test/box/lua/trig_replica.lua @@ -0,0 +1,46 @@ +#!/usr/bin/env tarantool +require('console').listen(os.getenv('ADMIN')) + +events = {} --[[ global on purpose: ctl_event.test.lua inspects it from the replica console ]] + +local ctl_const = box.ctl_event.const() +local recovery_status = nil --[[ last status received in a RECOVERY event ]] + +local function before_replace(old, new) --[[ appends 'R' to written tuples, except while recovery_status is SNAPSHOT_START or SNAPSHOT_DONE; deletes pass through ]] + if recovery_status == ctl_const.RECOVERY_SNAPSHOT_START or + recovery_status == ctl_const.RECOVERY_SNAPSHOT_DONE then + -- local files + return new + end + if new == nil then + return new + end + local k = {new:unpack()} + table.insert(k, 'R') + return box.tuple.new(k) +end + + +local function ctl_event_trigger(event) --[[ records every ctl event and installs before_replace on newly created user spaces ]] + -- register the event + table.insert(events, event) + if event.type == ctl_const.RECOVERY then + recovery_status = event.status + end + if event.type == ctl_const.SPACE and + event.action == ctl_const.SPACE_CREATE then + if event.space_id > 511 then --[[ presumably skips system spaces (ids up to 511); confirm against BOX_SYSTEM_ID_MAX ]] + local space = box.space[event.space_id] + space:before_replace(before_replace) + end + end +end + +box.ctl_event.on_ctl_event(ctl_event_trigger) + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, +}) diff --git a/test/box/misc.result b/test/box/misc.result index 
e213d7964..969081e59 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -62,6 +62,7 @@ t - cfg - commit - ctl + - ctl_event - error - feedback - index -- 2.18.0