[tarantool-patches] [PATCH v3] box.ctl: implement a trigger on any control events

Konstantin Belyavskiy k.belyavskiy at tarantool.org
Thu Aug 9 11:38:07 MSK 2018


This patch is based on original Ilya Markov's patchset
(gh-3159-box-on-ctl-event)

Supported control events:
* Creation of system space.
* Completion of local recovery. Called on finish of bootstrap or
  finish of recovery.
* Switch to read only/read write mode.
* Replicaset add/remove. Called when the replica set changes.
* Controlled shutdown.
* Replica error event. Replica fails with some error.

Errors inside triggers are logged and do not affect instance
behaviour.

All supported control events are exported to Lua as constants
within the box.ctl.event namespace. Here is the full list:
 - SYSTEM_SPACE_CREATE
 - LOCAL_RECOVERY
 - READ_ONLY
 - READ_WRITE
 - SHUTDOWN
 - REPLICASET_ADD
 - REPLICASET_REMOVE
 - REPLICA_CONNECTION_ERROR

Introduce a new configuration option 'on_ctl_event' of type
function; the callback takes one argument - a context table.
For every event the context has a required field 'type', set to one
of the constants mentioned above.
For REPLICASET_ADD, REPLICASET_REMOVE and REPLICA_CONNECTION_ERROR
an additional field 'replica_id' is also available.

Usage example. In a Lua script, define a callback function that acts
on specific control events, e.g.:
local function onctl(ctx)
    if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then
        -- specific action #1
    elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
        -- specific action #2
    ...
    end
end

Then this function is passed as a parameter to box.cfg:
on_ctl_event = onctl

Closes #3159

Changes in V2:
Squashed 3 to 1
Small fixes: remove unused headers, rename function.
Design changes:
- remove fiber for ro condition checks.
- remove support for 'table' type callback.
- fire the trigger for completed recovery of all system spaces
  right after the system spaces are recovered.
- add documentation.

Changes in V3:
- instead of all system spaces recovery, set trigger on each
  system space creation.

---
Ticket: https://github.com/tarantool/tarantool/issues/3159
Branch: kbelyavs/gh-3159-box-on-ctl-event
 src/box/CMakeLists.txt             |   1 +
 src/box/alter.cc                   |   5 +
 src/box/box.cc                     |  26 +++-
 src/box/box.h                      |   1 +
 src/box/ctl.c                      |  76 +++++++++++
 src/box/ctl.h                      |  81 ++++++++++++
 src/box/engine.c                   |   3 +
 src/box/lua/cfg.cc                 |  12 ++
 src/box/lua/ctl.c                  |  54 ++++++++
 src/box/lua/ctl.h                  |   2 +
 src/box/lua/load_cfg.lua           |  10 +-
 src/box/memtx_engine.c             |   1 +
 src/box/relay.cc                   |   7 +
 src/box/replication.cc             |  14 ++
 src/cfg.c                          |  17 +++
 src/cfg.h                          |   4 +
 test/replication/master_onctl.lua  |  37 ++++++
 test/replication/onctl.result      | 259 +++++++++++++++++++++++++++++++++++++
 test/replication/onctl.test.lua    | 107 +++++++++++++++
 test/replication/replica_onctl.lua |  34 +++++
 test/replication/suite.cfg         |   1 +
 21 files changed, 748 insertions(+), 4 deletions(-)
 create mode 100644 src/box/ctl.c
 create mode 100644 src/box/ctl.h
 create mode 100644 test/replication/master_onctl.lua
 create mode 100644 test/replication/onctl.result
 create mode 100644 test/replication/onctl.test.lua
 create mode 100644 test/replication/replica_onctl.lua

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index ad544270b..49e387fde 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -112,6 +112,7 @@ add_library(box STATIC
     journal.c
     wal.c
     call.c
+    ctl.c
     ${lua_sources}
     lua/init.c
     lua/call.c
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 3007a131d..d955bfc03 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -52,6 +52,7 @@
 #include "identifier.h"
 #include "version.h"
 #include "sequence.h"
+#include "ctl.h"
 
 /**
  * chap-sha1 of empty string, i.e.
@@ -1622,6 +1623,8 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		struct trigger *on_rollback =
 			txn_alter_trigger_new(on_create_space_rollback, space);
 		txn_on_rollback(txn, on_rollback);
+		if (space->def->id < BOX_SYSTEM_ID_MAX)
+			on_ctl_event_type(CTL_EVENT_SYSTEM_SPACE_CREATE);
 	} else if (new_tuple == NULL) { /* DELETE */
 		access_check_ddl(old_space->def->name, old_space->def->uid,
 				 SC_SPACE, PRIV_D, true);
@@ -1719,6 +1722,8 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
 		(void) new UpdateSchemaVersion(alter);
 		alter_space_do(txn, alter);
 		alter_guard.is_active = false;
+		if (alter->new_space->def->id < BOX_SYSTEM_ID_MAX)
+			on_ctl_event_type(CTL_EVENT_SYSTEM_SPACE_CREATE);
 	}
 }
 
