Hi,
Overall the patch looks good, with a few minor complaints (summary):
- unused headers
- is it worth creating a separate fiber just to check whether the read-only state has changed?
- the SHUTDOWN trigger test should be implemented



Четверг, 14 июня 2018, 18:04 +03:00 от Ilya Markov <imarkov@tarantool.org>:

Add the following trigger events:
* System space recovery. Called at the end of bootstrap, of join, or of
snapshot recovery.
* Local recovery. Called at the end of bootstrap or of recovery.
* Read_only/read_write. Called when the read_only state of the instance
changes.
* Shutdown. Called on a controlled shutdown.
* Replicaset_add/replicaset_remove. Called on changes to the _cluster space.

Errors raised inside triggers are logged and do not affect the instance's
behaviour.

Continue #3159
---
 src/box/alter.cc | 1 +
 src/box/box.cc | 44 ++++++-
 src/box/engine.c | 14 +++
 src/box/lua/ctl.c | 4 +-
 src/box/memtx_engine.c | 2 +-
 src/box/replication.cc | 14 +++
 test/replication/master_onctl.lua | 34 +++++
 test/replication/onctl.result | 250 +++++++++++++++++++++++++++++++++++++
 test/replication/onctl.test.lua | 105 ++++++++++++++++
 test/replication/replica_onctl.lua | 34 +++++
 test/replication/suite.cfg | 1 +
 11 files changed, 498 insertions(+), 5 deletions(-)
 create mode 100644 test/replication/master_onctl.lua
 create mode 100644 test/replication/onctl.result
 create mode 100644 test/replication/onctl.test.lua
 create mode 100644 test/replication/replica_onctl.lua

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 6f6fcb0..7ec548b 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -52,6 +52,7 @@
 #include "identifier.h"
 #include "version.h"
 #include "sequence.h"


+#include "ctl.h"
Not used.


 /**
  * chap-sha1 of empty string, i.e.
diff --git a/src/box/box.cc b/src/box/box.cc
index 26277e7..1de37d2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -113,6 +113,11 @@ static fiber_cond ro_cond;
  */
 static bool is_orphan = true;

+/**
+ * Fiber used for on_ctl_event trigger.
+ */
+static fiber *ro_checker;
+
 /* Use the shared instance of xstream for all appliers */
 static struct xstream join_stream;
 static struct xstream subscribe_stream;
@@ -1573,6 +1578,9 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
 void
 box_free(void)
 {
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SHUTDOWN) < 0)
+ say_error("ctl_trigger error in shutdown: %s",
+ diag_last_error(diag_get())->errmsg);
  /*
  * See gh-584 "box_free() is called even if box is not
  * initialized
@@ -1592,7 +1600,8 @@ box_free(void)
  engine_shutdown();
  wal_thread_stop();
  }
-
+ if (!fiber_is_dead(ro_checker))
+ fiber_cancel(ro_checker);
  fiber_cond_destroy(&ro_cond);
 }

@@ -1693,6 +1702,10 @@ bootstrap_from_master(struct replica *master)
  engine_begin_initial_recovery_xc(NULL);
  applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY);

+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
  /*
  * Process final data (WALs).
  */
@@ -1755,11 +1768,35 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
  cbus_process(endpoint);
 }

