#include "lua/utils.h"
@@ -77,7 +78,8 @@ lbox_push_on_ctl_event(struct lua_State *L, void *event)
lua_settable(L, -3);
if (ctx->type == CTL_EVENT_REPLICASET_ADD ||
- ctx->type == CTL_EVENT_REPLICASET_REMOVE) {
+ ctx->type == CTL_EVENT_REPLICASET_REMOVE ||
+ ctx->type == CTL_EVENT_REPLICA_CONNECTION_ERROR) {
lua_pushstring(L, "replica_id");
luaL_pushuint64(L, ctx->replica_id);
lua_settable(L, -3);
@@ -125,5 +127,7 @@ box_lua_ctl_init(struct lua_State *L)
lua_setfield(L, -2, "REPLICASET_ADD");
lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE);
lua_setfield(L, -2, "REPLICASET_REMOVE");
+ lua_pushnumber(L, CTL_EVENT_REPLICA_CONNECTION_ERROR);
+ lua_setfield(L, -2, "REPLICA_CONNECTION_ERROR");
lua_pop(L, 2); /* box, ctl */
}
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a25cc54..d535f83 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -52,6 +52,7 @@
#include "xrow_io.h"
#include "xstream.h"
#include "wal.h"
+#include "ctl.h"
/**
* Cbus message to send status updates from relay to tx thread.
@@ -548,6 +549,23 @@ relay_subscribe_f(va_list ap)
if (!diag_is_empty(&relay->diag)) {
/* An error has occurred while reading ACKs of xlog. */
diag_move(&relay->diag, diag_get());
/* Reference the diag in the status. */
diag_add_error(&relay->diag, diag_last_error(diag_get()));
+ /*
+ * Notify on_ctl_event triggers only after the relay error
+ * has been referenced in relay->diag: a failing trigger
+ * replaces the last error of the fiber diagnostics area.
+ * NOTE(review): confirm that the triggers are allowed to
+ * run in the thread executing relay_subscribe_f().
+ */
+ struct on_ctl_event_ctx ctx;
+ ctx.type = CTL_EVENT_REPLICA_CONNECTION_ERROR;
+ ctx.replica_id = relay->replica->id;
+ if (run_on_ctl_event_triggers(&ctx) < 0) {
+ say_error("ctl_trigger error in replica error: %s",
+ diag_last_error(diag_get())->errmsg);
+ /* Restore the relay error as the fiber's last error. */
+ diag_add_error(diag_get(),
+ diag_last_error(&relay->diag));
+ }
}
diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua
index e0eb39a..355e791 100644
--- a/test/replication/master_onctl.lua
+++ b/test/replication/master_onctl.lua
@@ -7,6 +7,7 @@ READ_ONLY = 0
READ_WRITE = 0
REPLICASET_ADD = {}
REPLICASET_REMOVE = {}
+REPLICA_CONNECTION_ERROR = {}
local function onctl(ctx)
if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
@@ -21,6 +22,8 @@ local function onctl(ctx)
table.insert(REPLICASET_ADD, ctx.replica_id)
elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
table.insert(REPLICASET_REMOVE, ctx.replica_id)
+ elseif ctx.type == box.ctl.event.REPLICA_CONNECTION_ERROR then
+ table.insert(REPLICA_CONNECTION_ERROR, ctx.replica_id)
end
end
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
index 19b3e67..24a6683 100644
--- a/test/replication/onctl.result
+++ b/test/replication/onctl.result
@@ -50,6 +50,10 @@ REPLICASET_REMOVE
---
- - 1
...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
REPLICASET_ADD = {}
---
...
@@ -94,6 +98,10 @@ REPLICASET_REMOVE
---
- []
...
+REPLICA_CONNECTION_ERROR
+---
+- []
+...
new_replica_id
---
- 2
@@ -186,6 +194,10 @@ test_run:cmd("switch master")
---
- true
...
+REPLICA_CONNECTION_ERROR
+---
+- - 2
+...
box.schema.user.revoke('guest', 'replication')
---
...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
index ff6a898..352b118 100644
--- a/test/replication/onctl.test.lua
+++ b/test/replication/onctl.test.lua
@@ -16,6 +16,7 @@ READ_WRITE
REPLICASET_ADD
-- must be one entry. Deletion of initial tuple in _cluster space.
REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
REPLICASET_ADD = {}
REPLICASET_REMOVE = {}
@@ -39,6 +40,7 @@ test_run:cmd("start server replica")
REPLICASET_ADD
REPLICASET_REMOVE
+REPLICA_CONNECTION_ERROR
new_replica_id
deleted_replica_id
@@ -80,8 +82,9 @@ test_run:cmd("restart server replica")
-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
--test_run:grep_log('replica', 'test replica shutdown')
-
test_run:cmd("switch master")
+REPLICA_CONNECTION_ERROR
+
box.schema.user.revoke('guest', 'replication')
_ = box.space._cluster:delete{2}
--
2.7.4