diff --git a/src/box/box.cc b/src/box/box.cc
index ee12d5738..9d195322c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -73,6 +73,7 @@
 #include "call.h"
 #include "func.h"
 #include "sequence.h"
+#include "ctl.h"
 
 static char status[64] = "unknown";
 
@@ -209,11 +210,22 @@ process_nop(struct request *request)
 	return txn_commit_stmt(txn, request);
 }
 
+static void
+on_ro_cond_change(void)
+{
+	on_ctl_event_type(box_is_ro() ? CTL_EVENT_READ_ONLY:
+			  CTL_EVENT_READ_WRITE);
+	fiber_cond_broadcast(&ro_cond);
+}
+
 void
 box_set_ro(bool ro)
 {
+	if (is_ro == ro)
+		return; /* nothing to do */
+
 	is_ro = ro;
-	fiber_cond_broadcast(&ro_cond);
+	on_ro_cond_change();
 }
 
 bool
@@ -244,7 +256,7 @@ box_clear_orphan(void)
 		return; /* nothing to do */
 
 	is_orphan = false;
-	fiber_cond_broadcast(&ro_cond);
+	on_ro_cond_change();
 
 	/* Update the title to reflect the new status. */
 	title("running");
@@ -840,6 +852,13 @@ box_set_net_msg_max(void)
 				IPROTO_FIBER_POOL_SIZE_FACTOR);
 }
 
+void
+box_set_on_ctl_event(void)
+{
+	if (cfg_reset_on_ctl_event() < 0)
+		diag_raise();
+}
+
 /* }}} configuration bindings */
 
 /**
@@ -1592,6 +1611,7 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
 void
 box_free(void)
 {
+	on_ctl_event_type(CTL_EVENT_SHUTDOWN);
 	/*
 	 * See gh-584 "box_free() is called even if box is not
 	 * initialized
@@ -1932,7 +1952,6 @@ void
 box_init(void)
 {
 	fiber_cond_create(&ro_cond);
-
 	user_cache_init();
 	/*
 	 * The order is important: to initialize sessions,
@@ -1989,6 +2008,7 @@ box_cfg_xc(void)
 	box_set_replication_connect_timeout();
 	box_set_replication_connect_quorum();
 	box_set_replication_skip_conflict();
+	box_set_on_ctl_event();
 	replication_sync_lag = box_check_replication_sync_lag();
 	xstream_create(&join_stream, apply_initial_join_row);
 	xstream_create(&subscribe_stream, apply_row);
diff --git a/src/box/box.h b/src/box/box.h
index e2e06d977..3e55ee6c4 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -193,6 +193,7 @@ void box_set_replication_connect_timeout(void);
 void box_set_replication_connect_quorum(void);
 void box_set_replication_skip_conflict(void);
 void box_set_net_msg_max(void);
+void box_set_on_ctl_event(void);
 
 extern "C" {
 #endif /* defined(__cplusplus) */
