* [tarantool-patches] [PATCH v2] box.ctl: implement a trigger on any control events
From: Konstantin Belyavskiy @ 2018-08-03 8:04 UTC
To: tarantool-patches
From: Ilya Markov <imarkov@tarantool.org>
This patch is based on Ilya Markov's original patchset
(gh-3159-box-on-ctl-event).
Supported control events:
* Completion of recovery of all system spaces. Fired when bootstrap,
  join, or snapshot recovery finishes.
* Completion of local recovery. Fired when bootstrap or recovery
  finishes.
* Switch to read-only or read-write mode.
* Replica set add/remove. Fired when a replica is added to or removed
  from the replica set.
* Controlled shutdown.
* Replica connection error. Fired when a replica connection fails
  with an error.
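The read-only/read-write events are meant to report mode transitions: box_set_ro() in this patch returns early when the flag does not change, so setting the same value twice fires nothing. A minimal sketch of that transition-only behaviour; `struct mode_state`, `mode_set_ro()` and the `MODE_EVENT_*` names are hypothetical stand-ins, and the sketch ignores the orphan status that box_is_ro() also takes into account:

```c
#include <stdbool.h>

/* Hypothetical stand-ins for CTL_EVENT_READ_ONLY / CTL_EVENT_READ_WRITE. */
enum mode_event {
	MODE_EVENT_NONE,	/* flag unchanged, nothing to report */
	MODE_EVENT_READ_ONLY,
	MODE_EVENT_READ_WRITE,
};

/* Hypothetical state; the patch keeps this in the static is_ro flag. */
struct mode_state {
	bool is_ro;
};

/*
 * Update the read-only flag and return which event, if any, should
 * be fired. Mirrors the early return added to box_set_ro().
 */
static enum mode_event
mode_set_ro(struct mode_state *state, bool ro)
{
	if (state->is_ro == ro)
		return MODE_EVENT_NONE;
	state->is_ro = ro;
	return ro ? MODE_EVENT_READ_ONLY : MODE_EVENT_READ_WRITE;
}
```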
Errors raised inside triggers are logged and do not affect instance
behaviour.
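Every call site in the diff follows the same pattern: run the triggers, log the error if they fail, and carry on. A minimal sketch of that pattern, assuming (as Tarantool's trigger_run() appears to do) that the runner stops at the first failing trigger; `ctl_trigger_f`, `run_triggers()`, `fire_event()` and the two example triggers are hypothetical:

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical trigger callback: returns 0 on success, -1 on failure. */
typedef int (*ctl_trigger_f)(int event_type);

/* Run triggers in order; stop at the first failure and return -1. */
static int
run_triggers(ctl_trigger_f *triggers, size_t count, int event_type)
{
	for (size_t i = 0; i < count; i++) {
		if (triggers[i](event_type) != 0)
			return -1;
	}
	return 0;
}

/* Caller side: a trigger failure is only logged, never propagated. */
static void
fire_event(ctl_trigger_f *triggers, size_t count, int event_type)
{
	if (run_triggers(triggers, count, event_type) < 0)
		fprintf(stderr, "ctl_trigger error for event %d\n", event_type);
}

/* Example triggers: one counts its calls, the other always fails. */
static int trigger_calls;

static int
count_trigger(int event_type)
{
	(void)event_type;
	trigger_calls++;
	return 0;
}

static int
failing_trigger(int event_type)
{
	(void)event_type;
	return -1;
}
```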
All supported control events are exported to Lua as constants in the
box.ctl.event namespace. The full list:
- SYSTEM_SPACE_RECOVERY
- LOCAL_RECOVERY
- READ_ONLY
- READ_WRITE
- SHUTDOWN
- REPLICASET_ADD
- REPLICASET_REMOVE
- REPLICA_CONNECTION_ERROR
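box_lua_ctl_init() pushes each constant with lua_pushnumber(), so box.ctl.event values are the plain integers of enum ctl_event_type (SYSTEM_SPACE_RECOVERY is 0, REPLICA_CONNECTION_ERROR is 7), and reordering the enum changes what Lua code sees. A small sketch of that name-to-value mapping; the enum is copied from ctl.h in this patch, while `ctl_event_names` and `ctl_event_by_name()` are hypothetical helpers:

```c
#include <string.h>

/* Copy of enum ctl_event_type from src/box/ctl.h in this patch. */
enum ctl_event_type {
	CTL_EVENT_SYSTEM_SPACE_RECOVERY,
	CTL_EVENT_LOCAL_RECOVERY,
	CTL_EVENT_READ_ONLY,
	CTL_EVENT_READ_WRITE,
	CTL_EVENT_SHUTDOWN,
	CTL_EVENT_REPLICASET_ADD,
	CTL_EVENT_REPLICASET_REMOVE,
	CTL_EVENT_REPLICA_CONNECTION_ERROR,
};

/* Field names used for each value in the box.ctl.event table. */
static const char *const ctl_event_names[] = {
	[CTL_EVENT_SYSTEM_SPACE_RECOVERY] = "SYSTEM_SPACE_RECOVERY",
	[CTL_EVENT_LOCAL_RECOVERY] = "LOCAL_RECOVERY",
	[CTL_EVENT_READ_ONLY] = "READ_ONLY",
	[CTL_EVENT_READ_WRITE] = "READ_WRITE",
	[CTL_EVENT_SHUTDOWN] = "SHUTDOWN",
	[CTL_EVENT_REPLICASET_ADD] = "REPLICASET_ADD",
	[CTL_EVENT_REPLICASET_REMOVE] = "REPLICASET_REMOVE",
	[CTL_EVENT_REPLICA_CONNECTION_ERROR] = "REPLICA_CONNECTION_ERROR",
};

enum {
	CTL_EVENT_COUNT = sizeof(ctl_event_names) / sizeof(ctl_event_names[0])
};

/* Return the integer a box.ctl.event field holds, or -1 if unknown. */
static int
ctl_event_by_name(const char *name)
{
	for (int i = 0; i < CTL_EVENT_COUNT; i++) {
		if (strcmp(ctl_event_names[i], name) == 0)
			return i;
	}
	return -1;
}
```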
This patch introduces a new configuration option, 'on_ctl_event',
which takes a function with a single argument, the event context.
For every event, the context has a required field 'type' holding one
of the constants listed above. For REPLICASET_ADD, REPLICASET_REMOVE
and REPLICA_CONNECTION_ERROR, an additional field 'replica_id' is
also set.
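On the C side the context is struct on_ctl_event_ctx, and lbox_push_on_ctl_event() copies 'replica_id' into the Lua table only for the three replica events, so for every other event ctx.replica_id is nil in Lua. A sketch of that rule; the enum and struct are copied from ctl.h in this patch, while `ctl_event_has_replica_id()` and `ctl_ctx_replicaset_add()` are hypothetical helpers:

```c
#include <stdbool.h>
#include <stdint.h>

/* Copy of the enum and context struct declared in src/box/ctl.h. */
enum ctl_event_type {
	CTL_EVENT_SYSTEM_SPACE_RECOVERY,
	CTL_EVENT_LOCAL_RECOVERY,
	CTL_EVENT_READ_ONLY,
	CTL_EVENT_READ_WRITE,
	CTL_EVENT_SHUTDOWN,
	CTL_EVENT_REPLICASET_ADD,
	CTL_EVENT_REPLICASET_REMOVE,
	CTL_EVENT_REPLICA_CONNECTION_ERROR,
};

struct on_ctl_event_ctx {
	enum ctl_event_type type;
	uint32_t replica_id;
};

/* Mirrors the check in lbox_push_on_ctl_event(). */
static bool
ctl_event_has_replica_id(enum ctl_event_type type)
{
	return type == CTL_EVENT_REPLICASET_ADD ||
	       type == CTL_EVENT_REPLICASET_REMOVE ||
	       type == CTL_EVENT_REPLICA_CONNECTION_ERROR;
}

/* Build a context the way replicaset_add() does in this patch. */
static struct on_ctl_event_ctx
ctl_ctx_replicaset_add(uint32_t replica_id)
{
	struct on_ctl_event_ctx ctx = {
		.type = CTL_EVENT_REPLICASET_ADD,
		.replica_id = replica_id,
	};
	return ctx;
}
```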
Usage example. In a Lua script, define a callback that performs
actions on specific control events, e.g.:

    local function onctl(ctx)
        if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
            -- specific action #1
        elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
            -- specific action #2
        -- ... other events
        end
    end

Then pass this function to box.cfg:

    box.cfg{on_ctl_event = onctl}
Closes #3159
Changes in v2:
- Squashed three patches into one.
- Small fixes: removed unused headers, renamed a function.
Design changes:
- Removed the fiber for read-only condition checks.
- Removed support for 'table' type callbacks.
- The system space recovery trigger is now fired right after the
  system spaces are recovered.
- Added documentation.
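The system space recovery event is fired as soon as snapshot replay reaches the first row of a non-system space; the patch relies on system spaces being written to the snapshot before user spaces. If the snapshot holds only system spaces, the event is fired once replay ends. A minimal sketch of that logic; `SKETCH_SYSTEM_ID_MAX` uses an assumed value of BOX_SYSTEM_ID_MAX, and `struct replay_state` and the `replay_*()` functions are hypothetical:

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed value of Tarantool's BOX_SYSTEM_ID_MAX, for illustration. */
#define SKETCH_SYSTEM_ID_MAX 511

/* Hypothetical replay state; the patch uses a static flag. */
struct replay_state {
	bool sys_space_recovered;
	int events_fired;	/* times SYSTEM_SPACE_RECOVERY was fired */
};

static void
replay_begin(struct replay_state *s)
{
	s->sys_space_recovered = false;
}

/* Per-row hook, like memtx_engine_recover_snapshot_row(). */
static void
replay_row(struct replay_state *s, uint32_t space_id)
{
	if (space_id > SKETCH_SYSTEM_ID_MAX && !s->sys_space_recovered) {
		s->sys_space_recovered = true;
		s->events_fired++;
	}
}

/* After the last row: covers snapshots with system spaces only. */
static void
replay_end(struct replay_state *s)
{
	if (!s->sys_space_recovered)
		s->events_fired++;
	s->sys_space_recovered = true;
}
```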
---
Ticket: https://github.com/tarantool/tarantool/issues/3159
Branch: gh-3159-box-on-ctl-event-v2
src/box/CMakeLists.txt | 1 +
src/box/box.cc | 63 ++++++++-
src/box/box.h | 1 +
src/box/ctl.c | 63 +++++++++
src/box/ctl.h | 79 +++++++++++
src/box/engine.c | 13 ++
src/box/lua/cfg.cc | 12 ++
src/box/lua/ctl.c | 54 ++++++++
src/box/lua/ctl.h | 2 +
src/box/lua/load_cfg.lua | 10 +-
src/box/memtx_engine.c | 27 +++-
src/box/memtx_engine.h | 9 ++
src/box/relay.cc | 7 +
src/box/replication.cc | 14 ++
src/cfg.c | 17 +++
src/cfg.h | 4 +
test/replication/master_onctl.lua | 37 ++++++
test/replication/onctl.result | 259 +++++++++++++++++++++++++++++++++++++
test/replication/onctl.test.lua | 107 +++++++++++++++
test/replication/replica_onctl.lua | 34 +++++
test/replication/suite.cfg | 1 +
21 files changed, 807 insertions(+), 7 deletions(-)
create mode 100644 src/box/ctl.c
create mode 100644 src/box/ctl.h
create mode 100644 test/replication/master_onctl.lua
create mode 100644 test/replication/onctl.result
create mode 100644 test/replication/onctl.test.lua
create mode 100644 test/replication/replica_onctl.lua
diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 6b1ae3e80..74f6b5497 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -112,6 +112,7 @@ add_library(box STATIC
journal.c
wal.c
call.c
+ ctl.c
${lua_sources}
lua/init.c
lua/call.c
diff --git a/src/box/box.cc b/src/box/box.cc
index 61bfa117d..b6e5c8c81 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -72,6 +72,7 @@
#include "call.h"
#include "func.h"
#include "sequence.h"
+#include "ctl.h"
static char status[64] = "unknown";
@@ -104,6 +105,7 @@ static struct gc_consumer *backup_gc;
static bool is_box_configured = false;
static bool is_ro = true;
static fiber_cond ro_cond;
+static bool sys_space_recovered = true;
/**
* The following flag is set if the instance failed to
@@ -190,11 +192,25 @@ process_rw(struct request *request, struct space *space, struct tuple **result)
return 0;
}
+static void
+on_ro_cond_change(void)
+{
+ if (run_on_ctl_event_trigger_type(box_is_ro() ? CTL_EVENT_READ_ONLY:
+ CTL_EVENT_READ_WRITE) < 0)
+ say_error("ctl_trigger error in %s: %s",
+ box_is_ro() ? "read_only" :"read_write",
+ diag_last_error(diag_get())->errmsg);
+ fiber_cond_broadcast(&ro_cond);
+}
+
void
box_set_ro(bool ro)
{
+ if (is_ro == ro)
+ return; /* nothing to do */
+
is_ro = ro;
- fiber_cond_broadcast(&ro_cond);
+ on_ro_cond_change();
}
bool
@@ -225,7 +241,7 @@ box_clear_orphan(void)
return; /* nothing to do */
is_orphan = false;
- fiber_cond_broadcast(&ro_cond);
+ on_ro_cond_change();
/* Update the title to reflect the new status. */
title("running");
@@ -320,6 +336,15 @@ apply_initial_join_row(struct xstream *stream, struct xrow_header *row)
struct request request;
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
struct space *space = space_cache_find_xc(request.space_id);
+ say_debug("recover_snap_row for space %d", space->def->id);
+ if (space->def->id > BOX_SYSTEM_ID_MAX && !sys_space_recovered) {
+ sys_space_recovered = true;
+ if (run_on_ctl_event_trigger_type(
+ CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space "
+ "recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+ }
/* no access checks here - applier always works with admin privs */
space_apply_initial_join_row_xc(space, &request);
}
@@ -823,6 +848,13 @@ box_set_net_msg_max(void)
IPROTO_FIBER_POOL_SIZE_FACTOR);
}
+void
+box_set_on_ctl_event_xc(void)
+{
+ if (cfg_reset_on_ctl_event() < 0)
+ diag_raise();
+}
+
/* }}} configuration bindings */
/**
@@ -1565,6 +1597,9 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
void
box_free(void)
{
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SHUTDOWN) < 0)
+ say_error("ctl_trigger error in shutdown: %s",
+ diag_last_error(diag_get())->errmsg);
/*
* See gh-584 "box_free() is called even if box is not
* initialized
@@ -1683,8 +1718,17 @@ bootstrap_from_master(struct replica *master)
* Process initial data (snapshot or dirty disk data).
*/
engine_begin_initial_recovery_xc(NULL);
+ set_sys_space_recovered(false);
applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY);
-
+ /**
+ * The snapshot may contain only system spaces, in which case the
+ * trigger has not fired yet: run it if the flag is still false.
+ */
+ if (!is_sys_space_recovered() &&
+ run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+ set_sys_space_recovered(true);
/*
* Process final data (WALs).
*/
@@ -1751,7 +1795,6 @@ void
box_init(void)
{
fiber_cond_create(&ro_cond);
-
user_cache_init();
/*
* The order is important: to initialize sessions,
@@ -1808,6 +1851,7 @@ box_cfg_xc(void)
box_set_replication_connect_timeout();
box_set_replication_connect_quorum();
box_set_replication_skip_conflict();
+ box_set_on_ctl_event_xc();
replication_sync_lag = box_check_replication_sync_lag();
xstream_create(&join_stream, apply_initial_join_row);
xstream_create(&subscribe_stream, apply_row);
@@ -1874,9 +1918,18 @@ box_cfg_xc(void)
* recovery of system spaces issue DDL events in
* other engines.
*/
+ sys_space_recovered = false;
memtx_engine_recover_snapshot_xc(memtx,
&last_checkpoint_vclock);
-
+ /**
+ * The snapshot may contain only system spaces, in which case the
+ * trigger has not fired yet: run it if the flag is still false.
+ */
+ if (!sys_space_recovered &&
+ run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+ sys_space_recovered = true;
engine_begin_final_recovery_xc();
recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
cfg_getd("wal_dir_rescan_delay"));
diff --git a/src/box/box.h b/src/box/box.h
index d3967891d..086030191 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -191,6 +191,7 @@ void box_set_replication_connect_timeout(void);
void box_set_replication_connect_quorum(void);
void box_set_replication_skip_conflict(void);
void box_set_net_msg_max(void);
+void box_set_on_ctl_event_xc(void);
extern "C" {
#endif /* defined(__cplusplus) */
diff --git a/src/box/ctl.c b/src/box/ctl.c
new file mode 100644
index 000000000..fc3ae37f7
--- /dev/null
+++ b/src/box/ctl.c
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <lib/small/small/rlist.h>
+#include <trigger.h>
+#include <box/ctl.h>
+#include "errcode.h"
+#include "error.h"
+#include <exception.h>
+
+RLIST_HEAD(on_ctl_event);
+
+int
+run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result) {
+ return trigger_run(&on_ctl_event, (void *) result);
+}
+
+int
+run_on_ctl_event_trigger_type(enum ctl_event_type type)
+{
+ struct on_ctl_event_ctx ctx = {};
+ ctx.type = type;
+ return run_on_ctl_event_triggers(&ctx);
+}
+
+int
+cfg_reset_on_ctl_event(void)
+{
+ if (cfg_reset_trigger("on_ctl_event", &on_ctl_event,
+ lbox_push_on_ctl_event, NULL) < 0) {
+ diag_set(ClientError, ER_CFG, "on_ctl_event",
+ "expected function");
+ return -1;
+ }
+ return 0;
+}
\ No newline at end of file
diff --git a/src/box/ctl.h b/src/box/ctl.h
new file mode 100644
index 000000000..6fae5e48d
--- /dev/null
+++ b/src/box/ctl.h
@@ -0,0 +1,79 @@
+#ifndef INCLUDES_TARANTOOL_CTL_H
+#define INCLUDES_TARANTOOL_CTL_H
+
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <cfg.h>
+#include <box/lua/ctl.h>
+
+/** Global on-ctl_event triggers. */
+extern struct rlist on_ctl_event;
+
+enum ctl_event_type {
+ CTL_EVENT_SYSTEM_SPACE_RECOVERY,
+ CTL_EVENT_LOCAL_RECOVERY,
+ CTL_EVENT_READ_ONLY,
+ CTL_EVENT_READ_WRITE,
+ CTL_EVENT_SHUTDOWN,
+ CTL_EVENT_REPLICASET_ADD,
+ CTL_EVENT_REPLICASET_REMOVE,
+ CTL_EVENT_REPLICA_CONNECTION_ERROR,
+};
+
+struct on_ctl_event_ctx {
+ enum ctl_event_type type;
+ uint32_t replica_id;
+};
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Runs on_ctl_event triggers with specified context.
+ */
+int
+run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result);
+
+/**
+ * Runs on_ctl_event triggers with specified type.
+ */
+int
+run_on_ctl_event_trigger_type(enum ctl_event_type type);
+
+int
+cfg_reset_on_ctl_event(void);
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* INCLUDES_TARANTOOL_CTL_H */
diff --git a/src/box/engine.c b/src/box/engine.c
index 82293fd18..7f91fa2e5 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -29,6 +29,7 @@
* SUCH DAMAGE.
*/
#include "engine.h"
+#include "ctl.h"
#include <stdint.h>
#include <string.h>
@@ -73,6 +74,14 @@ engine_bootstrap(void)
if (engine->vtab->bootstrap(engine) != 0)
return -1;
}
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0)
+ say_error("ctl_trigger error in local recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
return 0;
}
@@ -111,6 +120,10 @@ engine_end_recovery(void)
if (engine->vtab->end_recovery(engine) != 0)
return -1;
}
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0)
+ say_error("ctl_trigger error in local recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
return 0;
}
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 5afebc94e..a077b663e 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -253,6 +253,17 @@ lbox_cfg_set_net_msg_max(struct lua_State *L)
return 0;
}
+static int
+lbox_cfg_set_on_ctl_event(struct lua_State *L)
+{
+ try {
+ box_set_on_ctl_event_xc();
+ } catch (Exception *) {
+ luaT_error(L);
+ }
+ return 0;
+}
+
static int
lbox_cfg_set_worker_pool_threads(struct lua_State *L)
{
@@ -331,6 +342,7 @@ box_lua_cfg_init(struct lua_State *L)
{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
+ {"cfg_set_on_ctl_event", lbox_cfg_set_on_ctl_event},
{NULL, NULL}
};
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 9a105ed5c..694d197e1 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -35,6 +35,8 @@
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
+#include <lua/trigger.h>
+#include <box/ctl.h>
#include "lua/utils.h"
@@ -64,9 +66,38 @@ lbox_ctl_wait_rw(struct lua_State *L)
return 0;
}
+
+int
+lbox_push_on_ctl_event(struct lua_State *L, void *event)
+{
+ struct on_ctl_event_ctx *ctx = (struct on_ctl_event_ctx *) event;
+ lua_newtable(L);
+ lua_pushstring(L, "type");
+ lua_pushinteger(L, ctx->type);
+ lua_settable(L, -3);
+
+ if (ctx->type == CTL_EVENT_REPLICASET_ADD ||
+ ctx->type == CTL_EVENT_REPLICASET_REMOVE ||
+ ctx->type == CTL_EVENT_REPLICA_CONNECTION_ERROR) {
+ lua_pushstring(L, "replica_id");
+ luaL_pushuint64(L, ctx->replica_id);
+ lua_settable(L, -3);
+ }
+ return 1;
+}
+
+static int
+lbox_on_ctl_event(struct lua_State *L)
+{
+ return lbox_trigger_reset(L, 2, &on_ctl_event,
+ lbox_push_on_ctl_event, NULL);
+}
+
+
static const struct luaL_Reg lbox_ctl_lib[] = {
{"wait_ro", lbox_ctl_wait_ro},
{"wait_rw", lbox_ctl_wait_rw},
+ {"on_ctl_event", lbox_on_ctl_event},
{NULL, NULL}
};
@@ -75,4 +106,27 @@ box_lua_ctl_init(struct lua_State *L)
{
luaL_register_module(L, "box.ctl", lbox_ctl_lib);
lua_pop(L, 1);
+
+ luaL_findtable(L, LUA_GLOBALSINDEX, "box.ctl", 1);
+ lua_newtable(L);
+ lua_setfield(L, -2, "event");
+ lua_getfield(L, -1, "event");
+
+ lua_pushnumber(L, CTL_EVENT_SYSTEM_SPACE_RECOVERY);
+ lua_setfield(L, -2, "SYSTEM_SPACE_RECOVERY");
+ lua_pushnumber(L, CTL_EVENT_LOCAL_RECOVERY);
+ lua_setfield(L, -2, "LOCAL_RECOVERY");
+ lua_pushnumber(L, CTL_EVENT_READ_ONLY);
+ lua_setfield(L, -2, "READ_ONLY");
+ lua_pushnumber(L, CTL_EVENT_READ_WRITE);
+ lua_setfield(L, -2, "READ_WRITE");
+ lua_pushnumber(L, CTL_EVENT_SHUTDOWN);
+ lua_setfield(L, -2, "SHUTDOWN");
+ lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD);
+ lua_setfield(L, -2, "REPLICASET_ADD");
+ lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE);
+ lua_setfield(L, -2, "REPLICASET_REMOVE");
+ lua_pushnumber(L, CTL_EVENT_REPLICA_CONNECTION_ERROR);
+ lua_setfield(L, -2, "REPLICA_CONNECTION_ERROR");
+ lua_pop(L, 2); /* box, ctl */
}
diff --git a/src/box/lua/ctl.h b/src/box/lua/ctl.h
index e7c2edd15..ab63232dd 100644
--- a/src/box/lua/ctl.h
+++ b/src/box/lua/ctl.h
@@ -41,6 +41,8 @@ struct lua_State;
void
box_lua_ctl_init(struct lua_State *L);
+int
+lbox_push_on_ctl_event(struct lua_State *L, void *event);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 0b668cdc6..035615f4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -1,7 +1,11 @@
-- load_cfg.lua - internal file
local log = require('log')
-local json = require('json')
+local json = require("json").new()
+json.cfg{
+ encode_use_tostring = true,
+}
+
local private = require('box.internal')
local urilib = require('uri')
local math = require('math')
@@ -64,6 +68,7 @@ local default_cfg = {
feedback_host = "https://feedback.tarantool.io",
feedback_interval = 3600,
net_msg_max = 768,
+ on_ctl_event = nil,
}
-- types of available options
@@ -125,6 +130,7 @@ local template_cfg = {
feedback_host = 'string',
feedback_interval = 'number',
net_msg_max = 'number',
+ on_ctl_event = 'function',
}
local function normalize_uri(port)
@@ -216,6 +222,7 @@ local dynamic_cfg = {
replicaset_uuid = check_replicaset_uuid,
replication_skip_conflict = private.cfg_set_replication_skip_conflict,
net_msg_max = private.cfg_set_net_msg_max,
+ on_ctl_event = private.cfg_set_on_ctl_event,
}
local dynamic_cfg_skip_at_load = {
@@ -232,6 +239,7 @@ local dynamic_cfg_skip_at_load = {
force_recovery = true,
instance_uuid = true,
replicaset_uuid = true,
+ on_ctl_event = true,
}
local function convert_gb(size)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index fac84ce11..93a8458c5 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,21 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
+#include "ctl.h"
+
+static bool sys_space_recovered = true;
+
+bool
+is_sys_space_recovered(void)
+{
+ return sys_space_recovered;
+}
+
+void
+set_sys_space_recovered(bool val)
+{
+ sys_space_recovered = val;
+}
static void
txn_on_yield_or_stop(struct trigger *trigger, void *event)
@@ -197,7 +212,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
*/
if (!xlog_cursor_is_eof(&cursor))
panic("snapshot `%s' has no EOF marker", filename);
-
return 0;
}
@@ -218,6 +232,17 @@ memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct space *space = space_cache_find(request.space_id);
if (space == NULL)
return -1;
+
+ say_debug("recover_snap_row for space %d", space->def->id);
+ if (space->def->id > BOX_SYSTEM_ID_MAX && !sys_space_recovered) {
+ sys_space_recovered = true;
+ if (run_on_ctl_event_trigger_type(
+ CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space "
+ "recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+ }
+
/* memtx snapshot must contain only memtx spaces */
if (space->engine != (struct engine *)memtx) {
diag_set(ClientError, ER_CROSS_ENGINE_TRANSACTION);
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index 0f8e92ee4..ef9cb10a6 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -171,6 +171,15 @@ struct memtx_gc_task {
const struct memtx_gc_task_vtab *vtab;
};
+/**
+ * Get and set methods for sys_space_recovered variable, needed
+ * for completion of recovery of all system spaces trigger.
+ */
+bool
+is_sys_space_recovered(void);
+void
+set_sys_space_recovered(bool val);
+
/**
* Schedule a garbage collection task for execution.
*/
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a25cc540b..d535f830b 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -52,6 +52,7 @@
#include "xrow_io.h"
#include "xstream.h"
#include "wal.h"
+#include "ctl.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -548,6 +549,12 @@ relay_subscribe_f(va_list ap)
if (!diag_is_empty(&relay->diag)) {
/* An error has occurred while reading ACKs of xlog. */
diag_move(&relay->diag, diag_get());
+ struct on_ctl_event_ctx ctx;
+ ctx.type = CTL_EVENT_REPLICA_CONNECTION_ERROR;
+ ctx.replica_id = relay->replica->id;
+ if (run_on_ctl_event_triggers(&ctx) < 0)
+ say_error("ctl_trigger error in replica error: %s",
+ diag_last_error(diag_get())->errmsg);
/* Reference the diag in the status. */
diag_add_error(&relay->diag, diag_last_error(diag_get()));
}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e176984..75aecd03f 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -41,6 +41,7 @@
#include "error.h"
#include "relay.h"
#include "vclock.h" /* VCLOCK_MAX */
+#include "ctl.h"
uint32_t instance_id = REPLICA_ID_NIL;
struct tt_uuid INSTANCE_UUID;
@@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
replica->uuid = *replica_uuid;
replica_hash_insert(&replicaset.hash, replica);
replica_set_id(replica, replica_id);
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD;
+ on_ctl_ctx.replica_id = replica_id;
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica add: %s",
+ diag_last_error(diag_get())->errmsg);
return replica;
}
@@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica)
* Some records may arrive later on due to asynchronous nature of
* replication.
*/
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE;
+ on_ctl_ctx.replica_id = replica->id;
+
replicaset.replica_by_id[replica->id] = NULL;
replica->id = REPLICA_ID_NIL;
if (replica_is_orphan(replica)) {
replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
}
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica remove: %s",
+ diag_last_error(diag_get())->errmsg);
}
static void
diff --git a/src/cfg.c b/src/cfg.c
index 7c7d6e793..9dfcae162 100644
--- a/src/cfg.c
+++ b/src/cfg.c
@@ -153,3 +153,20 @@ cfg_getarr_elem(const char *name, int i)
lua_pop(tarantool_L, 2);
return val;
}
+
+int
+cfg_reset_trigger(const char *name, struct rlist *list,
+ lbox_push_event_f push_event, lbox_pop_event_f pop_event)
+{
+ cfg_get(name);
+ struct lua_State *L = tarantool_L;
+ if (lua_isnil(L, -1))
+ return 0;
+ if (!lua_isfunction(L, -1))
+ return -1;
+ lua_pushnil(L);
+ int rc = lbox_trigger_reset(L, lua_gettop(L), list,
+ push_event, pop_event);
+ lua_pop(L, 1);
+ return rc;
+}
diff --git a/src/cfg.h b/src/cfg.h
index 8499388b8..d36465c49 100644
--- a/src/cfg.h
+++ b/src/cfg.h
@@ -32,6 +32,7 @@
*/
#include <stdint.h>
+#include <lua/trigger.h>
#if defined(__cplusplus)
extern "C" {
@@ -61,6 +62,9 @@ cfg_getarr_size(const char *name);
const char *
cfg_getarr_elem(const char *name, int i);
+int
+cfg_reset_trigger(const char *name, struct rlist *list,
+ lbox_push_event_f push_event, lbox_pop_event_f pop_event);
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua
new file mode 100644
index 000000000..355e79186
--- /dev/null
+++ b/test/replication/master_onctl.lua
@@ -0,0 +1,37 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+SYSTEM_SPACE_RECOVERY = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+REPLICA_CONNECTION_ERROR = {}
+
+local function onctl(ctx)
+ if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
+ SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.READ_ONLY then
+ READ_ONLY = READ_ONLY + 1
+ elseif ctx.type == box.ctl.event.READ_WRITE then
+ READ_WRITE = READ_WRITE + 1
+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+ table.insert(REPLICASET_ADD, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ table.insert(REPLICASET_REMOVE, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICA_CONNECTION_ERROR then
+ table.insert(REPLICA_CONNECTION_ERROR, ctx.replica_id)
+ end
+end
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ memtx_memory = 107374182,
+ replication_connect_timeout = 0.5,
+ on_ctl_event = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
new file mode 100644
index 000000000..d9de479fe
--- /dev/null
+++ b/test/replication/onctl.result
@@ -0,0 +1,259 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+---
+- true
+...
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+---
+- true
+...
+test_run:cmd("start server master")
+---
+- true
+...
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+---
+- - 1
+ - 1
+...
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+---
+- - 1
+...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
+REPLICASET_ADD = {}
+---
+...
+REPLICASET_REMOVE = {}
+---
+...
+new_replica_id = 0
+---
+...
+deleted_replica_id = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.ctl.on_ctl_event(on_ctl_new)
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 0
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+---
+...
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+box.cfg{read_only = true}
+---
+...
+fiber = require("fiber")
+---
+...
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+---
+...
+READ_ONLY
+---
+- 2
+...
+box.cfg{on_ctl_event = on_ctl_error}
+---
+...
+box.cfg{read_only = false}
+---
+...
+test_run:grep_log('replica', 'ctl_trigger error')
+---
+- ctl_trigger error
+...
+box.cfg{on_ctl_event = on_ctl_shutdown}
+---
+...
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica.
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true)
+test_run:cmd("switch master")
+---
+- true
+...
+REPLICA_CONNECTION_ERROR
+---
+- - 2
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+_ = box.space._cluster:delete{2}
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- - 2
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 2
+...
+box.ctl.on_ctl_event(nil, on_ctl_new)
+---
+...
+-- cleanup
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master")
+---
+- true
+...
+test_run:cmd("cleanup server master")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
new file mode 100644
index 000000000..9518686ae
--- /dev/null
+++ b/test/replication/onctl.test.lua
@@ -0,0 +1,107 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+
+test_run:cmd("start server master")
+test_run:cmd("switch master")
+box.schema.user.grant('guest', 'replication')
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
+
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+new_replica_id = 0
+deleted_replica_id = 0
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = box.ctl.on_ctl_event(on_ctl_new)
+
+test_run:cmd("start server replica")
+
+REPLICASET_ADD
+REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
+
+new_replica_id
+deleted_replica_id
+
+test_run:cmd("switch replica")
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+box.cfg{read_only = true}
+fiber = require("fiber")
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+READ_ONLY
+
+box.cfg{on_ctl_event = on_ctl_error}
+box.cfg{read_only = false}
+test_run:grep_log('replica', 'ctl_trigger error')
+box.cfg{on_ctl_event = on_ctl_shutdown}
+
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica.
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true)
+
+test_run:cmd("switch master")
+REPLICA_CONNECTION_ERROR
+
+box.schema.user.revoke('guest', 'replication')
+_ = box.space._cluster:delete{2}
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+box.ctl.on_ctl_event(nil, on_ctl_new)
+
+-- cleanup
+test_run:cmd("switch default")
+test_run:cmd("stop server master")
+test_run:cmd("cleanup server master")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua
new file mode 100644
index 000000000..d6ce73c82
--- /dev/null
+++ b/test/replication/replica_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+
+SYSTEM_SPACE_RECOVERY = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+local function onctl(ctx)
+ if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
+ SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.READ_ONLY then
+ READ_ONLY = READ_ONLY + 1
+ elseif ctx.type == box.ctl.event.READ_WRITE then
+ READ_WRITE = READ_WRITE + 1
+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+ table.insert(REPLICASET_ADD, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ table.insert(REPLICASET_REMOVE, ctx.replica_id)
+ end
+end
+
+box.cfg({
+ listen = os.getenv("LISTEN"),
+ replication = os.getenv("MASTER"),
+ memtx_memory = 107374182,
+ replication_connect_timeout = 0.5,
+ on_ctl_event = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5a2..365a82512 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
"wal_off.test.lua": {},
"hot_standby.test.lua": {},
"rebootstrap.test.lua": {},
+ "onctl.test.lua": {},
"*": {
"memtx": {"engine": "memtx"},
"vinyl": {"engine": "vinyl"}
--
2.14.3 (Apple Git-98)