From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 7D0F326062 for ; Thu, 14 Jun 2018 11:04:42 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id UizYYgWCTWFj for ; Thu, 14 Jun 2018 11:04:42 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 9575026138 for ; Thu, 14 Jun 2018 11:04:41 -0400 (EDT) From: Ilya Markov Subject: [tarantool-patches] [box.ctl 2/3] box.ctl: Add on_ctl_event trigger calls Date: Thu, 14 Jun 2018 18:04:28 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: georgy@tarantool.org Cc: tarantool-patches@freelists.org Add the following trigger cases: * System space recovery. Called when bootstrap, join, or snapshot recovery finishes. * Local recovery. Called when bootstrap or local recovery finishes. * Read_only/read_write. Called whenever the instance's read_only state changes. * Shutdown. Called on controlled shutdown. * Replicaset_add/replicaset_remove. Called when a replica is added to or removed from the _cluster space. Errors raised inside triggers are logged and do not affect instance behaviour. 
Continue #3159 --- src/box/alter.cc | 1 + src/box/box.cc | 44 ++++++- src/box/engine.c | 14 +++ src/box/lua/ctl.c | 4 +- src/box/memtx_engine.c | 2 +- src/box/replication.cc | 14 +++ test/replication/master_onctl.lua | 34 +++++ test/replication/onctl.result | 250 +++++++++++++++++++++++++++++++++++++ test/replication/onctl.test.lua | 105 ++++++++++++++++ test/replication/replica_onctl.lua | 34 +++++ test/replication/suite.cfg | 1 + 11 files changed, 498 insertions(+), 5 deletions(-) create mode 100644 test/replication/master_onctl.lua create mode 100644 test/replication/onctl.result create mode 100644 test/replication/onctl.test.lua create mode 100644 test/replication/replica_onctl.lua diff --git a/src/box/alter.cc b/src/box/alter.cc index 6f6fcb0..7ec548b 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -52,6 +52,7 @@ #include "identifier.h" #include "version.h" #include "sequence.h" +#include "ctl.h" /** * chap-sha1 of empty string, i.e. diff --git a/src/box/box.cc b/src/box/box.cc index 26277e7..1de37d2 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -113,6 +113,11 @@ static fiber_cond ro_cond; */ static bool is_orphan = true; +/** + * Fiber used for on_ctl_event trigger. 
+ */ +static fiber *ro_checker; + /* Use the shared instance of xstream for all appliers */ static struct xstream join_stream; static struct xstream subscribe_stream; @@ -1573,6 +1578,9 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) void box_free(void) { + if (run_on_ctl_event_trigger_type(CTL_EVENT_SHUTDOWN) < 0) + say_error("ctl_trigger error in shutdown: %s", + diag_last_error(diag_get())->errmsg); /* * See gh-584 "box_free() is called even if box is not * initialized @@ -1592,7 +1600,8 @@ box_free(void) engine_shutdown(); wal_thread_stop(); } - + if (!fiber_is_dead(ro_checker)) + fiber_cancel(ro_checker); fiber_cond_destroy(&ro_cond); } @@ -1693,6 +1702,10 @@ bootstrap_from_master(struct replica *master) engine_begin_initial_recovery_xc(NULL); applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY); + if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space recovery: %s", + diag_last_error(diag_get())->errmsg); + /* * Process final data (WALs). */ @@ -1755,11 +1768,35 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events) cbus_process(endpoint); } +static int +check_ro_f(MAYBE_UNUSED va_list ap) +{ + double timeout = TIMEOUT_INFINITY; + struct on_ctl_event_ctx ctx; + memset(&ctx, 0, sizeof(ctx)); + while (true) { + if (box_wait_ro(!box_is_ro(), timeout) != 0) { + if (fiber_is_cancelled()) + break; + else + return -1; + } + if (run_on_ctl_event_trigger_type( + box_is_ro() ? CTL_EVENT_READ_ONLY: + CTL_EVENT_READ_WRITE) < 0) + say_error("ctl_trigger error in %s: %s", + box_is_ro() ? 
"read_only" :"read_write", + diag_last_error(diag_get())->errmsg); + } + return 0; +} + void box_init(void) { fiber_cond_create(&ro_cond); - + ro_checker = fiber_new_xc("check_ro", check_ro_f); + fiber_start(ro_checker, NULL); user_cache_init(); /* * The order is important: to initialize sessions, @@ -1885,6 +1922,9 @@ box_cfg_xc(void) */ memtx_engine_recover_snapshot_xc(memtx, &last_checkpoint_vclock); + if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space recovery: %s", + diag_last_error(diag_get())->errmsg); engine_begin_final_recovery_xc(); recovery_follow_local(recovery, &wal_stream.base, "hot_standby", diff --git a/src/box/engine.c b/src/box/engine.c index 82293fd..fa78753 100644 --- a/src/box/engine.c +++ b/src/box/engine.c @@ -29,10 +29,12 @@ * SUCH DAMAGE. */ #include "engine.h" +#include "ctl.h" #include #include #include +#include RLIST_HEAD(engines); @@ -73,6 +75,14 @@ engine_bootstrap(void) if (engine->vtab->bootstrap(engine) != 0) return -1; } + if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space recovery: %s", + diag_last_error(diag_get())->errmsg); + + if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0) + say_error("ctl_trigger error in local recovery: %s", + diag_last_error(diag_get())->errmsg); + return 0; } @@ -111,6 +121,10 @@ engine_end_recovery(void) if (engine->vtab->end_recovery(engine) != 0) return -1; } + if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0) + say_error("ctl_trigger error in local recovery: %s", + diag_last_error(diag_get())->errmsg); + return 0; } diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c index 52f320a..5bd9be3 100644 --- a/src/box/lua/ctl.c +++ b/src/box/lua/ctl.c @@ -122,8 +122,8 @@ box_lua_ctl_init(struct lua_State *L) lua_pushnumber(L, CTL_EVENT_SHUTDOWN); lua_setfield(L, -2, "SHUTDOWN"); lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD); - lua_setfield(L, -2, 
"CTL_EVENT_REPLICASET_ADD"); + lua_setfield(L, -2, "REPLICASET_ADD"); lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE); - lua_setfield(L, -2, "CTL_EVENT_REPLICASET_REMOVE"); + lua_setfield(L, -2, "REPLICASET_REMOVE"); lua_pop(L, 2); /* box, ctl */ } diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index fac84ce..e737ea3 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -48,6 +48,7 @@ #include "replication.h" #include "schema.h" #include "gc.h" +#include "ctl.h" static void txn_on_yield_or_stop(struct trigger *trigger, void *event) @@ -197,7 +198,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx, */ if (!xlog_cursor_is_eof(&cursor)) panic("snapshot `%s' has no EOF marker", filename); - return 0; } diff --git a/src/box/replication.cc b/src/box/replication.cc index c1e1769..75aecd0 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -41,6 +41,7 @@ #include "error.h" #include "relay.h" #include "vclock.h" /* VCLOCK_MAX */ +#include "ctl.h" uint32_t instance_id = REPLICA_ID_NIL; struct tt_uuid INSTANCE_UUID; @@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) replica->uuid = *replica_uuid; replica_hash_insert(&replicaset.hash, replica); replica_set_id(replica, replica_id); + struct on_ctl_event_ctx on_ctl_ctx; + on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD; + on_ctl_ctx.replica_id = replica_id; + if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) + say_error("ctl_trigger error in replica add: %s", + diag_last_error(diag_get())->errmsg); return replica; } @@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica) * Some records may arrive later on due to asynchronous nature of * replication. 
*/ + struct on_ctl_event_ctx on_ctl_ctx; + on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE; + on_ctl_ctx.replica_id = replica->id; + replicaset.replica_by_id[replica->id] = NULL; replica->id = REPLICA_ID_NIL; if (replica_is_orphan(replica)) { replica_hash_remove(&replicaset.hash, replica); replica_delete(replica); } + if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) + say_error("ctl_trigger error in replica remove: %s", + diag_last_error(diag_get())->errmsg); } static void diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua new file mode 100644 index 0000000..e0eb39a --- /dev/null +++ b/test/replication/master_onctl.lua @@ -0,0 +1,34 @@ +#!/usr/bin/env tarantool +os = require('os') + +SYSTEM_SPACE_RECOVERY = 0 +LOCAL_RECOVERY = 0 +READ_ONLY = 0 +READ_WRITE = 0 +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +local function onctl(ctx) + if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then + SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1 + elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then + LOCAL_RECOVERY = LOCAL_RECOVERY + 1 + elseif ctx.type == box.ctl.event.READ_ONLY then + READ_ONLY = READ_ONLY + 1 + elseif ctx.type == box.ctl.event.READ_WRITE then + READ_WRITE = READ_WRITE + 1 + elseif ctx.type == box.ctl.event.REPLICASET_ADD then + table.insert(REPLICASET_ADD, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + table.insert(REPLICASET_REMOVE, ctx.replica_id) + end +end + +box.cfg({ + listen = os.getenv("LISTEN"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, + on_ctl_event = onctl, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/onctl.result b/test/replication/onctl.result new file mode 100644 index 0000000..19b3e67 --- /dev/null +++ b/test/replication/onctl.result @@ -0,0 +1,250 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +test_run:cmd("create server master with script='replication/master_onctl.lua'") +--- +- true +... 
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") +--- +- true +... +test_run:cmd("start server master") +--- +- true +... +test_run:cmd("switch master") +--- +- true +... +box.schema.user.grant('guest', 'replication') +--- +... +SYSTEM_SPACE_RECOVERY +--- +- 1 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 0 +... +READ_WRITE +--- +- 1 +... +-- must be two entries. First from bootstrap.snap, second for current instance. +REPLICASET_ADD +--- +- - 1 + - 1 +... +-- must be one entry. Deletion of initial tuple in _cluster space. +REPLICASET_REMOVE +--- +- - 1 +... +REPLICASET_ADD = {} +--- +... +REPLICASET_REMOVE = {} +--- +... +new_replica_id = 0 +--- +... +deleted_replica_id = 0 +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_ctl_new(ctx) + if ctx.type == box.ctl.event.REPLICASET_ADD then + new_replica_id = ctx.replica_id + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + deleted_replica_id = ctx.replica_id + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +_ = box.ctl.on_ctl_event(on_ctl_new) +--- +... +test_run:cmd("start server replica") +--- +- true +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- [] +... +new_replica_id +--- +- 2 +... +deleted_replica_id +--- +- 0 +... +test_run:cmd("switch replica") +--- +- true +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_ctl_shutdown(ctx) + if ctx.type == box.ctl.event.SHUTDOWN then + require("log").info("test replica shutdown") + end +end; +--- +... +function on_ctl_error(ctx) + error("trigger error") +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +SYSTEM_SPACE_RECOVERY +--- +- 1 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 0 +... +READ_WRITE +--- +- 1 +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- [] +... +box.cfg{read_only = true} +--- +... +fiber = require("fiber") +--- +... 
+while READ_ONLY == 0 do fiber.sleep(0.001) end +--- +... +READ_ONLY +--- +- 1 +... +box.cfg{on_ctl_event = on_ctl_error} +--- +... +box.cfg{read_only = false} +--- +... +test_run:grep_log('replica', 'ctl_trigger error') +--- +- ctl_trigger error +... +box.cfg{on_ctl_event = {box.NULL, on_ctl_error}} +--- +... +box.cfg{on_ctl_event = on_ctl_shutdown} +--- +... +test_run:cmd("restart server replica") +-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica. +--test_run:grep_log('replica', 'test replica shutdown') +test_run:cmd("switch master") +--- +- true +... +box.schema.user.revoke('guest', 'replication') +--- +... +_ = box.space._cluster:delete{2} +--- +... +SYSTEM_SPACE_RECOVERY +--- +- 1 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 0 +... +READ_WRITE +--- +- 1 +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- - 2 +... +new_replica_id +--- +- 2 +... +deleted_replica_id +--- +- 2 +... +box.ctl.on_ctl_event(nil, on_ctl_new) +--- +... +-- cleanup +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server master") +--- +- true +... +test_run:cmd("cleanup server master") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua new file mode 100644 index 0000000..ff6a898 --- /dev/null +++ b/test/replication/onctl.test.lua @@ -0,0 +1,105 @@ +env = require('test_run') +test_run = env.new() + +test_run:cmd("create server master with script='replication/master_onctl.lua'") +test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") + +test_run:cmd("start server master") +test_run:cmd("switch master") +box.schema.user.grant('guest', 'replication') + +SYSTEM_SPACE_RECOVERY +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +-- must be two entries. First from bootstrap.snap, second for current instance. 
+REPLICASET_ADD +-- must be one entry. Deletion of initial tuple in _cluster space. +REPLICASET_REMOVE + +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +new_replica_id = 0 +deleted_replica_id = 0 + +test_run:cmd("setopt delimiter ';'") +function on_ctl_new(ctx) + if ctx.type == box.ctl.event.REPLICASET_ADD then + new_replica_id = ctx.replica_id + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + deleted_replica_id = ctx.replica_id + end +end; +test_run:cmd("setopt delimiter ''"); + +_ = box.ctl.on_ctl_event(on_ctl_new) + +test_run:cmd("start server replica") + +REPLICASET_ADD +REPLICASET_REMOVE + +new_replica_id +deleted_replica_id + +test_run:cmd("switch replica") + +test_run:cmd("setopt delimiter ';'") +function on_ctl_shutdown(ctx) + if ctx.type == box.ctl.event.SHUTDOWN then + require("log").info("test replica shutdown") + end +end; + +function on_ctl_error(ctx) + error("trigger error") +end; + +test_run:cmd("setopt delimiter ''"); + +SYSTEM_SPACE_RECOVERY +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +REPLICASET_ADD +REPLICASET_REMOVE + +box.cfg{read_only = true} +fiber = require("fiber") +while READ_ONLY == 0 do fiber.sleep(0.001) end +READ_ONLY + +box.cfg{on_ctl_event = on_ctl_error} +box.cfg{read_only = false} +test_run:grep_log('replica', 'ctl_trigger error') +box.cfg{on_ctl_event = {box.NULL, on_ctl_error}} +box.cfg{on_ctl_event = on_ctl_shutdown} + +test_run:cmd("restart server replica") +-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica. 
+--test_run:grep_log('replica', 'test replica shutdown') + + +test_run:cmd("switch master") +box.schema.user.revoke('guest', 'replication') +_ = box.space._cluster:delete{2} + +SYSTEM_SPACE_RECOVERY +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +REPLICASET_ADD +REPLICASET_REMOVE + +new_replica_id +deleted_replica_id + +box.ctl.on_ctl_event(nil, on_ctl_new) + +-- cleanup +test_run:cmd("switch default") +test_run:cmd("stop server master") +test_run:cmd("cleanup server master") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua new file mode 100644 index 0000000..d6ce73c --- /dev/null +++ b/test/replication/replica_onctl.lua @@ -0,0 +1,34 @@ +#!/usr/bin/env tarantool + +SYSTEM_SPACE_RECOVERY = 0 +LOCAL_RECOVERY = 0 +READ_ONLY = 0 +READ_WRITE = 0 +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +local function onctl(ctx) + if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then + SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1 + elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then + LOCAL_RECOVERY = LOCAL_RECOVERY + 1 + elseif ctx.type == box.ctl.event.READ_ONLY then + READ_ONLY = READ_ONLY + 1 + elseif ctx.type == box.ctl.event.READ_WRITE then + READ_WRITE = READ_WRITE + 1 + elseif ctx.type == box.ctl.event.REPLICASET_ADD then + table.insert(REPLICASET_ADD, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + table.insert(REPLICASET_REMOVE, ctx.replica_id) + end +end + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, + on_ctl_event = onctl, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 95e94e5..365a825 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -6,6 +6,7 @@ "wal_off.test.lua": {}, "hot_standby.test.lua": {}, "rebootstrap.test.lua": {}, 
+ "onctl.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.7.4