From: Konstantin Belyavskiy <k.belyavskiy@tarantool.org> To: tarantool-patches@freelists.org Subject: [tarantool-patches] [PATCH v2] box.ctl: implement a trigger on any control events Date: Fri, 3 Aug 2018 11:04:10 +0300 [thread overview] Message-ID: <20180803080410.14087-1-k.belyavskiy@tarantool.org> (raw) From: Ilya Markov <imarkov@tarantool.org> This patch is based on Ilya Markov's original patchset (gh-3159-box-on-ctl-event) Supported control events: * Completion of recovery of all system spaces. Called on finish of bootstrap or finish of join or snapshot recovery. * Completion of local recovery. Called on finish of bootstrap or finish of recovery. * Switch to read only/read write mode. * Replicaset add/remove. Called when the replica set changes. * Controlled shutdown. * Replica error event. A replica fails with some error. Errors inside triggers are logged and do not affect instance behaviour. All supported control events are exported to Lua as constants within the namespace box.ctl.event; here is the full list: - SYSTEM_SPACE_RECOVERY - LOCAL_RECOVERY - READ_ONLY - READ_WRITE - SHUTDOWN - REPLICASET_ADD - REPLICASET_REMOVE - REPLICA_CONNECTION_ERROR Introduce a new configuration option: 'on_ctl_event' with type function and one argument - context. For every event the context has a required field 'type', one of the constants mentioned above. For REPLICASET_ADD, REPLICASET_REMOVE and REPLICA_CONNECTION_ERROR an additional field 'replica_id' is also available. Usage example. In a Lua script a callback function with actions on specific control events must be defined, e.g.: local function onctl(ctx) if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then -- specific action #1 elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then -- specific action #2 ... end end Then this function is passed as a parameter to box.cfg: on_ctl_event = onctl Closes #3159 Changes in V2: Squashed 3 to 1 Small fixes: remove unused headers, rename function. 
Design changes: - remove fiber for ro condition checks. - remove support for 'table' type callback. - insert completion of recovery of all system spaces trigger right after sys spaces was recovered. - add documentation. --- Ticket: https://github.com/tarantool/tarantool/issues/3159 Branch: gh-3159-box-on-ctl-event-v2 src/box/CMakeLists.txt | 1 + src/box/box.cc | 63 ++++++++- src/box/box.h | 1 + src/box/ctl.c | 63 +++++++++ src/box/ctl.h | 79 +++++++++++ src/box/engine.c | 13 ++ src/box/lua/cfg.cc | 12 ++ src/box/lua/ctl.c | 54 ++++++++ src/box/lua/ctl.h | 2 + src/box/lua/load_cfg.lua | 10 +- src/box/memtx_engine.c | 27 +++- src/box/memtx_engine.h | 9 ++ src/box/relay.cc | 7 + src/box/replication.cc | 14 ++ src/cfg.c | 17 +++ src/cfg.h | 4 + test/replication/master_onctl.lua | 37 ++++++ test/replication/onctl.result | 259 +++++++++++++++++++++++++++++++++++++ test/replication/onctl.test.lua | 107 +++++++++++++++ test/replication/replica_onctl.lua | 34 +++++ test/replication/suite.cfg | 1 + 21 files changed, 807 insertions(+), 7 deletions(-) create mode 100644 src/box/ctl.c create mode 100644 src/box/ctl.h create mode 100644 test/replication/master_onctl.lua create mode 100644 test/replication/onctl.result create mode 100644 test/replication/onctl.test.lua create mode 100644 test/replication/replica_onctl.lua diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 6b1ae3e80..74f6b5497 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -112,6 +112,7 @@ add_library(box STATIC journal.c wal.c call.c + ctl.c ${lua_sources} lua/init.c lua/call.c diff --git a/src/box/box.cc b/src/box/box.cc index 61bfa117d..b6e5c8c81 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -72,6 +72,7 @@ #include "call.h" #include "func.h" #include "sequence.h" +#include "ctl.h" static char status[64] = "unknown"; @@ -104,6 +105,7 @@ static struct gc_consumer *backup_gc; static bool is_box_configured = false; static bool is_ro = true; static fiber_cond ro_cond; 
+static bool sys_space_recovered = true; /** * The following flag is set if the instance failed to @@ -190,11 +192,25 @@ process_rw(struct request *request, struct space *space, struct tuple **result) return 0; } +static void +on_ro_cond_change(void) +{ + if (run_on_ctl_event_trigger_type(box_is_ro() ? CTL_EVENT_READ_ONLY: + CTL_EVENT_READ_WRITE) < 0) + say_error("ctl_trigger error in %s: %s", + box_is_ro() ? "read_only" :"read_write", + diag_last_error(diag_get())->errmsg); + fiber_cond_broadcast(&ro_cond); +} + void box_set_ro(bool ro) { + if (is_ro == ro) + return; /* nothing to do */ + is_ro = ro; - fiber_cond_broadcast(&ro_cond); + on_ro_cond_change(); } bool @@ -225,7 +241,7 @@ box_clear_orphan(void) return; /* nothing to do */ is_orphan = false; - fiber_cond_broadcast(&ro_cond); + on_ro_cond_change(); /* Update the title to reflect the new status. */ title("running"); @@ -320,6 +336,15 @@ apply_initial_join_row(struct xstream *stream, struct xrow_header *row) struct request request; xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type)); struct space *space = space_cache_find_xc(request.space_id); + say_info("QQQ recover_snap_row for space %d", space->def->id); + if (space->def->id > BOX_SYSTEM_ID_MAX && !sys_space_recovered) { + sys_space_recovered = true; + if (run_on_ctl_event_trigger_type( + CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space " + "recovery: %s", + diag_last_error(diag_get())->errmsg); + } /* no access checks here - applier always works with admin privs */ space_apply_initial_join_row_xc(space, &request); } @@ -823,6 +848,13 @@ box_set_net_msg_max(void) IPROTO_FIBER_POOL_SIZE_FACTOR); } +void +box_set_on_ctl_event_xc(void) +{ + if (cfg_reset_on_ctl_event() < 0) + diag_raise(); +} + /* }}} configuration bindings */ /** @@ -1565,6 +1597,9 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) void box_free(void) { + if (run_on_ctl_event_trigger_type(CTL_EVENT_SHUTDOWN) < 0) + 
say_error("ctl_trigger error in shutdown: %s", + diag_last_error(diag_get())->errmsg); /* * See gh-584 "box_free() is called even if box is not * initialized @@ -1683,8 +1718,17 @@ bootstrap_from_master(struct replica *master) * Process initial data (snapshot or dirty disk data). */ engine_begin_initial_recovery_xc(NULL); + set_sys_space_recovered(false); applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY); - + /** + * It Could be only system spaces, so check is_sys_space_recovered() + * and set trigger if it still false. + */ + if (!is_sys_space_recovered() && + run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space recovery: %s", + diag_last_error(diag_get())->errmsg); + set_sys_space_recovered(true); /* * Process final data (WALs). */ @@ -1751,7 +1795,6 @@ void box_init(void) { fiber_cond_create(&ro_cond); - user_cache_init(); /* * The order is important: to initialize sessions, @@ -1808,6 +1851,7 @@ box_cfg_xc(void) box_set_replication_connect_timeout(); box_set_replication_connect_quorum(); box_set_replication_skip_conflict(); + box_set_on_ctl_event_xc(); replication_sync_lag = box_check_replication_sync_lag(); xstream_create(&join_stream, apply_initial_join_row); xstream_create(&subscribe_stream, apply_row); @@ -1874,9 +1918,18 @@ box_cfg_xc(void) * recovery of system spaces issue DDL events in * other engines. */ + sys_space_recovered = false; memtx_engine_recover_snapshot_xc(memtx, &last_checkpoint_vclock); - + /** + * It Could be only system spaces, so check sys_space_recovered + * and set trigger if it still false. 
+ */ + if (!sys_space_recovered && + run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space recovery: %s", + diag_last_error(diag_get())->errmsg); + sys_space_recovered = true; engine_begin_final_recovery_xc(); recovery_follow_local(recovery, &wal_stream.base, "hot_standby", cfg_getd("wal_dir_rescan_delay")); diff --git a/src/box/box.h b/src/box/box.h index d3967891d..086030191 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -191,6 +191,7 @@ void box_set_replication_connect_timeout(void); void box_set_replication_connect_quorum(void); void box_set_replication_skip_conflict(void); void box_set_net_msg_max(void); +void box_set_on_ctl_event_xc(void); extern "C" { #endif /* defined(__cplusplus) */ diff --git a/src/box/ctl.c b/src/box/ctl.c new file mode 100644 index 000000000..fc3ae37f7 --- /dev/null +++ b/src/box/ctl.c @@ -0,0 +1,63 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include <lib/small/small/rlist.h> +#include <trigger.h> +#include <box/ctl.h> +#include "errcode.h" +#include "error.h" +#include <exception.h> + +RLIST_HEAD(on_ctl_event); + +int +run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result) { + return trigger_run(&on_ctl_event, (void *) result); +} + +int +run_on_ctl_event_trigger_type(enum ctl_event_type type) +{ + struct on_ctl_event_ctx ctx = {}; + ctx.type = type; + return run_on_ctl_event_triggers(&ctx); +} + +int +cfg_reset_on_ctl_event() +{ + if (cfg_reset_trigger("on_ctl_event", &on_ctl_event, + lbox_push_on_ctl_event, NULL) < 0) { + diag_set(ClientError, ER_CFG, "on_ctl_event", + "expected function or table"); + return -1; + } + return 0; +} \ No newline at end of file diff --git a/src/box/ctl.h b/src/box/ctl.h new file mode 100644 index 000000000..6fae5e48d --- /dev/null +++ b/src/box/ctl.h @@ -0,0 +1,79 @@ +#ifndef INCLUDES_TARANTOOL_CTL_H +#define INCLUDES_TARANTOOL_CTL_H + +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. 
Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <cfg.h> +#include <box/lua/ctl.h> + +/** Global on-ctl_event triggers. */ +extern struct rlist on_ctl_event; + +enum ctl_event_type { + CTL_EVENT_SYSTEM_SPACE_RECOVERY, + CTL_EVENT_LOCAL_RECOVERY, + CTL_EVENT_READ_ONLY, + CTL_EVENT_READ_WRITE, + CTL_EVENT_SHUTDOWN, + CTL_EVENT_REPLICASET_ADD, + CTL_EVENT_REPLICASET_REMOVE, + CTL_EVENT_REPLICA_CONNECTION_ERROR, +}; + +struct on_ctl_event_ctx { + enum ctl_event_type type; + uint32_t replica_id; +}; + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +/** + * Runs on_ctl_event triggers with specified context. + */ +int +run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result); + +/** + * Runs on_ctl_event triggers with specified type. 
+ */ +int +run_on_ctl_event_trigger_type(enum ctl_event_type type); + +int +cfg_reset_on_ctl_event(); +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* INCLUDES_TARANTOOL_LUA_CTL_H */ diff --git a/src/box/engine.c b/src/box/engine.c index 82293fd18..7f91fa2e5 100644 --- a/src/box/engine.c +++ b/src/box/engine.c @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "engine.h" +#include "ctl.h" #include <stdint.h> #include <string.h> @@ -73,6 +74,14 @@ engine_bootstrap(void) if (engine->vtab->bootstrap(engine) != 0) return -1; } + if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space recovery: %s", + diag_last_error(diag_get())->errmsg); + + if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0) + say_error("ctl_trigger error in local recovery: %s", + diag_last_error(diag_get())->errmsg); + return 0; } @@ -111,6 +120,10 @@ engine_end_recovery(void) if (engine->vtab->end_recovery(engine) != 0) return -1; } + if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0) + say_error("ctl_trigger error in local recovery: %s", + diag_last_error(diag_get())->errmsg); + return 0; } diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 5afebc94e..a077b663e 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -253,6 +253,17 @@ lbox_cfg_set_net_msg_max(struct lua_State *L) return 0; } +static int +lbox_cfg_set_on_ctl_event(struct lua_State *L) +{ + try { + box_set_on_ctl_event_xc(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + static int lbox_cfg_set_worker_pool_threads(struct lua_State *L) { @@ -331,6 +342,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict}, {"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout}, {"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max}, + {"cfg_set_on_ctl_event", lbox_cfg_set_on_ctl_event}, {NULL, NULL} }; diff 
--git a/src/box/lua/ctl.c b/src/box/lua/ctl.c index 9a105ed5c..694d197e1 100644 --- a/src/box/lua/ctl.c +++ b/src/box/lua/ctl.c @@ -35,6 +35,8 @@ #include <lua.h> #include <lauxlib.h> #include <lualib.h> +#include <lua/trigger.h> +#include <box/ctl.h> #include "lua/utils.h" @@ -64,9 +66,38 @@ lbox_ctl_wait_rw(struct lua_State *L) return 0; } + +int +lbox_push_on_ctl_event(struct lua_State *L, void *event) +{ + struct on_ctl_event_ctx *ctx = (struct on_ctl_event_ctx *) event; + lua_newtable(L); + lua_pushstring(L, "type"); + lua_pushinteger(L, ctx->type); + lua_settable(L, -3); + + if (ctx->type == CTL_EVENT_REPLICASET_ADD || + ctx->type == CTL_EVENT_REPLICASET_REMOVE || + ctx->type == CTL_EVENT_REPLICA_CONNECTION_ERROR) { + lua_pushstring(L, "replica_id"); + luaL_pushuint64(L, ctx->replica_id); + lua_settable(L, -3); + } + return 1; +} + +static int +lbox_on_ctl_event(struct lua_State *L) +{ + return lbox_trigger_reset(L, 2, &on_ctl_event, + lbox_push_on_ctl_event, NULL); +} + + static const struct luaL_Reg lbox_ctl_lib[] = { {"wait_ro", lbox_ctl_wait_ro}, {"wait_rw", lbox_ctl_wait_rw}, + {"on_ctl_event", lbox_on_ctl_event}, {NULL, NULL} }; @@ -75,4 +106,27 @@ box_lua_ctl_init(struct lua_State *L) { luaL_register_module(L, "box.ctl", lbox_ctl_lib); lua_pop(L, 1); + + luaL_findtable(L, LUA_GLOBALSINDEX, "box.ctl", 1); + lua_newtable(L); + lua_setfield(L, -2, "event"); + lua_getfield(L, -1, "event"); + + lua_pushnumber(L, CTL_EVENT_SYSTEM_SPACE_RECOVERY); + lua_setfield(L, -2, "SYSTEM_SPACE_RECOVERY"); + lua_pushnumber(L, CTL_EVENT_LOCAL_RECOVERY); + lua_setfield(L, -2, "LOCAL_RECOVERY"); + lua_pushnumber(L, CTL_EVENT_READ_ONLY); + lua_setfield(L, -2, "READ_ONLY"); + lua_pushnumber(L, CTL_EVENT_READ_WRITE); + lua_setfield(L, -2, "READ_WRITE"); + lua_pushnumber(L, CTL_EVENT_SHUTDOWN); + lua_setfield(L, -2, "SHUTDOWN"); + lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD); + lua_setfield(L, -2, "REPLICASET_ADD"); + lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE); + 
lua_setfield(L, -2, "REPLICASET_REMOVE"); + lua_pushnumber(L, CTL_EVENT_REPLICA_CONNECTION_ERROR); + lua_setfield(L, -2, "REPLICA_CONNECTION_ERROR"); + lua_pop(L, 2); /* box, ctl */ } diff --git a/src/box/lua/ctl.h b/src/box/lua/ctl.h index e7c2edd15..ab63232dd 100644 --- a/src/box/lua/ctl.h +++ b/src/box/lua/ctl.h @@ -41,6 +41,8 @@ struct lua_State; void box_lua_ctl_init(struct lua_State *L); +int +lbox_push_on_ctl_event(struct lua_State *L, void *event); #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 0b668cdc6..035615f4b 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -1,7 +1,11 @@ -- load_cfg.lua - internal file local log = require('log') -local json = require('json') +local json = require("json").new() +json.cfg{ + encode_use_tostring = true, +} + local private = require('box.internal') local urilib = require('uri') local math = require('math') @@ -64,6 +68,7 @@ local default_cfg = { feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, net_msg_max = 768, + on_ctl_event = nil, } -- types of available options @@ -125,6 +130,7 @@ local template_cfg = { feedback_host = 'string', feedback_interval = 'number', net_msg_max = 'number', + on_ctl_event = 'function', } local function normalize_uri(port) @@ -216,6 +222,7 @@ local dynamic_cfg = { replicaset_uuid = check_replicaset_uuid, replication_skip_conflict = private.cfg_set_replication_skip_conflict, net_msg_max = private.cfg_set_net_msg_max, + on_ctl_event = private.cfg_set_on_ctl_event, } local dynamic_cfg_skip_at_load = { @@ -232,6 +239,7 @@ local dynamic_cfg_skip_at_load = { force_recovery = true, instance_uuid = true, replicaset_uuid = true, + on_ctl_event = true, } local function convert_gb(size) diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index fac84ce11..93a8458c5 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -48,6 +48,21 @@ 
#include "replication.h" #include "schema.h" #include "gc.h" +#include "ctl.h" + +static bool sys_space_recovered = true; + +bool +is_sys_space_recovered(void) +{ + return sys_space_recovered; +} + +void +set_sys_space_recovered(bool val) +{ + sys_space_recovered = val; +} static void txn_on_yield_or_stop(struct trigger *trigger, void *event) @@ -197,7 +212,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx, */ if (!xlog_cursor_is_eof(&cursor)) panic("snapshot `%s' has no EOF marker", filename); - return 0; } @@ -218,6 +232,17 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx, struct space *space = space_cache_find(request.space_id); if (space == NULL) return -1; + + say_info("QQQ recover_snap_row for space %d", space->def->id); + if (space->def->id > BOX_SYSTEM_ID_MAX && !sys_space_recovered) { + sys_space_recovered = true; + if (run_on_ctl_event_trigger_type( + CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) + say_error("ctl_trigger error in system space " + "recovery: %s", + diag_last_error(diag_get())->errmsg); + } + /* memtx snapshot must contain only memtx spaces */ if (space->engine != (struct engine *)memtx) { diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION); diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h index 0f8e92ee4..ef9cb10a6 100644 --- a/src/box/memtx_engine.h +++ b/src/box/memtx_engine.h @@ -171,6 +171,15 @@ struct memtx_gc_task { const struct memtx_gc_task_vtab *vtab; }; +/** + * Get and set methods for sys_space_recovered variable, needed + * for completion of recovery of all system spaces trigger. + */ +bool +is_sys_space_recovered(void); +void +set_sys_space_recovered(bool val); + /** * Schedule a garbage collection task for execution. 
*/ diff --git a/src/box/relay.cc b/src/box/relay.cc index a25cc540b..d535f830b 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -52,6 +52,7 @@ #include "xrow_io.h" #include "xstream.h" #include "wal.h" +#include "ctl.h" /** * Cbus message to send status updates from relay to tx thread. @@ -548,6 +549,12 @@ relay_subscribe_f(va_list ap) if (!diag_is_empty(&relay->diag)) { /* An error has occurred while reading ACKs of xlog. */ diag_move(&relay->diag, diag_get()); + struct on_ctl_event_ctx ctx; + ctx.type = CTL_EVENT_REPLICA_CONNECTION_ERROR; + ctx.replica_id = relay->replica->id; + if (run_on_ctl_event_triggers(&ctx) < 0) + say_error("ctl_trigger error in replica error: %s", + diag_last_error(diag_get())->errmsg); /* Reference the diag in the status. */ diag_add_error(&relay->diag, diag_last_error(diag_get())); } diff --git a/src/box/replication.cc b/src/box/replication.cc index c1e176984..75aecd03f 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -41,6 +41,7 @@ #include "error.h" #include "relay.h" #include "vclock.h" /* VCLOCK_MAX */ +#include "ctl.h" uint32_t instance_id = REPLICA_ID_NIL; struct tt_uuid INSTANCE_UUID; @@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) replica->uuid = *replica_uuid; replica_hash_insert(&replicaset.hash, replica); replica_set_id(replica, replica_id); + struct on_ctl_event_ctx on_ctl_ctx; + on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD; + on_ctl_ctx.replica_id = replica_id; + if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) + say_error("ctl_trigger error in replica add: %s", + diag_last_error(diag_get())->errmsg); return replica; } @@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica) * Some records may arrive later on due to asynchronous nature of * replication. 
*/ + struct on_ctl_event_ctx on_ctl_ctx; + on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE; + on_ctl_ctx.replica_id = replica->id; + replicaset.replica_by_id[replica->id] = NULL; replica->id = REPLICA_ID_NIL; if (replica_is_orphan(replica)) { replica_hash_remove(&replicaset.hash, replica); replica_delete(replica); } + if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) + say_error("ctl_trigger error in replica remove: %s", + diag_last_error(diag_get())->errmsg); } static void diff --git a/src/cfg.c b/src/cfg.c index 7c7d6e793..9dfcae162 100644 --- a/src/cfg.c +++ b/src/cfg.c @@ -153,3 +153,20 @@ cfg_getarr_elem(const char *name, int i) lua_pop(tarantool_L, 2); return val; } + +int +cfg_reset_trigger(const char *name, struct rlist *list, + lbox_push_event_f push_event, lbox_pop_event_f pop_event) +{ + cfg_get(name); + struct lua_State *L = tarantool_L; + if (lua_isnil(L, -1)) + return 0; + if (!lua_isfunction(L, -1)) + return -1; + lua_pushnil(L); + int rc = lbox_trigger_reset(L, lua_gettop(L), list, + push_event, pop_event); + lua_pop(L, 1); + return rc; +} diff --git a/src/cfg.h b/src/cfg.h index 8499388b8..d36465c49 100644 --- a/src/cfg.h +++ b/src/cfg.h @@ -32,6 +32,7 @@ */ #include <stdint.h> +#include <lua/trigger.h> #if defined(__cplusplus) extern "C" { @@ -61,6 +62,9 @@ cfg_getarr_size(const char *name); const char * cfg_getarr_elem(const char *name, int i); +int +cfg_reset_trigger(const char *name, struct rlist *list, + lbox_push_event_f push_event, lbox_pop_event_f pop_event); #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua new file mode 100644 index 000000000..355e79186 --- /dev/null +++ b/test/replication/master_onctl.lua @@ -0,0 +1,37 @@ +#!/usr/bin/env tarantool +os = require('os') + +SYSTEM_SPACE_RECOVERY = 0 +LOCAL_RECOVERY = 0 +READ_ONLY = 0 +READ_WRITE = 0 +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} +REPLICA_CONNECTION_ERROR = {} + +local 
function onctl(ctx) + if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then + SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1 + elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then + LOCAL_RECOVERY = LOCAL_RECOVERY + 1 + elseif ctx.type == box.ctl.event.READ_ONLY then + READ_ONLY = READ_ONLY + 1 + elseif ctx.type == box.ctl.event.READ_WRITE then + READ_WRITE = READ_WRITE + 1 + elseif ctx.type == box.ctl.event.REPLICASET_ADD then + table.insert(REPLICASET_ADD, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + table.insert(REPLICASET_REMOVE, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICA_CONNECTION_ERROR then + table.insert(REPLICA_CONNECTION_ERROR, ctx.replica_id) + end +end + +box.cfg({ + listen = os.getenv("LISTEN"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, + on_ctl_event = onctl, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/onctl.result b/test/replication/onctl.result new file mode 100644 index 000000000..d9de479fe --- /dev/null +++ b/test/replication/onctl.result @@ -0,0 +1,259 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +test_run:cmd("create server master with script='replication/master_onctl.lua'") +--- +- true +... +test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") +--- +- true +... +test_run:cmd("start server master") +--- +- true +... +test_run:cmd("switch master") +--- +- true +... +box.schema.user.grant('guest', 'replication') +--- +... +SYSTEM_SPACE_RECOVERY +--- +- 1 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 1 +... +READ_WRITE +--- +- 1 +... +-- must be two entries. First from bootstrap.snap, second for current instance. +REPLICASET_ADD +--- +- - 1 + - 1 +... +-- must be one entry. Deletion of initial tuple in _cluster space. +REPLICASET_REMOVE +--- +- - 1 +... +REPLICA_CONNECTION_ERROR +--- +- [] +... +REPLICASET_ADD = {} +--- +... 
+REPLICASET_REMOVE = {} +--- +... +new_replica_id = 0 +--- +... +deleted_replica_id = 0 +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_ctl_new(ctx) + if ctx.type == box.ctl.event.REPLICASET_ADD then + new_replica_id = ctx.replica_id + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + deleted_replica_id = ctx.replica_id + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +_ = box.ctl.on_ctl_event(on_ctl_new) +--- +... +test_run:cmd("start server replica") +--- +- true +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- [] +... +REPLICA_CONNECTION_ERROR +--- +- [] +... +new_replica_id +--- +- 2 +... +deleted_replica_id +--- +- 0 +... +test_run:cmd("switch replica") +--- +- true +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_ctl_shutdown(ctx) + if ctx.type == box.ctl.event.SHUTDOWN then + require("log").info("test replica shutdown") + end +end; +--- +... +function on_ctl_error(ctx) + error("trigger error") +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +SYSTEM_SPACE_RECOVERY +--- +- 1 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 1 +... +READ_WRITE +--- +- 1 +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- [] +... +box.cfg{read_only = true} +--- +... +fiber = require("fiber") +--- +... +while READ_ONLY == 0 do fiber.sleep(0.001) end +--- +... +READ_ONLY +--- +- 2 +... +box.cfg{on_ctl_event = on_ctl_error} +--- +... +box.cfg{read_only = false} +--- +... +test_run:grep_log('replica', 'ctl_trigger error') +--- +- ctl_trigger error +... +box.cfg{on_ctl_event = on_ctl_shutdown} +--- +... +test_run:cmd("restart server replica") +-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica. +-- test_run:grep_log('replica', 'test replica shutdown', 10000, true) +test_run:cmd("switch master") +--- +- true +... +REPLICA_CONNECTION_ERROR +--- +- - 2 +... 
+box.schema.user.revoke('guest', 'replication') +--- +... +_ = box.space._cluster:delete{2} +--- +... +SYSTEM_SPACE_RECOVERY +--- +- 1 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 1 +... +READ_WRITE +--- +- 1 +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- - 2 +... +new_replica_id +--- +- 2 +... +deleted_replica_id +--- +- 2 +... +box.ctl.on_ctl_event(nil, on_ctl_new) +--- +... +-- cleanup +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server master") +--- +- true +... +test_run:cmd("cleanup server master") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua new file mode 100644 index 000000000..9518686ae --- /dev/null +++ b/test/replication/onctl.test.lua @@ -0,0 +1,107 @@ +env = require('test_run') +test_run = env.new() + +test_run:cmd("create server master with script='replication/master_onctl.lua'") +test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") + +test_run:cmd("start server master") +test_run:cmd("switch master") +box.schema.user.grant('guest', 'replication') + +SYSTEM_SPACE_RECOVERY +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +-- must be two entries. First from bootstrap.snap, second for current instance. +REPLICASET_ADD +-- must be one entry. Deletion of initial tuple in _cluster space. 
+REPLICASET_REMOVE +REPLICA_CONNECTION_ERROR + +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +new_replica_id = 0 +deleted_replica_id = 0 + +test_run:cmd("setopt delimiter ';'") +function on_ctl_new(ctx) + if ctx.type == box.ctl.event.REPLICASET_ADD then + new_replica_id = ctx.replica_id + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + deleted_replica_id = ctx.replica_id + end +end; +test_run:cmd("setopt delimiter ''"); + +_ = box.ctl.on_ctl_event(on_ctl_new) + +test_run:cmd("start server replica") + +REPLICASET_ADD +REPLICASET_REMOVE +REPLICA_CONNECTION_ERROR + +new_replica_id +deleted_replica_id + +test_run:cmd("switch replica") + +test_run:cmd("setopt delimiter ';'") +function on_ctl_shutdown(ctx) + if ctx.type == box.ctl.event.SHUTDOWN then + require("log").info("test replica shutdown") + end +end; + +function on_ctl_error(ctx) + error("trigger error") +end; + +test_run:cmd("setopt delimiter ''"); + +SYSTEM_SPACE_RECOVERY +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +REPLICASET_ADD +REPLICASET_REMOVE + +box.cfg{read_only = true} +fiber = require("fiber") +while READ_ONLY == 0 do fiber.sleep(0.001) end +READ_ONLY + +box.cfg{on_ctl_event = on_ctl_error} +box.cfg{read_only = false} +test_run:grep_log('replica', 'ctl_trigger error') +box.cfg{on_ctl_event = on_ctl_shutdown} + +test_run:cmd("restart server replica") +-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica. 
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true) + +test_run:cmd("switch master") +REPLICA_CONNECTION_ERROR + +box.schema.user.revoke('guest', 'replication') +_ = box.space._cluster:delete{2} + +SYSTEM_SPACE_RECOVERY +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +REPLICASET_ADD +REPLICASET_REMOVE + +new_replica_id +deleted_replica_id + +box.ctl.on_ctl_event(nil, on_ctl_new) + +-- cleanup +test_run:cmd("switch default") +test_run:cmd("stop server master") +test_run:cmd("cleanup server master") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua new file mode 100644 index 000000000..d6ce73c82 --- /dev/null +++ b/test/replication/replica_onctl.lua @@ -0,0 +1,34 @@ +#!/usr/bin/env tarantool + +SYSTEM_SPACE_RECOVERY = 0 +LOCAL_RECOVERY = 0 +READ_ONLY = 0 +READ_WRITE = 0 +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +local function onctl(ctx) + if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then + SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1 + elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then + LOCAL_RECOVERY = LOCAL_RECOVERY + 1 + elseif ctx.type == box.ctl.event.READ_ONLY then + READ_ONLY = READ_ONLY + 1 + elseif ctx.type == box.ctl.event.READ_WRITE then + READ_WRITE = READ_WRITE + 1 + elseif ctx.type == box.ctl.event.REPLICASET_ADD then + table.insert(REPLICASET_ADD, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + table.insert(REPLICASET_REMOVE, ctx.replica_id) + end +end + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, + on_ctl_event = onctl, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 95e94e5a2..365a82512 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -6,6 +6,7 @@ "wal_off.test.lua": {}, 
"hot_standby.test.lua": {}, "rebootstrap.test.lua": {}, + "onctl.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.14.3 (Apple Git-98)
reply other threads:[~2018-08-03 8:04 UTC|newest] Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20180803080410.14087-1-k.belyavskiy@tarantool.org \ --to=k.belyavskiy@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [tarantool-patches] [PATCH v2] box.ctl: implement a trigger on any control events' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox