From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 2064128C88 for ; Thu, 9 Aug 2018 04:38:14 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id NoDc3fud62pM for ; Thu, 9 Aug 2018 04:38:14 -0400 (EDT) Received: from smtp51.i.mail.ru (smtp51.i.mail.ru [94.100.177.111]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 33C3C28C66 for ; Thu, 9 Aug 2018 04:38:13 -0400 (EDT) Received: by smtp51.i.mail.ru with esmtpa (envelope-from ) id 1fngSQ-00027k-Ic for tarantool-patches@freelists.org; Thu, 09 Aug 2018 11:38:10 +0300 From: Konstantin Belyavskiy Subject: [tarantool-patches] [PATCH v3] box.ctl: implement a trigger on any control events Date: Thu, 9 Aug 2018 11:38:07 +0300 Message-Id: <20180809083807.40096-1-k.belyavskiy@tarantool.org> Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org This patch is based on original Ilya Markov's patchset (gh-3159-box-on-ctl-event) Supported control events: * Creation of system space. * Completion of local recovery. Called on finish of bootstrap or finish of recovery. * Switch to read only/read write mode. * Replicaset add/remove. Called on changes replica set. * Controlled shutdown. * Replica error event. Replica fails with some error. Errors inside triggers are logged and don't influence on instance behaviour. All supported control events are exported to Lua as a constants within namespace box.ctl.event, here is a full list: - SYSTEM_SPACE_CREATE - LOCAL_RECOVERY - READ_ONLY - READ_WRITE - SHUTDOWN - REPLICASET_ADD - REPLICASET_REMOVE - REPLICA_CONNECTION_ERROR Introduce new configuration option: 'on_ctl_event' with type function and one one argument - context. For every events context has required field 'type', one of constants mentioned above. For REPLICASET_ADD, REPLICASET_REMOVE and REPLICA_CONNECTION_ERROR an additional field 'replica_id' is also available. Usage example. In Lua script a callback function with actions on specific controlled events must be defined, e.g: local function onctl(ctx) if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then -- specific action #1 elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then -- specific action #2 ... end end Then this function passed as a parameter to box.cfg: on_ctl_event = onctl Closes #3159 Changes in V2: Squashed 3 to 1 Small fixes: remove unused headers, rename function. Design changes: - remove fiber for ro condition checks. - remove support for 'table' type callback. - insert completion of recovery of all system spaces trigger right after sys spaces was recovered. - add documentation. Changes in V3: - instead of all system spaces recovery, set trigger on each system space creation. --- Ticket: https://github.com/tarantool/tarantool/issues/3159 Branch: kbelyavs/gh-3159-box-on-ctl-event src/box/CMakeLists.txt | 1 + src/box/alter.cc | 5 + src/box/box.cc | 26 +++- src/box/box.h | 1 + src/box/ctl.c | 76 +++++++++++ src/box/ctl.h | 81 ++++++++++++ src/box/engine.c | 3 + src/box/lua/cfg.cc | 12 ++ src/box/lua/ctl.c | 54 ++++++++ src/box/lua/ctl.h | 2 + src/box/lua/load_cfg.lua | 10 +- src/box/memtx_engine.c | 1 + src/box/relay.cc | 7 + src/box/replication.cc | 14 ++ src/cfg.c | 17 +++ src/cfg.h | 4 + test/replication/master_onctl.lua | 37 ++++++ test/replication/onctl.result | 259 +++++++++++++++++++++++++++++++++++++ test/replication/onctl.test.lua | 107 +++++++++++++++ test/replication/replica_onctl.lua | 34 +++++ test/replication/suite.cfg | 1 + 21 files changed, 748 insertions(+), 4 deletions(-) create mode 100644 src/box/ctl.c create mode 100644 src/box/ctl.h create mode 100644 test/replication/master_onctl.lua create mode 100644 test/replication/onctl.result create mode 100644 test/replication/onctl.test.lua create mode 100644 test/replication/replica_onctl.lua diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index ad544270b..49e387fde 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -112,6 +112,7 @@ add_library(box STATIC journal.c wal.c call.c + ctl.c ${lua_sources} lua/init.c lua/call.c diff --git a/src/box/alter.cc b/src/box/alter.cc index 3007a131d..d955bfc03 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -52,6 +52,7 @@ #include "identifier.h" #include "version.h" #include "sequence.h" +#include "ctl.h" /** * chap-sha1 of empty string, i.e. @@ -1622,6 +1623,8 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) struct trigger *on_rollback = txn_alter_trigger_new(on_create_space_rollback, space); txn_on_rollback(txn, on_rollback); + if (space->def->id < BOX_SYSTEM_ID_MAX) + on_ctl_event_type(CTL_EVENT_SYSTEM_SPACE_CREATE); } else if (new_tuple == NULL) { /* DELETE */ access_check_ddl(old_space->def->name, old_space->def->uid, SC_SPACE, PRIV_D, true); @@ -1719,6 +1722,8 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) (void) new UpdateSchemaVersion(alter); alter_space_do(txn, alter); alter_guard.is_active = false; + if (alter->new_space->def->id < BOX_SYSTEM_ID_MAX) + on_ctl_event_type(CTL_EVENT_SYSTEM_SPACE_CREATE); } } diff --git a/src/box/box.cc b/src/box/box.cc index ee12d5738..9d195322c 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -73,6 +73,7 @@ #include "call.h" #include "func.h" #include "sequence.h" +#include "ctl.h" static char status[64] = "unknown"; @@ -209,11 +210,22 @@ process_nop(struct request *request) return txn_commit_stmt(txn, request); } +static void +on_ro_cond_change(void) +{ + on_ctl_event_type(box_is_ro() ? CTL_EVENT_READ_ONLY: + CTL_EVENT_READ_WRITE); + fiber_cond_broadcast(&ro_cond); +} + void box_set_ro(bool ro) { + if (is_ro == ro) + return; /* nothing to do */ + is_ro = ro; - fiber_cond_broadcast(&ro_cond); + on_ro_cond_change(); } bool @@ -244,7 +256,7 @@ box_clear_orphan(void) return; /* nothing to do */ is_orphan = false; - fiber_cond_broadcast(&ro_cond); + on_ro_cond_change(); /* Update the title to reflect the new status. */ title("running"); @@ -840,6 +852,13 @@ box_set_net_msg_max(void) IPROTO_FIBER_POOL_SIZE_FACTOR); } +void +box_set_on_ctl_event(void) +{ + if (cfg_reset_on_ctl_event() < 0) + diag_raise(); +} + /* }}} configuration bindings */ /** @@ -1592,6 +1611,7 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) void box_free(void) { + on_ctl_event_type(CTL_EVENT_SHUTDOWN); /* * See gh-584 "box_free() is called even if box is not * initialized @@ -1932,7 +1952,6 @@ void box_init(void) { fiber_cond_create(&ro_cond); - user_cache_init(); /* * The order is important: to initialize sessions, @@ -1989,6 +2008,7 @@ box_cfg_xc(void) box_set_replication_connect_timeout(); box_set_replication_connect_quorum(); box_set_replication_skip_conflict(); + box_set_on_ctl_event(); replication_sync_lag = box_check_replication_sync_lag(); xstream_create(&join_stream, apply_initial_join_row); xstream_create(&subscribe_stream, apply_row); diff --git a/src/box/box.h b/src/box/box.h index e2e06d977..3e55ee6c4 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -193,6 +193,7 @@ void box_set_replication_connect_timeout(void); void box_set_replication_connect_quorum(void); void box_set_replication_skip_conflict(void); void box_set_net_msg_max(void); +void box_set_on_ctl_event(void); extern "C" { #endif /* defined(__cplusplus) */ diff --git a/src/box/ctl.c b/src/box/ctl.c new file mode 100644 index 000000000..d3a2077ad --- /dev/null +++ b/src/box/ctl.c @@ -0,0 +1,76 @@ +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include +#include +#include +#include "errcode.h" +#include "error.h" +#include + +RLIST_HEAD(on_ctl_event); + +const char* ctltype2str[CTL_LAST_POS_GUARD] = { + "system space create", // CTL_EVENT_SYSTEM_SPACE_CREATE + "local recovery", // CTL_EVENT_LOCAL_RECOVERY + "read only", // CTL_EVENT_READ_ONLY + "read write", // CTL_EVENT_READ_WRITE + "shutdown", // CTL_EVENT_SHUTDOWN + "replicaset add", // CTL_EVENT_REPLICASET_ADD + "replicaset remove", // CTL_EVENT_REPLICASET_REMOVE + "replica connect error", // CTL_EVENT_REPLICA_CONNECTION_ERROR +}; + +int +run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result) { + return trigger_run(&on_ctl_event, (void *) result); +} + +void +on_ctl_event_type(enum ctl_event_type type) +{ + struct on_ctl_event_ctx ctx = {}; + ctx.type = type; + if (run_on_ctl_event_triggers(&ctx) < 0) + say_error("ctl_trigger error in %s: %s", ctltype2str[type], + diag_last_error(diag_get())->errmsg); +} + +int +cfg_reset_on_ctl_event() +{ + if (cfg_reset_trigger("on_ctl_event", &on_ctl_event, + lbox_push_on_ctl_event, NULL) < 0) { + diag_set(ClientError, ER_CFG, "on_ctl_event", + "expected function or table"); + return -1; + } + return 0; +} diff --git a/src/box/ctl.h b/src/box/ctl.h new file mode 100644 index 000000000..751ba0149 --- /dev/null +++ b/src/box/ctl.h @@ -0,0 +1,81 @@ +#ifndef INCLUDES_TARANTOOL_CTL_H +#define INCLUDES_TARANTOOL_CTL_H + +/* + * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include +#include + +/** Global on-ctl_event triggers. */ +extern struct rlist on_ctl_event; + +enum ctl_event_type { + CTL_EVENT_SYSTEM_SPACE_CREATE, + CTL_EVENT_LOCAL_RECOVERY, + CTL_EVENT_READ_ONLY, + CTL_EVENT_READ_WRITE, + CTL_EVENT_SHUTDOWN, + CTL_EVENT_REPLICASET_ADD, + CTL_EVENT_REPLICASET_REMOVE, + CTL_EVENT_REPLICA_CONNECTION_ERROR, + CTL_LAST_POS_GUARD, +}; + +struct on_ctl_event_ctx { + enum ctl_event_type type; + uint32_t replica_id; +}; + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +/** + * Runs on_ctl_event triggers with specified context. + */ +int +run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result); + +/** + * Runs on_ctl_event trigger with specified type and + * log error if any. + */ +void +on_ctl_event_type(enum ctl_event_type type); + +int +cfg_reset_on_ctl_event(); +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* INCLUDES_TARANTOOL_LUA_CTL_H */ diff --git a/src/box/engine.c b/src/box/engine.c index 2a30dcddd..7f5082b69 100644 --- a/src/box/engine.c +++ b/src/box/engine.c @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "engine.h" +#include "ctl.h" #include #include @@ -73,6 +74,7 @@ engine_bootstrap(void) if (engine->vtab->bootstrap(engine) != 0) return -1; } + on_ctl_event_type(CTL_EVENT_LOCAL_RECOVERY); return 0; } @@ -111,6 +113,7 @@ engine_end_recovery(void) if (engine->vtab->end_recovery(engine) != 0) return -1; } + on_ctl_event_type(CTL_EVENT_LOCAL_RECOVERY); return 0; } diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index 0f6b8a5a3..baf4a99d5 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -252,6 +252,17 @@ lbox_cfg_set_net_msg_max(struct lua_State *L) return 0; } +static int +lbox_cfg_set_on_ctl_event(struct lua_State *L) +{ + try { + box_set_on_ctl_event(); + } catch (Exception *) { + luaT_error(L); + } + return 0; +} + static int lbox_cfg_set_worker_pool_threads(struct lua_State *L) { @@ -330,6 +341,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict}, {"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout}, {"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max}, + {"cfg_set_on_ctl_event", lbox_cfg_set_on_ctl_event}, {NULL, NULL} }; diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c index 9a105ed5c..5b78875d5 100644 --- a/src/box/lua/ctl.c +++ b/src/box/lua/ctl.c @@ -35,6 +35,8 @@ #include #include #include +#include +#include #include "lua/utils.h" @@ -64,9 +66,38 @@ lbox_ctl_wait_rw(struct lua_State *L) return 0; } + +int +lbox_push_on_ctl_event(struct lua_State *L, void *event) +{ + struct on_ctl_event_ctx *ctx = (struct on_ctl_event_ctx *) event; + lua_newtable(L); + lua_pushstring(L, "type"); + lua_pushinteger(L, ctx->type); + lua_settable(L, -3); + + if (ctx->type == CTL_EVENT_REPLICASET_ADD || + ctx->type == CTL_EVENT_REPLICASET_REMOVE || + ctx->type == CTL_EVENT_REPLICA_CONNECTION_ERROR) { + lua_pushstring(L, "replica_id"); + luaL_pushuint64(L, ctx->replica_id); + lua_settable(L, -3); + } + return 1; +} + +static int +lbox_on_ctl_event(struct lua_State *L) +{ + return lbox_trigger_reset(L, 2, &on_ctl_event, + lbox_push_on_ctl_event, NULL); +} + + static const struct luaL_Reg lbox_ctl_lib[] = { {"wait_ro", lbox_ctl_wait_ro}, {"wait_rw", lbox_ctl_wait_rw}, + {"on_ctl_event", lbox_on_ctl_event}, {NULL, NULL} }; @@ -75,4 +106,27 @@ box_lua_ctl_init(struct lua_State *L) { luaL_register_module(L, "box.ctl", lbox_ctl_lib); lua_pop(L, 1); + + luaL_findtable(L, LUA_GLOBALSINDEX, "box.ctl", 1); + lua_newtable(L); + lua_setfield(L, -2, "event"); + lua_getfield(L, -1, "event"); + + lua_pushnumber(L, CTL_EVENT_SYSTEM_SPACE_CREATE); + lua_setfield(L, -2, "SYSTEM_SPACE_CREATE"); + lua_pushnumber(L, CTL_EVENT_LOCAL_RECOVERY); + lua_setfield(L, -2, "LOCAL_RECOVERY"); + lua_pushnumber(L, CTL_EVENT_READ_ONLY); + lua_setfield(L, -2, "READ_ONLY"); + lua_pushnumber(L, CTL_EVENT_READ_WRITE); + lua_setfield(L, -2, "READ_WRITE"); + lua_pushnumber(L, CTL_EVENT_SHUTDOWN); + lua_setfield(L, -2, "SHUTDOWN"); + lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD); + lua_setfield(L, -2, "REPLICASET_ADD"); + lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE); + lua_setfield(L, -2, "REPLICASET_REMOVE"); + lua_pushnumber(L, CTL_EVENT_REPLICA_CONNECTION_ERROR); + lua_setfield(L, -2, "REPLICA_CONNECTION_ERROR"); + lua_pop(L, 2); /* box, ctl */ } diff --git a/src/box/lua/ctl.h b/src/box/lua/ctl.h index e7c2edd15..ab63232dd 100644 --- a/src/box/lua/ctl.h +++ b/src/box/lua/ctl.h @@ -41,6 +41,8 @@ struct lua_State; void box_lua_ctl_init(struct lua_State *L); +int +lbox_push_on_ctl_event(struct lua_State *L, void *event); #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 0b668cdc6..035615f4b 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -1,7 +1,11 @@ -- load_cfg.lua - internal file local log = require('log') -local json = require('json') +local json = require("json").new() +json.cfg{ + encode_use_tostring = true, +} + local private = require('box.internal') local urilib = require('uri') local math = require('math') @@ -64,6 +68,7 @@ local default_cfg = { feedback_host = "https://feedback.tarantool.io", feedback_interval = 3600, net_msg_max = 768, + on_ctl_event = nil, } -- types of available options @@ -125,6 +130,7 @@ local template_cfg = { feedback_host = 'string', feedback_interval = 'number', net_msg_max = 'number', + on_ctl_event = 'function', } local function normalize_uri(port) @@ -216,6 +222,7 @@ local dynamic_cfg = { replicaset_uuid = check_replicaset_uuid, replication_skip_conflict = private.cfg_set_replication_skip_conflict, net_msg_max = private.cfg_set_net_msg_max, + on_ctl_event = private.cfg_set_on_ctl_event, } local dynamic_cfg_skip_at_load = { @@ -232,6 +239,7 @@ local dynamic_cfg_skip_at_load = { force_recovery = true, instance_uuid = true, replicaset_uuid = true, + on_ctl_event = true, } local function convert_gb(size) diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index f5ace9268..400fed95b 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -48,6 +48,7 @@ #include "replication.h" #include "schema.h" #include "gc.h" +#include "ctl.h" /* * Memtx yield-in-transaction trigger: roll back the effects diff --git a/src/box/relay.cc b/src/box/relay.cc index 05468f203..19b3f3b7a 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -52,6 +52,7 @@ #include "xrow_io.h" #include "xstream.h" #include "wal.h" +#include "ctl.h" /** * Cbus message to send status updates from relay to tx thread. @@ -551,6 +552,12 @@ relay_subscribe_f(va_list ap) if (!diag_is_empty(&relay->diag)) { /* An error has occurred while reading ACKs of xlog. */ diag_move(&relay->diag, diag_get()); + struct on_ctl_event_ctx ctx; + ctx.type = CTL_EVENT_REPLICA_CONNECTION_ERROR; + ctx.replica_id = relay->replica->id; + if (run_on_ctl_event_triggers(&ctx) < 0) + say_error("ctl_trigger error in replica error: %s", + diag_last_error(diag_get())->errmsg); /* Reference the diag in the status. */ diag_add_error(&relay->diag, diag_last_error(diag_get())); } diff --git a/src/box/replication.cc b/src/box/replication.cc index 26bbbe32a..4ee8ae5a3 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -42,6 +42,7 @@ #include "relay.h" #include "vclock.h" /* VCLOCK_MAX */ #include "sio.h" +#include "ctl.h" uint32_t instance_id = REPLICA_ID_NIL; struct tt_uuid INSTANCE_UUID; @@ -173,6 +174,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) replica->uuid = *replica_uuid; replica_hash_insert(&replicaset.hash, replica); replica_set_id(replica, replica_id); + struct on_ctl_event_ctx on_ctl_ctx; + on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD; + on_ctl_ctx.replica_id = replica_id; + if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) + say_error("ctl_trigger error in replica add: %s", + diag_last_error(diag_get())->errmsg); return replica; } @@ -204,6 +211,10 @@ replica_clear_id(struct replica *replica) * Some records may arrive later on due to asynchronous nature of * replication. */ + struct on_ctl_event_ctx on_ctl_ctx; + on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE; + on_ctl_ctx.replica_id = replica->id; + replicaset.replica_by_id[replica->id] = NULL; replica->id = REPLICA_ID_NIL; /* @@ -223,6 +234,9 @@ replica_clear_id(struct replica *replica) replica_hash_remove(&replicaset.hash, replica); replica_delete(replica); } + if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) + say_error("ctl_trigger error in replica remove: %s", + diag_last_error(diag_get())->errmsg); } static void diff --git a/src/cfg.c b/src/cfg.c index 7c7d6e793..9dfcae162 100644 --- a/src/cfg.c +++ b/src/cfg.c @@ -153,3 +153,20 @@ cfg_getarr_elem(const char *name, int i) lua_pop(tarantool_L, 2); return val; } + +int +cfg_reset_trigger(const char *name, struct rlist *list, + lbox_push_event_f push_event, lbox_pop_event_f pop_event) +{ + cfg_get(name); + struct lua_State *L = tarantool_L; + if (lua_isnil(L, -1)) + return 0; + if (!lua_isfunction(L, -1)) + return -1; + lua_pushnil(L); + int rc = lbox_trigger_reset(L, lua_gettop(L), list, + push_event, pop_event); + lua_pop(L, 1); + return rc; +} diff --git a/src/cfg.h b/src/cfg.h index 8499388b8..d36465c49 100644 --- a/src/cfg.h +++ b/src/cfg.h @@ -32,6 +32,7 @@ */ #include +#include #if defined(__cplusplus) extern "C" { @@ -61,6 +62,9 @@ cfg_getarr_size(const char *name); const char * cfg_getarr_elem(const char *name, int i); +int +cfg_reset_trigger(const char *name, struct rlist *list, + lbox_push_event_f push_event, lbox_pop_event_f pop_event); #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua new file mode 100644 index 000000000..49f320d3e --- /dev/null +++ b/test/replication/master_onctl.lua @@ -0,0 +1,37 @@ +#!/usr/bin/env tarantool +os = require('os') + +SYSTEM_SPACE_CREATE = 0 +LOCAL_RECOVERY = 0 +READ_ONLY = 0 +READ_WRITE = 0 +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} +REPLICA_CONNECTION_ERROR = {} + +local function onctl(ctx) + if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then + SYSTEM_SPACE_CREATE = SYSTEM_SPACE_CREATE + 1 + elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then + LOCAL_RECOVERY = LOCAL_RECOVERY + 1 + elseif ctx.type == box.ctl.event.READ_ONLY then + READ_ONLY = READ_ONLY + 1 + elseif ctx.type == box.ctl.event.READ_WRITE then + READ_WRITE = READ_WRITE + 1 + elseif ctx.type == box.ctl.event.REPLICASET_ADD then + table.insert(REPLICASET_ADD, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + table.insert(REPLICASET_REMOVE, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICA_CONNECTION_ERROR then + table.insert(REPLICA_CONNECTION_ERROR, ctx.replica_id) + end +end + +box.cfg({ + listen = os.getenv("LISTEN"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, + on_ctl_event = onctl, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/onctl.result b/test/replication/onctl.result new file mode 100644 index 000000000..7b7208464 --- /dev/null +++ b/test/replication/onctl.result @@ -0,0 +1,259 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +test_run:cmd("create server master with script='replication/master_onctl.lua'") +--- +- true +... +test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") +--- +- true +... +test_run:cmd("start server master") +--- +- true +... +test_run:cmd("switch master") +--- +- true +... +box.schema.user.grant('guest', 'replication') +--- +... +SYSTEM_SPACE_CREATE +--- +- 18 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 1 +... +READ_WRITE +--- +- 1 +... +-- must be two entries. First from bootstrap.snap, second for current instance. +REPLICASET_ADD +--- +- - 1 + - 1 +... +-- must be one entry. Deletion of initial tuple in _cluster space. +REPLICASET_REMOVE +--- +- - 1 +... +REPLICA_CONNECTION_ERROR +--- +- [] +... +REPLICASET_ADD = {} +--- +... +REPLICASET_REMOVE = {} +--- +... +new_replica_id = 0 +--- +... +deleted_replica_id = 0 +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_ctl_new(ctx) + if ctx.type == box.ctl.event.REPLICASET_ADD then + new_replica_id = ctx.replica_id + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + deleted_replica_id = ctx.replica_id + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +_ = box.ctl.on_ctl_event(on_ctl_new) +--- +... +test_run:cmd("start server replica") +--- +- true +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- [] +... +REPLICA_CONNECTION_ERROR +--- +- [] +... +new_replica_id +--- +- 2 +... +deleted_replica_id +--- +- 0 +... +test_run:cmd("switch replica") +--- +- true +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_ctl_shutdown(ctx) + if ctx.type == box.ctl.event.SHUTDOWN then + require("log").info("test replica shutdown") + end +end; +--- +... +function on_ctl_error(ctx) + error("trigger error") +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +SYSTEM_SPACE_CREATE +--- +- 18 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 1 +... +READ_WRITE +--- +- 1 +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- [] +... +box.cfg{read_only = true} +--- +... +fiber = require("fiber") +--- +... +while READ_ONLY == 0 do fiber.sleep(0.001) end +--- +... +READ_ONLY +--- +- 2 +... +box.cfg{on_ctl_event = on_ctl_error} +--- +... +box.cfg{read_only = false} +--- +... +test_run:grep_log('replica', 'ctl_trigger error') +--- +- ctl_trigger error +... +box.cfg{on_ctl_event = on_ctl_shutdown} +--- +... +test_run:cmd("restart server replica") +-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica. +-- test_run:grep_log('replica', 'test replica shutdown', 10000, true) +test_run:cmd("switch master") +--- +- true +... +REPLICA_CONNECTION_ERROR +--- +- - 2 +... +box.schema.user.revoke('guest', 'replication') +--- +... +_ = box.space._cluster:delete{2} +--- +... +SYSTEM_SPACE_CREATE +--- +- 18 +... +LOCAL_RECOVERY +--- +- 1 +... +READ_ONLY +--- +- 1 +... +READ_WRITE +--- +- 1 +... +REPLICASET_ADD +--- +- - 2 +... +REPLICASET_REMOVE +--- +- - 2 +... +new_replica_id +--- +- 2 +... +deleted_replica_id +--- +- 2 +... +box.ctl.on_ctl_event(nil, on_ctl_new) +--- +... +-- cleanup +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server master") +--- +- true +... +test_run:cmd("cleanup server master") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua new file mode 100644 index 000000000..00f2d0109 --- /dev/null +++ b/test/replication/onctl.test.lua @@ -0,0 +1,107 @@ +env = require('test_run') +test_run = env.new() + +test_run:cmd("create server master with script='replication/master_onctl.lua'") +test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") + +test_run:cmd("start server master") +test_run:cmd("switch master") +box.schema.user.grant('guest', 'replication') + +SYSTEM_SPACE_CREATE +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +-- must be two entries. First from bootstrap.snap, second for current instance. +REPLICASET_ADD +-- must be one entry. Deletion of initial tuple in _cluster space. +REPLICASET_REMOVE +REPLICA_CONNECTION_ERROR + +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +new_replica_id = 0 +deleted_replica_id = 0 + +test_run:cmd("setopt delimiter ';'") +function on_ctl_new(ctx) + if ctx.type == box.ctl.event.REPLICASET_ADD then + new_replica_id = ctx.replica_id + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + deleted_replica_id = ctx.replica_id + end +end; +test_run:cmd("setopt delimiter ''"); + +_ = box.ctl.on_ctl_event(on_ctl_new) + +test_run:cmd("start server replica") + +REPLICASET_ADD +REPLICASET_REMOVE +REPLICA_CONNECTION_ERROR + +new_replica_id +deleted_replica_id + +test_run:cmd("switch replica") + +test_run:cmd("setopt delimiter ';'") +function on_ctl_shutdown(ctx) + if ctx.type == box.ctl.event.SHUTDOWN then + require("log").info("test replica shutdown") + end +end; + +function on_ctl_error(ctx) + error("trigger error") +end; + +test_run:cmd("setopt delimiter ''"); + +SYSTEM_SPACE_CREATE +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +REPLICASET_ADD +REPLICASET_REMOVE + +box.cfg{read_only = true} +fiber = require("fiber") +while READ_ONLY == 0 do fiber.sleep(0.001) end +READ_ONLY + +box.cfg{on_ctl_event = on_ctl_error} +box.cfg{read_only = false} +test_run:grep_log('replica', 'ctl_trigger error') +box.cfg{on_ctl_event = on_ctl_shutdown} + +test_run:cmd("restart server replica") +-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica. +-- test_run:grep_log('replica', 'test replica shutdown', 10000, true) + +test_run:cmd("switch master") +REPLICA_CONNECTION_ERROR + +box.schema.user.revoke('guest', 'replication') +_ = box.space._cluster:delete{2} + +SYSTEM_SPACE_CREATE +LOCAL_RECOVERY +READ_ONLY +READ_WRITE +REPLICASET_ADD +REPLICASET_REMOVE + +new_replica_id +deleted_replica_id + +box.ctl.on_ctl_event(nil, on_ctl_new) + +-- cleanup +test_run:cmd("switch default") +test_run:cmd("stop server master") +test_run:cmd("cleanup server master") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua new file mode 100644 index 000000000..8228762a3 --- /dev/null +++ b/test/replication/replica_onctl.lua @@ -0,0 +1,34 @@ +#!/usr/bin/env tarantool + +SYSTEM_SPACE_CREATE = 0 +LOCAL_RECOVERY = 0 +READ_ONLY = 0 +READ_WRITE = 0 +REPLICASET_ADD = {} +REPLICASET_REMOVE = {} + +local function onctl(ctx) + if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then + SYSTEM_SPACE_CREATE = SYSTEM_SPACE_CREATE + 1 + elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then + LOCAL_RECOVERY = LOCAL_RECOVERY + 1 + elseif ctx.type == box.ctl.event.READ_ONLY then + READ_ONLY = READ_ONLY + 1 + elseif ctx.type == box.ctl.event.READ_WRITE then + READ_WRITE = READ_WRITE + 1 + elseif ctx.type == box.ctl.event.REPLICASET_ADD then + table.insert(REPLICASET_ADD, ctx.replica_id) + elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then + table.insert(REPLICASET_REMOVE, ctx.replica_id) + end +end + +box.cfg({ + listen = os.getenv("LISTEN"), + replication = os.getenv("MASTER"), + memtx_memory = 107374182, + replication_connect_timeout = 0.5, + on_ctl_event = onctl, +}) + +require('console').listen(os.getenv('ADMIN')) diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 95e94e5a2..365a82512 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -6,6 +6,7 @@ "wal_off.test.lua": {}, "hot_standby.test.lua": {}, "rebootstrap.test.lua": {}, + "onctl.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} -- 2.14.3 (Apple Git-98)