+/**
+ * Body of the "check_ro" fiber started by box_init(). Waits until
+ * the read-only state of the instance flips, then runs the
+ * on_ctl_event triggers with CTL_EVENT_READ_ONLY or
+ * CTL_EVENT_READ_WRITE. Trigger failures are only logged.
+ *
+ * @retval 0  the fiber was cancelled (box_free() cancels it).
+ * @retval -1 box_wait_ro() failed for another reason; the error
+ *            is left in the diagnostics area.
+ */
+static int
+check_ro_f(MAYBE_UNUSED va_list ap)
+{
+	while (true) {
+		/* Block until the state differs from the current one. */
+		if (box_wait_ro(!box_is_ro(), TIMEOUT_INFINITY) != 0) {
+			if (fiber_is_cancelled())
+				break;
+			return -1;
+		}
+		/*
+		 * Sample the state exactly once: the triggers run user
+		 * code that may itself change read_only (e.g. through
+		 * box.cfg), so re-reading box_is_ro() for the log
+		 * message could report a different event than the one
+		 * that was actually fired.
+		 */
+		bool is_ro = box_is_ro();
+		if (run_on_ctl_event_trigger_type(is_ro ?
+						  CTL_EVENT_READ_ONLY :
+						  CTL_EVENT_READ_WRITE) < 0)
+			say_error("ctl_trigger error in %s: %s",
+				  is_ro ? "read_only" : "read_write",
+				  diag_last_error(diag_get())->errmsg);
+	}
+	return 0;
+}
+
Is it worth creating a separate fiber? Maybe do the check next to (or inside) the code that calls fiber_cond_broadcast(&ro_cond)?


 void
 box_init(void)
 {
  fiber_cond_create(&ro_cond);
-
+ ro_checker = fiber_new_xc("check_ro", check_ro_f);
+ fiber_start(ro_checker, NULL);
  user_cache_init();
  /*
  * The order is important: to initialize sessions,
@@ -1885,6 +1922,9 @@ box_cfg_xc(void)
  */
  memtx_engine_recover_snapshot_xc(memtx,
  &last_checkpoint_vclock);
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space recovery: %s",
+ diag_last_error(diag_get())->errmsg);

  engine_begin_final_recovery_xc();
  recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
diff --git a/src/box/engine.c b/src/box/engine.c
index 82293fd..fa78753 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -29,10 +29,12 @@
  * SUCH DAMAGE.
  */
 #include "engine.h"
+#include "ctl.h"

 #include <stdint.h>
 #include <string.h>
 #include <small/rlist.h>


+#include <fiber.h>
Not used.

 RLIST_HEAD(engines);

@@ -73,6 +75,14 @@ engine_bootstrap(void)
  if (engine->vtab->bootstrap(engine) != 0)
  return -1;
  }
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+ say_error("ctl_trigger error in system space recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0)
+ say_error("ctl_trigger error in local recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
  return 0;
 }

@@ -111,6 +121,10 @@ engine_end_recovery(void)
  if (engine->vtab->end_recovery(engine) != 0)
  return -1;
  }
+ if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0)
+ say_error("ctl_trigger error in local recovery: %s",
+ diag_last_error(diag_get())->errmsg);
+
  return 0;
 }

diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 52f320a..5bd9be3 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -122,8 +122,8 @@ box_lua_ctl_init(struct lua_State *L)
  lua_pushnumber(L, CTL_EVENT_SHUTDOWN);
  lua_setfield(L, -2, "SHUTDOWN");
  lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD);
- lua_setfield(L, -2, "CTL_EVENT_REPLICASET_ADD");
+ lua_setfield(L, -2, "REPLICASET_ADD");
  lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE);
- lua_setfield(L, -2, "CTL_EVENT_REPLICASET_REMOVE");
+ lua_setfield(L, -2, "REPLICASET_REMOVE");
  lua_pop(L, 2); /* box, ctl */
 }
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index fac84ce..e737ea3 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
 #include "replication.h"
 #include "schema.h"
 #include "gc.h"


+#include "ctl.h"
Also not used.

 static void
 txn_on_yield_or_stop(struct trigger *trigger, void *event)
@@ -197,7 +198,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
  */
  if (!xlog_cursor_is_eof(&cursor))
  panic("snapshot `%s' has no EOF marker", filename);
-
  return 0;
 }

diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e1769..75aecd0 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -41,6 +41,7 @@
 #include "error.h"
 #include "relay.h"
 #include "vclock.h" /* VCLOCK_MAX */
+#include "ctl.h"

 uint32_t instance_id = REPLICA_ID_NIL;
 struct tt_uuid INSTANCE_UUID;