diff --git a/src/box/ctl.c b/src/box/ctl.c
new file mode 100644
index 000000000..d3a2077ad
--- /dev/null
+++ b/src/box/ctl.c
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <lib/small/small/rlist.h>
+#include <trigger.h>
+#include <box/ctl.h>
+#include "errcode.h"
+#include "error.h"
+#include <exception.h>
+
+RLIST_HEAD(on_ctl_event);
+
+const char* ctltype2str[CTL_LAST_POS_GUARD] = {
+	"system space create",   // CTL_EVENT_SYSTEM_SPACE_CREATE
+	"local recovery",	 // CTL_EVENT_LOCAL_RECOVERY
+	"read only",		 // CTL_EVENT_READ_ONLY
+	"read write",		 // CTL_EVENT_READ_WRITE
+	"shutdown",		 // CTL_EVENT_SHUTDOWN
+	"replicaset add",	 // CTL_EVENT_REPLICASET_ADD
+	"replicaset remove",	 // CTL_EVENT_REPLICASET_REMOVE
+	"replica connect error", // CTL_EVENT_REPLICA_CONNECTION_ERROR
+};
+
+int
+run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result) {
+	return trigger_run(&on_ctl_event, (void *) result);
+}
+
+void
+on_ctl_event_type(enum ctl_event_type type)
+{
+	struct on_ctl_event_ctx ctx = {};
+	ctx.type = type;
+	if (run_on_ctl_event_triggers(&ctx) < 0)
+		say_error("ctl_trigger error in %s: %s", ctltype2str[type],
+                          diag_last_error(diag_get())->errmsg);
+}
+
+int
+cfg_reset_on_ctl_event()
+{
+	if (cfg_reset_trigger("on_ctl_event", &on_ctl_event,
+		lbox_push_on_ctl_event, NULL) < 0) {
+		diag_set(ClientError, ER_CFG, "on_ctl_event",
+			 "expected function or table");
+		return -1;
+	}
+	return 0;
+}
diff --git a/src/box/ctl.h b/src/box/ctl.h
new file mode 100644
index 000000000..751ba0149
--- /dev/null
+++ b/src/box/ctl.h
@@ -0,0 +1,81 @@
+#ifndef INCLUDES_TARANTOOL_CTL_H
+#define INCLUDES_TARANTOOL_CTL_H
+
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <cfg.h>
+#include <box/lua/ctl.h>
+
+/** Global on-ctl_event triggers. */
+extern struct rlist on_ctl_event;
+
+enum ctl_event_type {
+	CTL_EVENT_SYSTEM_SPACE_CREATE,
+	CTL_EVENT_LOCAL_RECOVERY,
+	CTL_EVENT_READ_ONLY,
+	CTL_EVENT_READ_WRITE,
+	CTL_EVENT_SHUTDOWN,
+	CTL_EVENT_REPLICASET_ADD,
+	CTL_EVENT_REPLICASET_REMOVE,
+	CTL_EVENT_REPLICA_CONNECTION_ERROR,
+	CTL_LAST_POS_GUARD,
+};
+
+struct on_ctl_event_ctx {
+	enum ctl_event_type type;
+	uint32_t replica_id;
+};
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+/**
+ * Runs on_ctl_event triggers with specified context.
+ */
+int
+run_on_ctl_event_triggers(const struct on_ctl_event_ctx *result);
+
+/**
+ * Runs on_ctl_event trigger with specified type and
+ * log error if any.
+ */
+void
+on_ctl_event_type(enum ctl_event_type type);
+
+int
+cfg_reset_on_ctl_event();
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* INCLUDES_TARANTOOL_CTL_H */
diff --git a/src/box/engine.c b/src/box/engine.c
index 2a30dcddd..7f5082b69 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -29,6 +29,7 @@
  * SUCH DAMAGE.
  */
 #include "engine.h"
+#include "ctl.h"
 
 #include <stdint.h>
 #include <string.h>
@@ -73,6 +74,7 @@ engine_bootstrap(void)
 		if (engine->vtab->bootstrap(engine) != 0)
 			return -1;
 	}
+	on_ctl_event_type(CTL_EVENT_LOCAL_RECOVERY);
 	return 0;
 }
 
@@ -111,6 +113,7 @@ engine_end_recovery(void)
 		if (engine->vtab->end_recovery(engine) != 0)
 			return -1;
 	}
+	on_ctl_event_type(CTL_EVENT_LOCAL_RECOVERY);
 	return 0;
 }
 
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 0f6b8a5a3..baf4a99d5 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -252,6 +252,17 @@ lbox_cfg_set_net_msg_max(struct lua_State *L)
 	return 0;
 }
 
