Hi, overall the patch looks good, with a few minor complaints (summary): - unused headers; - is it worth creating a separate fiber just to check whether the read-only state has changed? - the SHUTDOWN test should be implemented. >Четверг, 14 июня 2018, 18:04 +03:00 от Ilya Markov : > >Add following cases of triggers: >* System space recovery. Called on finish of bootstrap or finish of join or >snapshot recovery. >* Local recovery. Called on finish of bootstrap or finish of recovery. >* Read_only/read_write. Called on changes of read_only state of >instance. >* Shutdown. Called on controlled shutdown. >* Replicaset_add/replicaset_remove. Called on changes in space _cluster. > >Errors inside triggers are logged and don't influence on instance >behaviour. > >Continue #3159 >--- > src/box/alter.cc | 1 + > src/box/box.cc | 44 ++++++- > src/box/engine.c | 14 +++ > src/box/lua/ctl.c | 4 +- > src/box/memtx_engine.c | 2 +- > src/box/replication.cc | 14 +++ > test/replication/master_onctl.lua | 34 +++++ > test/replication/onctl.result | 250 +++++++++++++++++++++++++++++++++++++ > test/replication/onctl.test.lua | 105 ++++++++++++++++ > test/replication/replica_onctl.lua | 34 +++++ > test/replication/suite.cfg | 1 + > 11 files changed, 498 insertions(+), 5 deletions(-) > create mode 100644 test/replication/master_onctl.lua > create mode 100644 test/replication/onctl.result > create mode 100644 test/replication/onctl.test.lua > create mode 100644 test/replication/replica_onctl.lua > >diff --git a/src/box/alter.cc b/src/box/alter.cc >index 6f6fcb0..7ec548b 100644 >--- a/src/box/alter.cc >+++ b/src/box/alter.cc >@@ -52,6 +52,7 @@ > #include "identifier.h" > #include "version.h" > #include "sequence.h" > >+#include "ctl.h" Not used. > > > /** >  * chap-sha1 of empty string, i.e. 
>diff --git a/src/box/box.cc b/src/box/box.cc >index 26277e7..1de37d2 100644 >--- a/src/box/box.cc >+++ b/src/box/box.cc >@@ -113,6 +113,11 @@ static fiber_cond ro_cond; >  */ > static bool is_orphan = true; > >+/** >+ * Fiber used for on_ctl_event trigger. >+ */ >+static fiber *ro_checker; >+ > /* Use the shared instance of xstream for all appliers */ > static struct xstream join_stream; > static struct xstream subscribe_stream; >@@ -1573,6 +1578,9 @@ box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) > void > box_free(void) > { >+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SHUTDOWN) < 0) >+ say_error("ctl_trigger error in shutdown: %s", >+ diag_last_error(diag_get())->errmsg); >  /* >  * See gh-584 "box_free() is called even if box is not >  * initialized >@@ -1592,7 +1600,8 @@ box_free(void) >  engine_shutdown(); >  wal_thread_stop(); >  } >- >+ if (!fiber_is_dead(ro_checker)) >+ fiber_cancel(ro_checker); >  fiber_cond_destroy(&ro_cond); > } > >@@ -1693,6 +1702,10 @@ bootstrap_from_master(struct replica *master) >  engine_begin_initial_recovery_xc(NULL); >  applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY); > >+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) >+ say_error("ctl_trigger error in system space recovery: %s", >+ diag_last_error(diag_get())->errmsg); >+ >  /* >  * Process final data (WALs). >  */ >@@ -1755,11 +1768,35 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events) >  cbus_process(endpoint); > } > >+static int >+check_ro_f(MAYBE_UNUSED va_list ap) >+{ >+ double timeout = TIMEOUT_INFINITY; >+ struct on_ctl_event_ctx ctx; >+ memset(&ctx, 0, sizeof(ctx)); >+ while (true) { >+ if (box_wait_ro(!box_is_ro(), timeout) != 0) { >+ if (fiber_is_cancelled()) >+ break; >+ else >+ return -1; >+ } >+ if (run_on_ctl_event_trigger_type( >+ box_is_ro() ? CTL_EVENT_READ_ONLY: >+ CTL_EVENT_READ_WRITE) < 0) >+ say_error("ctl_trigger error in %s: %s", >+ box_is_ro() ? 
"read_only" :"read_write", >+ diag_last_error(diag_get())->errmsg); >+ } >+ return 0; >+} >+ Is it worth it to create a separate fiber? May be check it near or inside fiber_cond_broadcast(&ro_cond)? > > void > box_init(void) > { >  fiber_cond_create(&ro_cond); >- >+ ro_checker = fiber_new_xc("check_ro", check_ro_f); >+ fiber_start(ro_checker, NULL); >  user_cache_init(); >  /* >  * The order is important: to initialize sessions, >@@ -1885,6 +1922,9 @@ box_cfg_xc(void) >  */ >  memtx_engine_recover_snapshot_xc(memtx, >  &last_checkpoint_vclock); >+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) >+ say_error("ctl_trigger error in system space recovery: %s", >+ diag_last_error(diag_get())->errmsg); > >  engine_begin_final_recovery_xc(); >  recovery_follow_local(recovery, &wal_stream.base, "hot_standby", >diff --git a/src/box/engine.c b/src/box/engine.c >index 82293fd..fa78753 100644 >--- a/src/box/engine.c >+++ b/src/box/engine.c >@@ -29,10 +29,12 @@ >  * SUCH DAMAGE. >  */ > #include "engine.h" >+#include "ctl.h" > > #include > #include > #include > >+#include Not used. 
> > RLIST_HEAD(engines); > >@@ -73,6 +75,14 @@ engine_bootstrap(void) >  if (engine->vtab->bootstrap(engine) != 0) >  return -1; >  } >+ if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0) >+ say_error("ctl_trigger error in system space recovery: %s", >+ diag_last_error(diag_get())->errmsg); >+ >+ if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0) >+ say_error("ctl_trigger error in local recovery: %s", >+ diag_last_error(diag_get())->errmsg); >+ >  return 0; > } > >@@ -111,6 +121,10 @@ engine_end_recovery(void) >  if (engine->vtab->end_recovery(engine) != 0) >  return -1; >  } >+ if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0) >+ say_error("ctl_trigger error in local recovery: %s", >+ diag_last_error(diag_get())->errmsg); >+ >  return 0; > } > >diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c >index 52f320a..5bd9be3 100644 >--- a/src/box/lua/ctl.c >+++ b/src/box/lua/ctl.c >@@ -122,8 +122,8 @@ box_lua_ctl_init(struct lua_State *L) >  lua_pushnumber(L, CTL_EVENT_SHUTDOWN); >  lua_setfield(L, -2, "SHUTDOWN"); >  lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD); >- lua_setfield(L, -2, "CTL_EVENT_REPLICASET_ADD"); >+ lua_setfield(L, -2, "REPLICASET_ADD"); >  lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE); >- lua_setfield(L, -2, "CTL_EVENT_REPLICASET_REMOVE"); >+ lua_setfield(L, -2, "REPLICASET_REMOVE"); >  lua_pop(L, 2); /* box, ctl */ > } >diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c >index fac84ce..e737ea3 100644 >--- a/src/box/memtx_engine.c >+++ b/src/box/memtx_engine.c >@@ -48,6 +48,7 @@ > #include "replication.h" > #include "schema.h" > #include "gc.h" > >+#include "ctl.h" Also not used. 
> > static void > txn_on_yield_or_stop(struct trigger *trigger, void *event) >@@ -197,7 +198,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx, >  */ >  if (!xlog_cursor_is_eof(&cursor)) >  panic("snapshot `%s' has no EOF marker", filename); >- >  return 0; > } > >diff --git a/src/box/replication.cc b/src/box/replication.cc >index c1e1769..75aecd0 100644 >--- a/src/box/replication.cc >+++ b/src/box/replication.cc >@@ -41,6 +41,7 @@ > #include "error.h" > #include "relay.h" > #include "vclock.h" /* VCLOCK_MAX */ >+#include "ctl.h" > > uint32_t instance_id = REPLICA_ID_NIL; > struct tt_uuid INSTANCE_UUID; >@@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid *replica_uuid) >  replica->uuid = *replica_uuid; >  replica_hash_insert(&replicaset.hash, replica); >  replica_set_id(replica, replica_id); >+ struct on_ctl_event_ctx on_ctl_ctx; >+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD; >+ on_ctl_ctx.replica_id = replica_id; >+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) >+ say_error("ctl_trigger error in replica add: %s", >+ diag_last_error(diag_get())->errmsg); >  return replica; > } > >@@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica) >  * Some records may arrive later on due to asynchronous nature of >  * replication. 
>  */ >+ struct on_ctl_event_ctx on_ctl_ctx; >+ on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE; >+ on_ctl_ctx.replica_id = replica->id; >+ >  replicaset.replica_by_id[replica->id] = NULL; >  replica->id = REPLICA_ID_NIL; >  if (replica_is_orphan(replica)) { >  replica_hash_remove(&replicaset.hash, replica); >  replica_delete(replica); >  } >+ if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0) >+ say_error("ctl_trigger error in replica remove: %s", >+ diag_last_error(diag_get())->errmsg); > } > > static void >diff --git a/test/replication/master_onctl.lua b/test/replication/master_onctl.lua >new file mode 100644 >index 0000000..e0eb39a >--- /dev/null >+++ b/test/replication/master_onctl.lua >@@ -0,0 +1,34 @@ >+#!/usr/bin/env tarantool >+os = require('os') >+ >+SYSTEM_SPACE_RECOVERY = 0 >+LOCAL_RECOVERY = 0 >+READ_ONLY = 0 >+READ_WRITE = 0 >+REPLICASET_ADD = {} >+REPLICASET_REMOVE = {} >+ >+local function onctl(ctx) >+ if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then >+ SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1 >+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then >+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1 >+ elseif ctx.type == box.ctl.event.READ_ONLY then >+ READ_ONLY = READ_ONLY + 1 >+ elseif ctx.type == box.ctl.event.READ_WRITE then >+ READ_WRITE = READ_WRITE + 1 >+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then >+ table.insert(REPLICASET_ADD, ctx.replica_id) >+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then >+ table.insert(REPLICASET_REMOVE, ctx.replica_id) >+ end >+end >+ >+box.cfg({ >+ listen = os.getenv("LISTEN"), >+ memtx_memory = 107374182, >+ replication_connect_timeout = 0.5, >+ on_ctl_event = onctl, >+}) >+ >+require('console').listen(os.getenv('ADMIN')) >diff --git a/test/replication/onctl.result b/test/replication/onctl.result >new file mode 100644 >index 0000000..19b3e67 >--- /dev/null >+++ b/test/replication/onctl.result >@@ -0,0 +1,250 @@ >+env = require('test_run') >+--- >+... >+test_run = env.new() >+--- >+... 
>+test_run:cmd("create server master with script='replication/master_onctl.lua'") >+--- >+- true >+... >+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") >+--- >+- true >+... >+test_run:cmd("start server master") >+--- >+- true >+... >+test_run:cmd("switch master") >+--- >+- true >+... >+box.schema.user.grant('guest', 'replication') >+--- >+... >+SYSTEM_SPACE_RECOVERY >+--- >+- 1 >+... >+LOCAL_RECOVERY >+--- >+- 1 >+... >+READ_ONLY >+--- >+- 0 >+... >+READ_WRITE >+--- >+- 1 >+... >+-- must be two entries. First from bootstrap.snap, second for current instance. >+REPLICASET_ADD >+--- >+- - 1 >+ - 1 >+... >+-- must be one entry. Deletion of initial tuple in _cluster space. >+REPLICASET_REMOVE >+--- >+- - 1 >+... >+REPLICASET_ADD = {} >+--- >+... >+REPLICASET_REMOVE = {} >+--- >+... >+new_replica_id = 0 >+--- >+... >+deleted_replica_id = 0 >+--- >+... >+test_run:cmd("setopt delimiter ';'") >+--- >+- true >+... >+function on_ctl_new(ctx) >+ if ctx.type == box.ctl.event.REPLICASET_ADD then >+ new_replica_id = ctx.replica_id >+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then >+ deleted_replica_id = ctx.replica_id >+ end >+end; >+--- >+... >+test_run:cmd("setopt delimiter ''"); >+--- >+- true >+... >+_ = box.ctl.on_ctl_event(on_ctl_new) >+--- >+... >+test_run:cmd("start server replica") >+--- >+- true >+... >+REPLICASET_ADD >+--- >+- - 2 >+... >+REPLICASET_REMOVE >+--- >+- [] >+... >+new_replica_id >+--- >+- 2 >+... >+deleted_replica_id >+--- >+- 0 >+... >+test_run:cmd("switch replica") >+--- >+- true >+... >+test_run:cmd("setopt delimiter ';'") >+--- >+- true >+... >+function on_ctl_shutdown(ctx) >+ if ctx.type == box.ctl.event.SHUTDOWN then >+ require("log").info("test replica shutdown") >+ end >+end; >+--- >+... >+function on_ctl_error(ctx) >+ error("trigger error") >+end; >+--- >+... >+test_run:cmd("setopt delimiter ''"); >+--- >+- true >+... >+SYSTEM_SPACE_RECOVERY >+--- >+- 1 >+... 
>+LOCAL_RECOVERY >+--- >+- 1 >+... >+READ_ONLY >+--- >+- 0 >+... >+READ_WRITE >+--- >+- 1 >+... >+REPLICASET_ADD >+--- >+- - 2 >+... >+REPLICASET_REMOVE >+--- >+- [] >+... >+box.cfg{read_only = true} >+--- >+... >+fiber = require("fiber") >+--- >+... >+while READ_ONLY == 0 do fiber.sleep(0.001) end >+--- >+... >+READ_ONLY >+--- >+- 1 >+... >+box.cfg{on_ctl_event = on_ctl_error} >+--- >+... >+box.cfg{read_only = false} >+--- >+... >+test_run:grep_log('replica', 'ctl_trigger error') >+--- >+- ctl_trigger error >+... >+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}} >+--- >+... >+box.cfg{on_ctl_event = on_ctl_shutdown} >+--- >+... >+test_run:cmd("restart server replica") >+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica. >+--test_run:grep_log('replica', 'test replica shutdown') >+test_run:cmd("switch master") >+--- >+- true >+... >+box.schema.user.revoke('guest', 'replication') >+--- >+... >+_ = box.space._cluster:delete{2} >+--- >+... >+SYSTEM_SPACE_RECOVERY >+--- >+- 1 >+... >+LOCAL_RECOVERY >+--- >+- 1 >+... >+READ_ONLY >+--- >+- 0 >+... >+READ_WRITE >+--- >+- 1 >+... >+REPLICASET_ADD >+--- >+- - 2 >+... >+REPLICASET_REMOVE >+--- >+- - 2 >+... >+new_replica_id >+--- >+- 2 >+... >+deleted_replica_id >+--- >+- 2 >+... >+box.ctl.on_ctl_event(nil, on_ctl_new) >+--- >+... >+-- cleanup >+test_run:cmd("switch default") >+--- >+- true >+... >+test_run:cmd("stop server master") >+--- >+- true >+... >+test_run:cmd("cleanup server master") >+--- >+- true >+... >+test_run:cmd("stop server replica") >+--- >+- true >+... >+test_run:cmd("cleanup server replica") >+--- >+- true >+... 
>diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua >new file mode 100644 >index 0000000..ff6a898 >--- /dev/null >+++ b/test/replication/onctl.test.lua >@@ -0,0 +1,105 @@ >+env = require('test_run') >+test_run = env.new() >+ >+test_run:cmd("create server master with script='replication/master_onctl.lua'") >+test_run:cmd("create server replica with rpl_master=master, script='replication/replica_onctl.lua'") >+ >+test_run:cmd("start server master") >+test_run:cmd("switch master") >+box.schema.user.grant('guest', 'replication') >+ >+SYSTEM_SPACE_RECOVERY >+LOCAL_RECOVERY >+READ_ONLY >+READ_WRITE >+-- must be two entries. First from bootstrap.snap, second for current instance. >+REPLICASET_ADD >+-- must be one entry. Deletion of initial tuple in _cluster space. >+REPLICASET_REMOVE >+ >+REPLICASET_ADD = {} >+REPLICASET_REMOVE = {} >+ >+new_replica_id = 0 >+deleted_replica_id = 0 >+ >+test_run:cmd("setopt delimiter ';'") >+function on_ctl_new(ctx) >+ if ctx.type == box.ctl.event.REPLICASET_ADD then >+ new_replica_id = ctx.replica_id >+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then >+ deleted_replica_id = ctx.replica_id >+ end >+end; >+test_run:cmd("setopt delimiter ''"); >+ >+_ = box.ctl.on_ctl_event(on_ctl_new) >+ >+test_run:cmd("start server replica") >+ >+REPLICASET_ADD >+REPLICASET_REMOVE >+ >+new_replica_id >+deleted_replica_id >+ >+test_run:cmd("switch replica") >+ >+test_run:cmd("setopt delimiter ';'") >+function on_ctl_shutdown(ctx) >+ if ctx.type == box.ctl.event.SHUTDOWN then >+ require("log").info("test replica shutdown") >+ end >+end; >+ >+function on_ctl_error(ctx) >+ error("trigger error") >+end; >+ >+test_run:cmd("setopt delimiter ''"); >+ >+SYSTEM_SPACE_RECOVERY >+LOCAL_RECOVERY >+READ_ONLY >+READ_WRITE >+REPLICASET_ADD >+REPLICASET_REMOVE >+ >+box.cfg{read_only = true} >+fiber = require("fiber") >+while READ_ONLY == 0 do fiber.sleep(0.001) end >+READ_ONLY >+ >+box.cfg{on_ctl_event = on_ctl_error} >+box.cfg{read_only 
= false} >+test_run:grep_log('replica', 'ctl_trigger error') >+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}} >+box.cfg{on_ctl_event = on_ctl_shutdown} >+ >+test_run:cmd("restart server replica") >+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica. >+--test_run:grep_log('replica', 'test replica shutdown') This should be done in this patch rather than left as a TODO. > >+ >+ >+test_run:cmd("switch master") >+box.schema.user.revoke('guest', 'replication') >+_ = box.space._cluster:delete{2} >+ >+SYSTEM_SPACE_RECOVERY >+LOCAL_RECOVERY >+READ_ONLY >+READ_WRITE >+REPLICASET_ADD >+REPLICASET_REMOVE >+ >+new_replica_id >+deleted_replica_id >+ >+box.ctl.on_ctl_event(nil, on_ctl_new) >+ >+-- cleanup >+test_run:cmd("switch default") >+test_run:cmd("stop server master") >+test_run:cmd("cleanup server master") >+test_run:cmd("stop server replica") >+test_run:cmd("cleanup server replica") >diff --git a/test/replication/replica_onctl.lua b/test/replication/replica_onctl.lua >new file mode 100644 >index 0000000..d6ce73c >--- /dev/null >+++ b/test/replication/replica_onctl.lua >@@ -0,0 +1,34 @@ >+#!/usr/bin/env tarantool >+ >+SYSTEM_SPACE_RECOVERY = 0 >+LOCAL_RECOVERY = 0 >+READ_ONLY = 0 >+READ_WRITE = 0 >+REPLICASET_ADD = {} >+REPLICASET_REMOVE = {} >+ >+local function onctl(ctx) >+ if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then >+ SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1 >+ elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then >+ LOCAL_RECOVERY = LOCAL_RECOVERY + 1 >+ elseif ctx.type == box.ctl.event.READ_ONLY then >+ READ_ONLY = READ_ONLY + 1 >+ elseif ctx.type == box.ctl.event.READ_WRITE then >+ READ_WRITE = READ_WRITE + 1 >+ elseif ctx.type == box.ctl.event.REPLICASET_ADD then >+ table.insert(REPLICASET_ADD, ctx.replica_id) >+ elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then >+ table.insert(REPLICASET_REMOVE, ctx.replica_id) >+ end >+end >+ >+box.cfg({ >+ listen = os.getenv("LISTEN"), >+ replication = os.getenv("MASTER"), >+ memtx_memory = 107374182, >+ 
replication_connect_timeout = 0.5, >+ on_ctl_event = onctl, >+}) >+ >+require('console').listen(os.getenv('ADMIN')) >diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg >index 95e94e5..365a825 100644 >--- a/test/replication/suite.cfg >+++ b/test/replication/suite.cfg >@@ -6,6 +6,7 @@ >     "wal_off.test.lua": {}, >     "hot_standby.test.lua": {}, >     "rebootstrap.test.lua": {}, >+ "onctl.test.lua": {}, >     "*": { >         "memtx": {"engine": "memtx"}, >         "vinyl": {"engine": "vinyl"} >-- >2.7.4 > > Best regards, Konstantin Belyavskiy k.belyavskiy@tarantool.org