@@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid)
  replica->uuid = *replica_uuid;
  replica_hash_insert(&replicaset.hash, replica);
  replica_set_id(replica, replica_id);
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD;
+ on_ctl_ctx.replica_id = replica_id;
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica add: %s",
+ diag_last_error(diag_get())->errmsg);
  return replica;
 }

@@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica)
  * Some records may arrive later on due to asynchronous nature of
  * replication.
  */
+ struct on_ctl_event_ctx on_ctl_ctx;
+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE;
+ on_ctl_ctx.replica_id = replica->id;
+
  replicaset.replica_by_id[replica->id] = NULL;
  replica->id = REPLICA_ID_NIL;
  if (replica_is_orphan(replica)) {
  replica_hash_remove(&replicaset.hash, replica);
  replica_delete(replica);
  }
+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+ say_error("ctl_trigger error in replica remove: %s",
+ diag_last_error(diag_get())->errmsg);
 }

 static void
diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua
new file mode 100644
index 0000000..e0eb39a
--- /dev/null
+++ b/test/replication/master_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+-- Event counters and replica-id logs. They are globals on purpose:
+-- the test reads (and resets some of) them through the admin console.
+SYSTEM_SPACE_RECOVERY = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+-- on_ctl_event trigger: bump the counter matching ctx.type, or append
+-- ctx.replica_id for replicaset membership events. Events without a
+-- branch here (e.g. SHUTDOWN) are ignored.
+local function onctl(ctx)
+    local ev = box.ctl.event
+    local kind = ctx.type
+    if kind == ev.SYSTEM_SPACE_RECOVERY then
+        SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+    elseif kind == ev.LOCAL_RECOVERY then
+        LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+    elseif kind == ev.READ_ONLY then
+        READ_ONLY = READ_ONLY + 1
+    elseif kind == ev.READ_WRITE then
+        READ_WRITE = READ_WRITE + 1
+    elseif kind == ev.REPLICASET_ADD then
+        -- Look the global up at call time: the test may replace the table.
+        local ids = REPLICASET_ADD
+        ids[#ids + 1] = ctx.replica_id
+    elseif kind == ev.REPLICASET_REMOVE then
+        local ids = REPLICASET_REMOVE
+        ids[#ids + 1] = ctx.replica_id
+    end
+end
+
+-- Register the trigger at configuration time so that recovery events
+-- fired during box.cfg itself are counted too.
+box.cfg({
+    listen = os.getenv("LISTEN"),
+    memtx_memory = 107374182,
+    replication_connect_timeout = 0.5,
+    on_ctl_event = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
new file mode 100644
index 0000000..19b3e67
--- /dev/null
+++ b/test/replication/onctl.result
@@ -0,0 +1,250 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+---
+- true
+...
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+---
+- true
+...
+test_run:cmd("start server master")
+---
+- true
+...
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+---
+- - 1
+ - 1
+...
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+---
+- - 1
+...
+REPLICASET_ADD = {}
+---
+...
+REPLICASET_REMOVE = {}
+---
+...
+new_replica_id = 0
+---
+...
+deleted_replica_id = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.ctl.on_ctl_event(on_ctl_new)
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 0
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+---
+...
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+box.cfg{read_only = true}
+---
+...
+fiber = require("fiber")
+---
+...
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+---
+...
+READ_ONLY
+---
+- 1
+...
+box.cfg{on_ctl_event = on_ctl_error}
+---
+...
+box.cfg{read_only = false}
+---
+...
+test_run:grep_log('replica', 'ctl_trigger error')
+---
+- ctl_trigger error
+...
+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}}
+---
+...
+box.cfg{on_ctl_event = on_ctl_shutdown}
+---
+...
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
+--test_run:grep_log('replica', 'test replica shutdown')
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+_ = box.space._cluster:delete{2}
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- - 2
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 2
+...
+box.ctl.on_ctl_event(nil, on_ctl_new)
+---
+...
+-- cleanup
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master")
+---
+- true
+...
+test_run:cmd("cleanup server master")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
new file mode 100644
index 0000000..ff6a898
--- /dev/null
+++ b/test/replication/onctl.test.lua
@@ -0,0 +1,105 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("create server master with script='replication/master_onctl.lua'")
+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'")
+
+test_run:cmd("start server master")
+test_run:cmd("switch master")
+box.schema.user.grant('guest', 'replication')
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+-- must be two entries. First from bootstrap.snap, second for current instance.
+REPLICASET_ADD
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+new_replica_id = 0
+deleted_replica_id = 0
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_new(ctx)
+ if ctx.type == box.ctl.event.REPLICASET_ADD then
+ new_replica_id = ctx.replica_id
+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+ deleted_replica_id = ctx.replica_id
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = box.ctl.on_ctl_event(on_ctl_new)
+
+test_run:cmd("start server replica")
+
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+test_run:cmd("switch replica")
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_shutdown(ctx)
+ if ctx.type == box.ctl.event.SHUTDOWN then
+ require("log").info("test replica shutdown")
+ end
+end;
+
+function on_ctl_error(ctx)
+ error("trigger error")
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+box.cfg{read_only = true}
+fiber = require("fiber")
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+READ_ONLY
+
+box.cfg{on_ctl_event = on_ctl_error}
+box.cfg{read_only = false}
+test_run:grep_log('replica', 'ctl_trigger error')
+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}}
+box.cfg{on_ctl_event = on_ctl_shutdown}
+
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
+--test_run:grep_log('replica', 'test replica shutdown')
This should be done in this patch rather than left as a TODO.