+static int
+lbox_cfg_set_on_ctl_event(struct lua_State *L)
+{
+	try {
+		box_set_on_ctl_event();
+	} catch (Exception *) {
+		luaT_error(L);
+	}
+	return 0;
+}
+
 static int
 lbox_cfg_set_worker_pool_threads(struct lua_State *L)
 {
@@ -330,6 +341,7 @@ box_lua_cfg_init(struct lua_State *L)
 		{"cfg_set_replication_skip_conflict", lbox_cfg_set_replication_skip_conflict},
 		{"cfg_set_replication_connect_timeout", lbox_cfg_set_replication_connect_timeout},
 		{"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max},
+		{"cfg_set_on_ctl_event", lbox_cfg_set_on_ctl_event},
 		{NULL, NULL}
 	};
 
diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 9a105ed5c..5b78875d5 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -35,6 +35,8 @@
 #include <lua.h>
 #include <lauxlib.h>
 #include <lualib.h>
+#include <lua/trigger.h>
+#include <box/ctl.h>
 
 #include "lua/utils.h"
 
@@ -64,9 +66,38 @@ lbox_ctl_wait_rw(struct lua_State *L)
 	return 0;
 }
 
+
+int
+lbox_push_on_ctl_event(struct lua_State *L, void *event)
+{
+	struct on_ctl_event_ctx *ctx = (struct on_ctl_event_ctx *) event;
+	lua_newtable(L);
+	lua_pushstring(L, "type");
+	lua_pushinteger(L, ctx->type);
+	lua_settable(L, -3);
+
+	if (ctx->type == CTL_EVENT_REPLICASET_ADD ||
+		ctx->type == CTL_EVENT_REPLICASET_REMOVE ||
+		ctx->type == CTL_EVENT_REPLICA_CONNECTION_ERROR) {
+		lua_pushstring(L, "replica_id");
+		luaL_pushuint64(L, ctx->replica_id);
+		lua_settable(L, -3);
+	}
+	return 1;
+}
+
+static int
+lbox_on_ctl_event(struct lua_State *L)
+{
+	return lbox_trigger_reset(L, 2, &on_ctl_event,
+				  lbox_push_on_ctl_event, NULL);
+}
+
+
 static const struct luaL_Reg lbox_ctl_lib[] = {
 	{"wait_ro", lbox_ctl_wait_ro},
 	{"wait_rw", lbox_ctl_wait_rw},
+	{"on_ctl_event", lbox_on_ctl_event},
 	{NULL, NULL}
 };
 
@@ -75,4 +106,27 @@ box_lua_ctl_init(struct lua_State *L)
 {
 	luaL_register_module(L, "box.ctl", lbox_ctl_lib);
 	lua_pop(L, 1);
+
+	luaL_findtable(L, LUA_GLOBALSINDEX, "box.ctl", 1);
+	lua_newtable(L);
+	lua_setfield(L, -2, "event");
+	lua_getfield(L, -1, "event");
+
+	lua_pushnumber(L, CTL_EVENT_SYSTEM_SPACE_CREATE);
+	lua_setfield(L, -2, "SYSTEM_SPACE_CREATE");
+	lua_pushnumber(L, CTL_EVENT_LOCAL_RECOVERY);
+	lua_setfield(L, -2, "LOCAL_RECOVERY");
+	lua_pushnumber(L, CTL_EVENT_READ_ONLY);
+	lua_setfield(L, -2, "READ_ONLY");
+	lua_pushnumber(L, CTL_EVENT_READ_WRITE);
+	lua_setfield(L, -2, "READ_WRITE");
+	lua_pushnumber(L, CTL_EVENT_SHUTDOWN);
+	lua_setfield(L, -2, "SHUTDOWN");
+	lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD);
+	lua_setfield(L, -2, "REPLICASET_ADD");
+	lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE);
+	lua_setfield(L, -2, "REPLICASET_REMOVE");
+	lua_pushnumber(L, CTL_EVENT_REPLICA_CONNECTION_ERROR);
+	lua_setfield(L, -2, "REPLICA_CONNECTION_ERROR");
+	lua_pop(L, 2); /* box, ctl */
 }
diff --git a/src/box/lua/ctl.h b/src/box/lua/ctl.h
index e7c2edd15..ab63232dd 100644
--- a/src/box/lua/ctl.h
+++ b/src/box/lua/ctl.h
@@ -41,6 +41,8 @@ struct lua_State;
 void
 box_lua_ctl_init(struct lua_State *L);
 
