* [tarantool-patches] [PATCH v3] box.ctl: implement a trigger on any control events
@ 2018-08-09 8:38 Konstantin Belyavskiy
0 siblings, 0 replies; only message in thread
From: Konstantin Belyavskiy @ 2018-08-09 8:38 UTC (permalink / raw)
To: tarantool-patches
This patch is based on original Ilya Markov's patchset
(gh-3159-box-on-ctl-event)
Supported control events:
* Creation of system space.
* Completion of local recovery. Called on finish of bootstrap or
finish of recovery.
* Switch to read only/read write mode.
* Replicaset add/remove. Called on changes replica set.
* Controlled shutdown.
* Replica error event. Replica fails with some error.
Errors inside triggers are logged and don't influence on instance
behaviour.
All supported control events are exported to Lua as a constants
within namespace box.ctl.event, here is a full list:
- SYSTEM_SPACE_CREATE
- LOCAL_RECOVERY
- READ_ONLY
- READ_WRITE
- SHUTDOWN
- REPLICASET_ADD
- REPLICASET_REMOVE
- REPLICA_CONNECTION_ERROR
Introduce new configuration option: 'on_ctl_event' with type
function and one one argument - context.
For every events context has required field 'type', one of
constants mentioned above.
For REPLICASET_ADD, REPLICASET_REMOVE and REPLICA_CONNECTION_ERROR
an additional field 'replica_id' is also available.
Usage example. In Lua script a callback function with actions on
specific controlled events must be defined, e.g:
local function onctl(ctx)
if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
-- specific action #1
elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
-- specific action #2
...
end
end
Then this function passed as a parameter to box.cfg:
on_ctl_event = onctl
Closes #3159
Changes in V2:
Squashed 3 to 1
Small fixes: remove unused headers, rename function.
Design changes:
- remove fiber for ro condition checks.
- remove support for 'table' type callback.
- insert completion of recovery of all system spaces trigger
right after sys spaces was recovered.
- add documentation.
Changes in V3:
- instead of all system spaces recovery, set trigger on each
system space creation.
---
Ticket: https://github.com/tarantool/tarantool/issues/3159
Branch: kbelyavs/gh-3159-box-on-ctl-event
src/box/CMakeLists.txt | 1 +
src/box/alter.cc | 5 +
src/box/box.cc | 26 +++-
src/box/box.h | 1 +
src/box/ctl.c | 76 +++++++++++
src/box/ctl.h | 81 ++++++++++++
src/box/engine.c | 3 +
src/box/lua/cfg.cc | 12 ++
src/box/lua/ctl.c | 54 ++++++++
src/box/lua/ctl.h | 2 +
src/box/lua/load_cfg.lua | 10 +-
src/box/memtx_engine.c | 1 +
src/box/relay.cc | 7 +
src/box/replication.cc | 14 ++
src/cfg.c | 17 +++
src/cfg.h | 4 +
test/replication/master_onctl.lua | 37 ++++++
test/replication/onctl.result | 259 +++++++++++++++++++++++++++++++++++++
test/replication/onctl.test.lua | 107 +++++++++++++++
test/replication/replica_onctl.lua | 34 +++++
test/replication/suite.cfg | 1 +
21 files changed, 748 insertions(+), 4 deletions(-)
create mode 100644 src/box/ctl.c
create mode 100644 src/box/ctl.h
create mode 100644 test/replication/master_onctl.lua
create mode 100644 test/replication/onctl.result
create mode 100644 test/replication/onctl.test.lua
create mode 100644 test/replication/replica_onctl.lua
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index ad544270b..49e387fde 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -112,6 +112,7 @@ add_library(box STATIC
journal.c
wal.c
call.c
+ ctl.c
${lua_sources}
lua/init.c
lua/call.c
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 3007a131d..d955bfc03 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -52,6 +52,7 @@
#include "identifier.h"
#include "version.h"
#include "sequence.h"
+#include "ctl.h"
/**
* chap-sha1 of empty string, i.e.
@@ -1622,6 +1623,8 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
struct trigger *on_rollback =
txn_alter_trigger_new(on_create_space_rollback, space);
txn_on_rollback(txn, on_rollback);
+ if (space->def->id < BOX_SYSTEM_ID_MAX)
+ on_ctl_event_type(CTL_EVENT_SYSTEM_SPACE_CREATE);
} else if (new_tuple == NULL) { /* DELETE */
access_check_ddl(old_space->def->name, old_space->def->uid,
SC_SPACE, PRIV_D, true);
@@ -1719,6 +1722,8 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
(void) new UpdateSchemaVersion(alter);
alter_space_do(txn, alter);
alter_guard.is_active = false;
+ if (alter->new_space->def->id < BOX_SYSTEM_ID_MAX)
+ on_ctl_event_type(CTL_EVENT_SYSTEM_SPACE_CREATE);
}
}
diff --git a/src/box/box.cc b/src/box/box.cc
index ee12d5738..9d195322c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -73,6 +73,7 @@
#include "call.h"
#include "func.h"
#include "sequence.h"
+#include "ctl.h"
static char status[64] = "unknown";
@@ -209,11 +210,22 @@ process_nop(struct request *request)
return txn_commit_stmt(txn, request);
}
+static void
+on_ro_cond_change(void)
+{
+ on_ctl_event_type(box_is_ro() ? CTL_EVENT_READ_ONLY:
+ CTL_EVENT_READ_WRITE);
+ fiber_cond_broadcast(&ro_cond);
+}
+
void
box_set_ro(bool ro)
{
+ if (is_ro == ro)
+ return; /* nothing to do */
+
is_ro = ro;
- fiber_cond_broadcast(&ro_cond);
+ on_ro_cond_change();
}
bool
@@ -244,7 +256,7 @@ box_clear_orphan(void)
return; /* nothing to do */
is_orphan = false;
- fiber_cond_broadcast(&ro_cond);
+ on_ro_cond_change();
/* Update the title to reflect the new status. */
title("running");
@@ -840,6 +852,13 @@ box_set_net_msg_max(void)
IPROTO_FIBER_POOL_SIZE_FACTOR);
}
+void
+box_set_on_ctl_event(void)
+{
+ if (cfg_reset_on_ctl_event() < 0)
+ diag_raise();
+}
+
/* }}} configuration bindings */
/**
@@ -1592,6 +1611,7 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
void
box_free(void)
{
+ on_ctl_event_type(CTL_EVENT_SHUTDOWN);
/*
* See gh-584 "box_free() is called even if box is not
* initialized
@@ -1932,7 +1952,6 @@ void
box_init(void)
{
fiber_cond_create(&ro_cond);
-
user_cache_init();
/*
* The order is important: to initialize sessions,
@@ -1989,6 +2008,7 @@ box_cfg_xc(void)
box_set_replication_connect_timeout();
box_set_replication_connect_quorum();
box_set_replication_skip_conflict();
+ box_set_on_ctl_event();
replication_sync_lag = box_check_replication_sync_lag();
xstream_create(&join_stream, apply_initial_join_row);
xstream_create(&subscribe_stream, apply_row);
diff --git a/src/box/box.h b/src/box/box.h
index e2e06d977..3e55ee6c4 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -193,6 +193,7 @@ void box_set_replication_connect_timeout(void);
void box_set_replication_connect_quorum(void);
void box_set_replication_skip_conflict(void);
void box_set_net_msg_max(void);
+void box_set_on_ctl_event(void);
extern "C" {
#endif /* defined(__cplusplus) */
diff --git a/src/box/ctl.c b/src/box/ctl.c
new file mode 100644
index 000000000..d3a2077ad
--- /dev/null
+++ b/src/box/ctl.c
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <lib/small/small/rlist.h>
+#include <trigger.h>
+#include <box/ctl.h>
+#include "errcode.h"
+#include "error.h"
+#include <exception.h>
+
+RLIST_HEAD(on_ctl_event);
+
+const char* ctltype2str[CTL_LAST_POS_GUARD] = {
+ "system space create", // CTL_EVENT_SYSTEM_SPACE_CREATE
+ "local recovery", // CTL_EVENT_LOCAL_RECOVERY
+ "read only", // CTL_EVENT_READ_ONLY
+ "read write", // CTL_EVENT_READ_WRITE
+ "shutdown", // CTL_EVENT_SHUTDOWN
+ "replicaset add", // CTL_EVENT_REPLICASET_ADD
+ "replicaset remove", // CTL_EVENT_REPLICASET_REMOVE
+ "replica connect error", // CTL_EVENT_REPLICA_CONNECTION_ERROR
+};
+
+int
+run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result) {
+ return trigger_run(&on_ctl_event, (void *) result);
+}
+
+void
+on_ctl_event_type(enum ctl_event_type type)
+{
+ struct on_ctl_event_ctx ctx = {};
+ ctx.type = type;
+ if (run_on_ctl_event_triggers(&ctx) < 0)
+ say_error("ctl_trigger error in %s: %s", ctltype2str[type],
+ diag_last_error(diag_get())->errmsg);
+}
+
+int
+cfg_reset_on_ctl_event()
+{
+ if (cfg_reset_trigger("on_ctl_event", &on_ctl_event,
+ lbox_push_on_ctl_event, NULL) < 0) {
+ diag_set(ClientError, ER_CFG, "on_ctl_event",
+ "expected function or table");
+ return -1;
+ }
+ return 0;
+}
diff --git a/src/box/ctl.h b/src/box/ctl.h
new file mode 100644
index 000000000..751ba0149
--- /dev/null
+++ b/src/box/ctl.h
@@ -0,0 +1,81 @@
+#ifndef INCLUDES_TARANTOOL_CTL_H
+#define INCLUDES_TARANTOOL_CTL_H
+
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <cfg.h>
+#include <box/lua/ctl.h>
+
+/** Global on-ctl_event triggers. */
+extern struct rlist on_ctl_event;
+
+enum ctl_event_type {
+ CTL_EVENT_SYSTEM_SPACE_CREATE,
+ CTL_EVENT_LOCAL_RECOVERY,
+ CTL_EVENT_READ_ONLY,
+ CTL_EVENT_READ_WRITE,
+ CTL_EVENT_SHUTDOWN,
+ CTL_EVENT_REPLICASET_ADD,
+ CTL_EVENT_REPLICASET_REMOVE,
+ CTL_EVENT_REPLICA_CONNECTION_ERROR,
+ CTL_LAST_POS_GUARD,
+};
+
+struct on_ctl_event_ctx {
+ enum ctl_event_type type;
+ uint32_t replica_id;
+};
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Runs on_ctl_event triggers with specified context.
+ */
+int
+run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result);
+
+/**
+ * Runs on_ctl_event trigger with specified type and
+ * log error if any.
+ */
+void
+on_ctl_event_type(enum ctl_event_type type);
+
+int
+cfg_reset_on_ctl_event();
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* INCLUDES_TARANTOOL_LUA_CTL_H */
diff --git a/src/box/engine.c b/src/box/engine.c
index 2a30dcddd..7f5082b69 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -29,6 +29,7 @@
* SUCH DAMAGE.
*/
#include "engine.h"
+#include "ctl.h"
#include <stdint.h>
#include <string.h>
@@ -73,6 +74,7 @@ engine_bootstrap(void)
if (engine->vtab->bootstrap(engine) != 0)
return -1;
}
+ on_ctl_event_type(CTL_EVENT_LOCAL_RECOVERY);
return 0;
}
@@ -111,6 +113,7 @@ engine_end_recovery(void)
if (engine->vtab->end_recovery(engine) != 0)
return -1;
}
+ on_ctl_event_type(CTL_EVENT_LOCAL_RECOVERY);
return 0;
}
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 0f6b8a5a3..baf4a99d5 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -252,6 +252,17 @@ lbox_cfg_set_net_msg_max(struct lua_State *L)
return 0;
}
+static int
+lbox_cfg_set_on_ctl_event(struct lua_State *L)
+{
+ try {
+ box_set_on_ctl_event();
+ } catch (Exception *) {
+ luaT_error(L);
+ }
+ return 0;
+}
+
static int
lbox_cfg_set_worker_pool_threads(struct lua_State *L)
{
@@ -330,6 +341,7 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
+ {"cfg_set_on_ctl_event", lbox_cfg_set_on_ctl_event},
{NULL, NULL}
};
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 9a105ed5c..5b78875d5 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -35,6 +35,8 @@
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
+#include <lua/trigger.h>
+#include <box/ctl.h>
#include "lua/utils.h"
@@ -64,9 +66,38 @@ lbox_ctl_wait_rw(struct lua_State *L)
return 0;
}
+
+int
+lbox_push_on_ctl_event(struct lua_State *L, void *event)
+{
+ struct on_ctl_event_ctx *ctx = (struct on_ctl_event_ctx *) event;
+ lua_newtable(L);
+ lua_pushstring(L, "type");
+ lua_pushinteger(L, ctx->type);
+ lua_settable(L, -3);
+
+ if (ctx->type == CTL_EVENT_REPLICASET_ADD ||
+ ctx->type == CTL_EVENT_REPLICASET_REMOVE ||
+ ctx->type == CTL_EVENT_REPLICA_CONNECTION_ERROR) {
+ lua_pushstring(L, "replica_id");
+ luaL_pushuint64(L, ctx->replica_id);
+ lua_settable(L, -3);
+ }
+ return 1;
+}
+
+static int
+lbox_on_ctl_event(struct lua_State *L)
+{
+ return lbox_trigger_reset(L, 2, &on_ctl_event,
+ lbox_push_on_ctl_event, NULL);
+}
+
+
static const struct luaL_Reg lbox_ctl_lib[] = {
{"wait_ro", lbox_ctl_wait_ro},
{"wait_rw", lbox_ctl_wait_rw},
+ {"on_ctl_event", lbox_on_ctl_event},
{NULL, NULL}
};
@@ -75,4 +106,27 @@ box_lua_ctl_init(struct lua_State *L)
{
luaL_register_module(L, "box.ctl", lbox_ctl_lib);
lua_pop(L, 1);
+
+ luaL_findtable(L, LUA_GLOBALSINDEX, "box.ctl", 1);
+ lua_newtable(L);
+ lua_setfield(L, -2, "event");
+ lua_getfield(L, -1, "event");
+
+ lua_pushnumber(L, CTL_EVENT_SYSTEM_SPACE_CREATE);
+ lua_setfield(L, -2, "SYSTEM_SPACE_CREATE");
+ lua_pushnumber(L, CTL_EVENT_LOCAL_RECOVERY);
+ lua_setfield(L, -2, "LOCAL_RECOVERY");
+ lua_pushnumber(L, CTL_EVENT_READ_ONLY);
+ lua_setfield(L, -2, "READ_ONLY");
+ lua_pushnumber(L, CTL_EVENT_READ_WRITE);
+ lua_setfield(L, -2, "READ_WRITE");
+ lua_pushnumber(L, CTL_EVENT_SHUTDOWN);
+ lua_setfield(L, -2, "SHUTDOWN");
+ lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD);
+ lua_setfield(L, -2, "REPLICASET_ADD");
+ lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE);
+ lua_setfield(L, -2, "REPLICASET_REMOVE");
+ lua_pushnumber(L, CTL_EVENT_REPLICA_CONNECTION_ERROR);
+ lua_setfield(L, -2, "REPLICA_CONNECTION_ERROR");
+ lua_pop(L, 2); /* box, ctl */
}
diff --git a/src/box/lua/ctl.h b/src/box/lua/ctl.h
index e7c2edd15..ab63232dd 100644
--- a/src/box/lua/ctl.h
+++ b/src/box/lua/ctl.h
@@ -41,6 +41,8 @@ struct lua_State;
void
box_lua_ctl_init(struct lua_State *L);
+int
+lbox_push_on_ctl_event(struct lua_State *L, void *event);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 0b668cdc6..035615f4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -1,7 +1,11 @@
-- load_cfg.lua - internal file
local log = require('log')
-local json = require('json')
+local json = require("json").new()
+json.cfg{
+ encode_use_tostring = true,
+}
+
local private = require('box.internal')
local urilib = require('uri')
local math = require('math')
@@ -64,6 +68,7 @@ local default_cfg = {
feedback_host = "https://feedback.tarantool.io",
feedback_interval = 3600,
net_msg_max = 768,
+ on_ctl_event = nil,
}
-- types of available options
@@ -125,6 +130,7 @@ local template_cfg = {
feedback_host = 'string',
feedback_interval = 'number',
net_msg_max = 'number',
+ on_ctl_event = 'function',
}
local function normalize_uri(port)
@@ -216,6 +222,7 @@ local dynamic_cfg = {
replicaset_uuid = check_replicaset_uuid,
replication_skip_conflict = private.cfg_set_replication_skip_conflict,
net_msg_max = private.cfg_set_net_msg_max,
+ on_ctl_event = private.cfg_set_on_ctl_event,
}
local dynamic_cfg_skip_at_load = {
@@ -232,6 +239,7 @@ local dynamic_cfg_skip_at_load = {
force_recovery = true,
instance_uuid = true,
replicaset_uuid = true,
+ on_ctl_event = true,
}
local function convert_gb(size)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f5ace9268..400fed95b 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
+#include "ctl.h"
/*
* Memtx yield-in-transaction trigger: roll back the effects
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 05468f203..19b3f3b7a 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -52,6 +52,7 @@
#include "xrow_io.h"
#include "xstream.h"
#include "wal.h"
+#include "ctl.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -551,6 +552,12 @@ relay_subscribe_f(va_list ap)
if (!diag_is_empty(&relay->diag)) {
/* An error has occurred while reading ACKs of xlog. */
diag_move(&relay->diag, diag_get());
+ struct on_ctl_event_ctx ctx;
+ ctx.type = CTL_EVENT_REPLICA_CONNECTION_ERROR;
+ ctx.replica_id = relay->replica->id;
+ if (run_on_ctl_event_triggers(&ctx) < 0)
+ say_error("ctl_trigger error in replica error: %s",
+ diag_last_error(diag_get())->errmsg);
/* Reference the diag in the status. */
diag_add_error(&relay->diag, diag_last_error(diag_get()));
}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 26bbbe32a..4ee8ae5a3 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -42,6 +42,7 @@
#include "relay.h"
#include "vclock.h" /* VCLOCK_MAX */
#include "sio.h"
+#include "ctl.h"
uint32_t instance_id = REPLICA_ID_NIL;
struct tt_uuid INSTANCE_UUID;
@@ -173,6 +174,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
replica->uuid = *replica_uuid;
replica_hash_insert(&replicaset.hash, replica);
replica_set_id(replica, replica_id);
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD;
+ on_ctl_ctx.replica_id = replica_id;
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica add: %s",
+ diag_last_error(diag_get())->errmsg);
return replica;
}
@@ -204,6 +211,10 @@ replica_clear_id(struct replica *replica)
* Some records may arrive later on due to asynchronous nature of
* replication.
*/
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE;
+ on_ctl_ctx.replica_id = replica->id;
+
replicaset.replica_by_id[replica->id] = NULL;
replica->id = REPLICA_ID_NIL;
/*
@@ -223,6 +234,9 @@ replica_clear_id(struct replica *replica)
replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
}
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica remove: %s",
+ diag_last_error(diag_get())->errmsg);
}
static void
diff --git a/src/cfg.c b/src/cfg.c
index 7c7d6e793..9dfcae162 100644
--- a/src/cfg.c
+++ b/src/cfg.c
@@ -153,3 +153,20 @@ cfg_getarr_elem(const char *name, int i)
lua_pop(tarantool_L, 2);
return val;
}
+
+int
+cfg_reset_trigger(const char *name, struct rlist *list,
+ lbox_push_event_f push_event, lbox_pop_event_f pop_event)
+{
+ cfg_get(name);
+ struct lua_State *L = tarantool_L;
+ if (lua_isnil(L, -1))
+ return 0;
+ if (!lua_isfunction(L, -1))
+ return -1;
+ lua_pushnil(L);
+ int rc = lbox_trigger_reset(L, lua_gettop(L), list,
+ push_event, pop_event);
+ lua_pop(L, 1);
+ return rc;
+}
diff --git a/src/cfg.h b/src/cfg.h
index 8499388b8..d36465c49 100644
--- a/src/cfg.h
+++ b/src/cfg.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
+#include <lua/trigger.h>
#if defined(__cplusplus)
extern "C" {
@@ -61,6 +62,9 @@ cfg_getarr_size(const char *name);
const char *
cfg_getarr_elem(const char *name, int i);
+int
+cfg_reset_trigger(const char *name, struct rlist *list,
+ lbox_push_event_f push_event, lbox_pop_event_f pop_event);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua
new file mode 100644
index 000000000..49f320d3e
--- /dev/null
+++ b/test/replication/master_onctl.lua
@@ -0,0 +1,37 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+SYSTEM_SPACE_CREATE = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+REPLICA_CONNECTION_ERROR = {}
+
+local function onctl(ctx)
+ if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then
+ SYSTEM_SPACE_CREATE = SYSTEM_SPACE_CREATE + 1
+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.READ_ONLY then
+ READ_ONLY = READ_ONLY + 1
+ elseif ctx.type == box.ctl.event.READ_WRITE then
+ READ_WRITE = READ_WRITE + 1
+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+ table.insert(REPLICASET_ADD, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ table.insert(REPLICASET_REMOVE, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICA_CONNECTION_ERROR then
+ table.insert(REPLICA_CONNECTION_ERROR, ctx.replica_id)
+ end
+end
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ memtx_memory = 107374182,
+ replication_connect_timeout = 0.5,
+ on_ctl_event = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
new file mode 100644
index 000000000..7b7208464
--- /dev/null
+++ b/test/replication/onctl.result
@@ -0,0 +1,259 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+---
+- true
+...
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+---
+- true
+...
+test_run:cmd("start server master")
+---
+- true
+...
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+SYSTEM_SPACE_CREATE
+---
+- 18
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+---
+- - 1
+ - 1
+...
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+---
+- - 1
+...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
+REPLICASET_ADD = {}
+---
+...
+REPLICASET_REMOVE = {}
+---
+...
+new_replica_id = 0
+---
+...
+deleted_replica_id = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.ctl.on_ctl_event(on_ctl_new)
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 0
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+---
+...
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+SYSTEM_SPACE_CREATE
+---
+- 18
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+box.cfg{read_only = true}
+---
+...
+fiber = require("fiber")
+---
+...
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+---
+...
+READ_ONLY
+---
+- 2
+...
+box.cfg{on_ctl_event = on_ctl_error}
+---
+...
+box.cfg{read_only = false}
+---
+...
+test_run:grep_log('replica', 'ctl_trigger error')
+---
+- ctl_trigger error
+...
+box.cfg{on_ctl_event = on_ctl_shutdown}
+---
+...
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica.
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true)
+test_run:cmd("switch master")
+---
+- true
+...
+REPLICA_CONNECTION_ERROR
+---
+- - 2
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+_ = box.space._cluster:delete{2}
+---
+...
+SYSTEM_SPACE_CREATE
+---
+- 18
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- - 2
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 2
+...
+box.ctl.on_ctl_event(nil, on_ctl_new)
+---
+...
+-- cleanup
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master")
+---
+- true
+...
+test_run:cmd("cleanup server master")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
new file mode 100644
index 000000000..00f2d0109
--- /dev/null
+++ b/test/replication/onctl.test.lua
@@ -0,0 +1,107 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+
+test_run:cmd("start server master")
+test_run:cmd("switch master")
+box.schema.user.grant('guest', 'replication')
+
+SYSTEM_SPACE_CREATE
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
+
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+new_replica_id = 0
+deleted_replica_id = 0
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = box.ctl.on_ctl_event(on_ctl_new)
+
+test_run:cmd("start server replica")
+
+REPLICASET_ADD
+REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
+
+new_replica_id
+deleted_replica_id
+
+test_run:cmd("switch replica")
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+SYSTEM_SPACE_CREATE
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+box.cfg{read_only = true}
+fiber = require("fiber")
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+READ_ONLY
+
+box.cfg{on_ctl_event = on_ctl_error}
+box.cfg{read_only = false}
+test_run:grep_log('replica', 'ctl_trigger error')
+box.cfg{on_ctl_event = on_ctl_shutdown}
+
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica.
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true)
+
+test_run:cmd("switch master")
+REPLICA_CONNECTION_ERROR
+
+box.schema.user.revoke('guest', 'replication')
+_ = box.space._cluster:delete{2}
+
+SYSTEM_SPACE_CREATE
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+box.ctl.on_ctl_event(nil, on_ctl_new)
+
+-- cleanup
+test_run:cmd("switch default")
+test_run:cmd("stop server master")
+test_run:cmd("cleanup server master")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua
new file mode 100644
index 000000000..8228762a3
--- /dev/null
+++ b/test/replication/replica_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+
+SYSTEM_SPACE_CREATE = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+local function onctl(ctx)
+ if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then
+ SYSTEM_SPACE_CREATE = SYSTEM_SPACE_CREATE + 1
+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.READ_ONLY then
+ READ_ONLY = READ_ONLY + 1
+ elseif ctx.type == box.ctl.event.READ_WRITE then
+ READ_WRITE = READ_WRITE + 1
+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+ table.insert(REPLICASET_ADD, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ table.insert(REPLICASET_REMOVE, ctx.replica_id)
+ end
+end
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ replication = os.getenv("MASTER"),
+ memtx_memory = 107374182,
+ replication_connect_timeout = 0.5,
+ on_ctl_event = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5a2..365a82512 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
"wal_off.test.lua": {},
"hot_standby.test.lua": {},
"rebootstrap.test.lua": {},
+ "onctl.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2018-08-09 8:38 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-08-09 8:38 [tarantool-patches] [PATCH v3] box.ctl: implement a trigger on any control events Konstantin Belyavskiy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox