static void
txn_on_yield_or_stop(struct trigger *trigger, void *event)
@@ -197,7 +198,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
*/
if (!xlog_cursor_is_eof(&cursor))
panic("snapshot `%s' has no EOF marker", filename);
-
return 0;
}
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e1769..75aecd0 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -41,6 +41,7 @@
#include "error.h"
#include "relay.h"
#include "vclock.h" /* VCLOCK_MAX */
+#include "ctl.h"
uint32_t instance_id = REPLICA_ID_NIL;
struct tt_uuid INSTANCE_UUID;
@@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
replica->uuid = *replica_uuid;
replica_hash_insert(&replicaset.hash, replica);
replica_set_id(replica, replica_id);
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD;
+ on_ctl_ctx.replica_id = replica_id;
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica add: %s",
+ diag_last_error(diag_get())->errmsg);
return replica;
}
@@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica)
* Some records may arrive later on due to asynchronous nature of
* replication.
*/
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE;
+ on_ctl_ctx.replica_id = replica->id;
+
replicaset.replica_by_id[replica->id] = NULL;
replica->id = REPLICA_ID_NIL;
if (replica_is_orphan(replica)) {
replica_hash_remove(&replicaset.hash, replica);
replica_delete(replica);
}
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica remove: %s",
+ diag_last_error(diag_get())->errmsg);
}
static void
diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua
new file mode 100644
index 0000000..e0eb39a
--- /dev/null
+++ b/test/replication/master_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+os = require('os') -- os.getenv() is used in the box.cfg call below
+
+SYSTEM_SPACE_RECOVERY = 0 -- number of SYSTEM_SPACE_RECOVERY events seen by onctl()
+LOCAL_RECOVERY = 0 -- number of LOCAL_RECOVERY events seen by onctl()
+READ_ONLY = 0 -- number of READ_ONLY events seen by onctl()
+READ_WRITE = 0 -- number of READ_WRITE events seen by onctl()
+REPLICASET_ADD = {} -- ctx.replica_id of every REPLICASET_ADD event, in arrival order
+REPLICASET_REMOVE = {} -- ctx.replica_id of every REPLICASET_REMOVE event, in arrival order
+
+local function onctl(ctx) -- ctl-event handler: records each event in the globals above, keyed by ctx.type
+ if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
+ SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+ elseif ctx.type == box.ctl.event.READ_ONLY then
+ READ_ONLY = READ_ONLY + 1
+ elseif ctx.type == box.ctl.event.READ_WRITE then
+ READ_WRITE = READ_WRITE + 1
+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+ table.insert(REPLICASET_ADD, ctx.replica_id) -- keep the id, not just a count
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ table.insert(REPLICASET_REMOVE, ctx.replica_id) -- keep the id, not just a count
+ end -- event types not listed above are ignored
+end
+
+box.cfg({
+ listen = os.getenv("LISTEN"), -- listen address is taken from the LISTEN environment variable
+ memtx_memory = 107374182, -- NOTE(review): presumably bytes (about 0.1 GiB) - confirm against box.cfg docs
+ replication_connect_timeout = 0.5,
+ on_ctl_event = onctl, -- pass onctl to box.cfg as the on_ctl_event handler
+})
+
+require('console').listen(os.getenv('ADMIN')) -- console address is taken from the ADMIN environment variable
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
new file mode 100644
index 0000000..19b3e67
--- /dev/null
+++ b/test/replication/onctl.result
@@ -0,0 +1,250 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+---
+- true
+...
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+---
+- true
+...
+test_run:cmd("start server master")
+---
+- true
+...
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+---
+- - 1
+ - 1
+...
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+---
+- - 1
+...
+REPLICASET_ADD = {}
+---
+...
+REPLICASET_REMOVE = {}
+---
+...
+new_replica_id = 0
+---
+...
+deleted_replica_id = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.ctl.on_ctl_event(on_ctl_new)
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 0
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+---
+...
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+box.cfg{read_only = true}
+---
+...
+fiber = require("fiber")
+---
+...
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+---
+...
+READ_ONLY
+---
+- 1
+...
+box.cfg{on_ctl_event = on_ctl_error}
+---
+...
+box.cfg{read_only = false}
+---
+...
+test_run:grep_log('replica', 'ctl_trigger error')
+---
+- ctl_trigger error
+...
+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}}
+---
+...
+box.cfg{on_ctl_event = on_ctl_shutdown}
+---
+...
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
+--test_run:grep_log('replica', 'test replica shutdown')
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+_ = box.space._cluster:delete{2}
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- - 2
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 2
+...
+box.ctl.on_ctl_event(nil, on_ctl_new)
+---
+...
+-- cleanup
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master")
+---
+- true
+...
+test_run:cmd("cleanup server master")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
new file mode 100644
index 0000000..ff6a898
--- /dev/null
+++ b/test/replication/onctl.test.lua
@@ -0,0 +1,105 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+
+test_run:cmd("start server master")
+test_run:cmd("switch master")
+box.schema.user.grant('guest', 'replication')
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+new_replica_id = 0
+deleted_replica_id = 0
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = box.ctl.on_ctl_event(on_ctl_new)
+
+test_run:cmd("start server replica")
+
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+test_run:cmd("switch replica")
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+box.cfg{read_only = true}
+fiber = require("fiber")
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+READ_ONLY
+
+box.cfg{on_ctl_event = on_ctl_error}
+box.cfg{read_only = false}
+test_run:grep_log('replica', 'ctl_trigger error')
+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}}
+box.cfg{on_ctl_event = on_ctl_shutdown}
+
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
+--test_run:grep_log('replica', 'test replica shutdown')