+int
+lbox_push_on_ctl_event(struct lua_State *L, void *event);
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 0b668cdc6..035615f4b 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -1,7 +1,11 @@
 -- load_cfg.lua - internal file
 
 local log = require('log')
-local json = require('json')
+local json = require("json").new()
+json.cfg{
+    encode_use_tostring    = true,
+}
+
 local private = require('box.internal')
 local urilib = require('uri')
 local math = require('math')
@@ -64,6 +68,7 @@ local default_cfg = {
     feedback_host         = "https://feedback.tarantool.io",
     feedback_interval     = 3600,
     net_msg_max           = 768,
+    on_ctl_event          = nil,
 }
 
 -- types of available options
@@ -125,6 +130,7 @@ local template_cfg = {
     feedback_host         = 'string',
     feedback_interval     = 'number',
     net_msg_max           = 'number',
+    on_ctl_event          = 'function',
 }
 
 local function normalize_uri(port)
@@ -216,6 +222,7 @@ local dynamic_cfg = {
     replicaset_uuid         = check_replicaset_uuid,
     replication_skip_conflict = private.cfg_set_replication_skip_conflict,
     net_msg_max             = private.cfg_set_net_msg_max,
+    on_ctl_event            = private.cfg_set_on_ctl_event,
 }
 
 local dynamic_cfg_skip_at_load = {
@@ -232,6 +239,7 @@ local dynamic_cfg_skip_at_load = {
     force_recovery          = true,
     instance_uuid           = true,
     replicaset_uuid         = true,
+    on_ctl_event            = true,
 }
 
 local function convert_gb(size)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index f5ace9268..400fed95b 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
 #include "replication.h"
 #include "schema.h"
 #include "gc.h"
+#include "ctl.h"
 
 /*
  * Memtx yield-in-transaction trigger: roll back the effects
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 05468f203..19b3f3b7a 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -52,6 +52,7 @@
 #include "xrow_io.h"
 #include "xstream.h"
 #include "wal.h"
+#include "ctl.h"
 
 /**
  * Cbus message to send status updates from relay to tx thread.
@@ -551,6 +552,12 @@ relay_subscribe_f(va_list ap)
 	if (!diag_is_empty(&relay->diag)) {
 		/* An error has occurred while reading ACKs of xlog. */
 		diag_move(&relay->diag, diag_get());
+		struct on_ctl_event_ctx ctx;
+		ctx.type = CTL_EVENT_REPLICA_CONNECTION_ERROR;
+		ctx.replica_id = relay->replica->id;
+		if (run_on_ctl_event_triggers(&ctx) < 0)
+			say_error("ctl_trigger error in replica error: %s",
+				  diag_last_error(diag_get())->errmsg);
 		/* Reference the diag in the status. */
 		diag_add_error(&relay->diag, diag_last_error(diag_get()));
 	}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 26bbbe32a..4ee8ae5a3 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -42,6 +42,7 @@
 #include "relay.h"
 #include "vclock.h" /* VCLOCK_MAX */
 #include "sio.h"
+#include "ctl.h"
 
 uint32_t instance_id = REPLICA_ID_NIL;
 struct tt_uuid INSTANCE_UUID;
@@ -173,6 +174,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
 	replica->uuid = *replica_uuid;
 	replica_hash_insert(&replicaset.hash, replica);
 	replica_set_id(replica, replica_id);
+	struct on_ctl_event_ctx on_ctl_ctx;
+	on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD;
+	on_ctl_ctx.replica_id = replica_id;
+	if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+		say_error("ctl_trigger error in replica add: %s",
+			  diag_last_error(diag_get())->errmsg);
 	return replica;
 }
 
@@ -204,6 +211,10 @@ replica_clear_id(struct replica *replica)
 	 * Some records may arrive later on due to asynchronous nature of
 	 * replication.
 	 */