+
+
+test_run:cmd("switch master")
+box.schema.user.revoke('guest', 'replication')
+_ = box.space._cluster:delete{2}
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+box.ctl.on_ctl_event(nil, on_ctl_new)
+
+-- cleanup
+test_run:cmd("switch default")
+test_run:cmd("stop server master")
+test_run:cmd("cleanup server master")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua
new file mode 100644
index 0000000..d6ce73c
--- /dev/null
+++ b/test/replication/replica_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+
+-- Event counters and replica-id logs. They are globals on purpose:
+-- the test reads them through the admin console.
+SYSTEM_SPACE_RECOVERY = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+-- on_ctl_event trigger: bump the counter matching ctx.type, or append
+-- ctx.replica_id for replicaset membership events. Events without a
+-- branch here (e.g. SHUTDOWN) are ignored.
+local function onctl(ctx)
+    local ev = box.ctl.event
+    local kind = ctx.type
+    if kind == ev.SYSTEM_SPACE_RECOVERY then
+        SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+    elseif kind == ev.LOCAL_RECOVERY then
+        LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+    elseif kind == ev.READ_ONLY then
+        READ_ONLY = READ_ONLY + 1
+    elseif kind == ev.READ_WRITE then
+        READ_WRITE = READ_WRITE + 1
+    elseif kind == ev.REPLICASET_ADD then
+        -- Look the global up at call time so a replaced table is honoured.
+        local ids = REPLICASET_ADD
+        ids[#ids + 1] = ctx.replica_id
+    elseif kind == ev.REPLICASET_REMOVE then
+        local ids = REPLICASET_REMOVE
+        ids[#ids + 1] = ctx.replica_id
+    end
+end
+
+-- Replica of the master given in $MASTER; the trigger is installed at
+-- configuration time so bootstrap/join events are counted as well.
+box.cfg({
+    listen = os.getenv("LISTEN"),
+    replication = os.getenv("MASTER"),
+    memtx_memory = 107374182,
+    replication_connect_timeout = 0.5,
+    on_ctl_event = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5..365a825 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
     "wal_off.test.lua": {},
     "hot_standby.test.lua": {},
     "rebootstrap.test.lua": {},
+ "onctl.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
--
2.7.4




Best regards,
Konstantin Belyavskiy
k.belyavskiy@tarantool.org