+	struct on_ctl_event_ctx on_ctl_ctx;
+	on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE;
+	on_ctl_ctx.replica_id = replica->id;
+
 	replicaset.replica_by_id[replica->id] = NULL;
 	replica->id = REPLICA_ID_NIL;
 	/*
@@ -223,6 +234,9 @@ replica_clear_id(struct replica *replica)
 		replica_hash_remove(&replicaset.hash, replica);
 		replica_delete(replica);
 	}
+	if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+		say_error("ctl_trigger error in replica remove: %s",
+			  diag_last_error(diag_get())->errmsg);
 }
 
 static void
diff --git a/src/cfg.c b/src/cfg.c
index 7c7d6e793..9dfcae162 100644
--- a/src/cfg.c
+++ b/src/cfg.c
@@ -153,3 +153,20 @@ cfg_getarr_elem(const char *name, int i)
 	lua_pop(tarantool_L, 2);
 	return val;
 }
+
+int
+cfg_reset_trigger(const char *name, struct rlist *list,
+		  lbox_push_event_f push_event, lbox_pop_event_f pop_event)
+{
+	cfg_get(name);
+	struct lua_State *L = tarantool_L;
+	if (lua_isnil(L, -1))
+		return 0;
+	if (!lua_isfunction(L, -1))
+		return -1;
+	lua_pushnil(L);
+	int rc = lbox_trigger_reset(L, lua_gettop(L), list,
+				    push_event, pop_event);
+	lua_pop(L, 1);
+	return rc;
+}
diff --git a/src/cfg.h b/src/cfg.h
index 8499388b8..d36465c49 100644
--- a/src/cfg.h
+++ b/src/cfg.h
@@ -32,6 +32,7 @@
  */
 
 #include <stdint.h>
+#include <lua/trigger.h>
 
 #if defined(__cplusplus)
 extern "C" {
@@ -61,6 +62,9 @@ cfg_getarr_size(const char *name);
 const char *
 cfg_getarr_elem(const char *name, int i);
 
+int
+cfg_reset_trigger(const char *name, struct rlist *list,
+		  lbox_push_event_f push_event, lbox_pop_event_f pop_event);
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua
new file mode 100644
index 000000000..49f320d3e
--- /dev/null
+++ b/test/replication/master_onctl.lua
@@ -0,0 +1,37 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+SYSTEM_SPACE_CREATE = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+REPLICA_CONNECTION_ERROR = {}
+
+local function onctl(ctx)
+    if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then
+        SYSTEM_SPACE_CREATE = SYSTEM_SPACE_CREATE + 1
+    elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+        LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+    elseif ctx.type == box.ctl.event.READ_ONLY then
+        READ_ONLY = READ_ONLY + 1
+    elseif ctx.type == box.ctl.event.READ_WRITE then
+        READ_WRITE = READ_WRITE + 1
+    elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+        table.insert(REPLICASET_ADD, ctx.replica_id)
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        table.insert(REPLICASET_REMOVE, ctx.replica_id)
+    elseif ctx.type == box.ctl.event.REPLICA_CONNECTION_ERROR then
+        table.insert(REPLICA_CONNECTION_ERROR, ctx.replica_id)
+    end
+end
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    memtx_memory        = 107374182,
+    replication_connect_timeout = 0.5,
+    on_ctl_event        = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
new file mode 100644
index 000000000..7b7208464
--- /dev/null
+++ b/test/replication/onctl.result
@@ -0,0 +1,259 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+---
+- true
+...
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+---
+- true
+...
+test_run:cmd("start server master")
+---
+- true
+...
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+SYSTEM_SPACE_CREATE
+---
+- 18
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+---
+- - 1
+  - 1
+...
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+---
+- - 1
+...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
+REPLICASET_ADD = {}
+---
+...
+REPLICASET_REMOVE = {}
+---
+...
+new_replica_id = 0
+---
+...
+deleted_replica_id = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_new(ctx)
+    if ctx.type == box.ctl.event.REPLICASET_ADD then
+        new_replica_id = ctx.replica_id
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        deleted_replica_id = ctx.replica_id
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.ctl.on_ctl_event(on_ctl_new)
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 0
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_shutdown(ctx)
+    if ctx.type == box.ctl.event.SHUTDOWN then
+        require("log").info("test replica shutdown")
+    end
+end;
+---
+...
+function on_ctl_error(ctx)
+    error("trigger error")
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+SYSTEM_SPACE_CREATE
+---
+- 18
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+box.cfg{read_only = true}
+---
+...
+fiber = require("fiber")
+---
+...
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+---
+...
+READ_ONLY
+---
+- 2
+...
+box.cfg{on_ctl_event = on_ctl_error}
+---
+...
+box.cfg{read_only = false}
+---
+...
+test_run:grep_log('replica', 'ctl_trigger error')
+---
+- ctl_trigger error
+...
+box.cfg{on_ctl_event = on_ctl_shutdown}
+---
+...
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica.
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true)
+test_run:cmd("switch master")
+---
+- true
+...
+REPLICA_CONNECTION_ERROR
+---
+- - 2
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+_ = box.space._cluster:delete{2}
+---
+...
+SYSTEM_SPACE_CREATE
+---
+- 18
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 1
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- - 2
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 2
+...
+box.ctl.on_ctl_event(nil, on_ctl_new)
+---
+...
+-- cleanup
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master")
+---
+- true
+...
+test_run:cmd("cleanup server master")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
new file mode 100644
index 000000000..00f2d0109
--- /dev/null
+++ b/test/replication/onctl.test.lua
@@ -0,0 +1,107 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+
+test_run:cmd("start server master")
+test_run:cmd("switch master")
+box.schema.user.grant('guest', 'replication')
+
+SYSTEM_SPACE_CREATE
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
+
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+new_replica_id = 0
+deleted_replica_id = 0
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_new(ctx)
+    if ctx.type == box.ctl.event.REPLICASET_ADD then
+        new_replica_id = ctx.replica_id
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        deleted_replica_id = ctx.replica_id
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = box.ctl.on_ctl_event(on_ctl_new)
+
+test_run:cmd("start server replica")
+
+REPLICASET_ADD
+REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
+
+new_replica_id
+deleted_replica_id
+
+test_run:cmd("switch replica")
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_shutdown(ctx)
+    if ctx.type == box.ctl.event.SHUTDOWN then
+        require("log").info("test replica shutdown")
+    end
+end;
+
+function on_ctl_error(ctx)
+    error("trigger error")
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+SYSTEM_SPACE_CREATE
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+box.cfg{read_only = true}
+fiber = require("fiber")
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+READ_ONLY
+
+box.cfg{on_ctl_event = on_ctl_error}
+box.cfg{read_only = false}
+test_run:grep_log('replica', 'ctl_trigger error')
+box.cfg{on_ctl_event = on_ctl_shutdown}
+
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, wait for pull request on grep_log to grep logs of killed replica.
+-- test_run:grep_log('replica', 'test replica shutdown', 10000, true)
+
+test_run:cmd("switch master")
+REPLICA_CONNECTION_ERROR
+
+box.schema.user.revoke('guest', 'replication')
+_ = box.space._cluster:delete{2}
+
+SYSTEM_SPACE_CREATE
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+box.ctl.on_ctl_event(nil, on_ctl_new)
+
+-- cleanup
+test_run:cmd("switch default")
+test_run:cmd("stop server master")
+test_run:cmd("cleanup server master")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua
new file mode 100644
index 000000000..8228762a3
--- /dev/null
+++ b/test/replication/replica_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+
+SYSTEM_SPACE_CREATE = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+local function onctl(ctx)
+    if ctx.type == box.ctl.event.SYSTEM_SPACE_CREATE then
+        SYSTEM_SPACE_CREATE = SYSTEM_SPACE_CREATE + 1
+    elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+        LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+    elseif ctx.type == box.ctl.event.READ_ONLY then
+        READ_ONLY = READ_ONLY + 1
+    elseif ctx.type == box.ctl.event.READ_WRITE then
+        READ_WRITE = READ_WRITE + 1
+    elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+        table.insert(REPLICASET_ADD, ctx.replica_id)
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        table.insert(REPLICASET_REMOVE, ctx.replica_id)
+    end
+end
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    replication         = os.getenv("MASTER"),
+    memtx_memory        = 107374182,
+    replication_connect_timeout = 0.5,
+    on_ctl_event        = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5a2..365a82512 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
     "wal_off.test.lua": {},
     "hot_standby.test.lua": {},
     "rebootstrap.test.lua": {},
+    "onctl.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.14.3 (Apple Git-98)





More information about the Tarantool-patches mailing list