From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 6/6] Introduce replica local spaces Date: Wed, 13 Jun 2018 19:10:38 +0300 Message-Id: <22474f0135864e15bb4aa2aed19662b7911aa225.1528906027.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: This patch introduces a new space option, is_local, which if specified on space creation will render all changes done locally to the space invisible to other replicas. The option can only be set on space creation and cannot be altered. Technically, to support this feature, we introduce a new header flag, IPROTO_IS_LOCAL, which is set for all rows corresponding to replica local spaces both in xlog and in snap. Relay won't send snapshot rows that are marked local. As for xlog rows, it will transform them to IPROTO_NOP so as to promote vclock on the replica without doing any actual data modification. The feature is currently supported for memtx spaces only, but it should be easy to implement it for vinyl spaces as well. Closes #3443 --- src/box/alter.cc | 4 + src/box/iproto_constants.c | 4 +- src/box/iproto_constants.h | 1 + src/box/lua/schema.lua | 9 +- src/box/lua/space.cc | 5 ++ src/box/lua/xlog.c | 5 ++ src/box/memtx_engine.c | 8 +- src/box/relay.cc | 17 +++- src/box/space.h | 12 ++- src/box/space_def.c | 2 + src/box/space_def.h | 5 ++ src/box/txn.c | 3 + src/box/vinyl.c | 5 ++ src/box/xrow.c | 9 ++ src/box/xrow.h | 1 + test/engine/iterator.result | 2 +- test/replication/local_spaces.result | 159 +++++++++++++++++++++++++++++++++ test/replication/local_spaces.test.lua | 54 +++++++++++ test/replication/suite.cfg | 1 + test/vinyl/ddl.result | 5 ++ test/vinyl/ddl.test.lua | 3 + 21 files changed, 304 insertions(+), 10 deletions(-) create mode 100644 test/replication/local_spaces.result create mode 100644 test/replication/local_spaces.test.lua diff --git a/src/box/alter.cc b/src/box/alter.cc index 6f6fcb09..9184a284 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -1628,6 +1628,10 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event) tnt_raise(ClientError, ER_ALTER_SPACE, space_name(old_space), "can not change space engine"); + if (def->opts.local != space_is_local(old_space)) + tnt_raise(ClientError, ER_ALTER_SPACE, + space_name(old_space), + "can not switch local flag"); /* * Allow change of space properties, but do it * in WAL-error-safe mode. diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 5c1d3a31..38e2bda9 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -40,10 +40,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x04 */ MP_DOUBLE, /* IPROTO_TIMESTAMP */ /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ + /* 0x07 */ MP_BOOL, /* IPROTO_IS_LOCAL */ /* }}} */ /* {{{ unused */ - /* 0x07 */ MP_UINT, /* 0x08 */ MP_UINT, /* 0x09 */ MP_UINT, /* 0x0a */ MP_UINT, @@ -133,7 +133,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "timestamp", /* 0x04 */ "schema version", /* 0x05 */ "server version", /* 0x06 */ - NULL, /* 0x07 */ + "is local", /* 0x07 */ NULL, /* 0x08 */ NULL, /* 0x09 */ NULL, /* 0x0a */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index d1320de7..445d80d7 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -58,6 +58,7 @@ enum iproto_key { IPROTO_TIMESTAMP = 0x04, IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, + IPROTO_IS_LOCAL = 0x07, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index 43c7d4e6..7dd73899 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -419,11 +419,13 @@ box.schema.space.create = function(name, options) user = 'string, number', format = 'table', temporary = 'boolean', + is_local = 'boolean', } local options_defaults = { engine = 'memtx', field_count = 0, temporary = false, + is_local = false, } check_param_table(options, options_template) options = update_param_table(options, options_defaults) @@ -462,6 +464,7 @@ box.schema.space.create = function(name, options) -- filter out global parameters from the options array local space_options = setmap({ temporary = options.temporary and true or nil, + is_local = options.is_local and true or nil, }) _space:insert{id, uid, name, options.engine, options.field_count, space_options, format} @@ -2330,7 +2333,11 @@ local function box_space_mt(tab) for k,v in pairs(tab) do -- skip system spaces and views if type(k) == 'string' and #k > 0 and k:sub(1,1) ~= '_' then - t[k] = { engine = v.engine, temporary = v.temporary } + t[k] = { + engine = v.engine, + temporary = v.temporary, + is_local = v.is_local, + } end end return t diff --git a/src/box/lua/space.cc b/src/box/lua/space.cc index 52438275..e59e77bb 100644 --- a/src/box/lua/space.cc +++ b/src/box/lua/space.cc @@ -165,6 +165,11 @@ lbox_fillspace(struct lua_State *L, struct space *space, int i) lua_pushboolean(L, space_is_temporary(space)); lua_settable(L, i); + /* space.is_local */ + lua_pushstring(L, "is_local"); + lua_pushboolean(L, space_is_local(space)); + lua_settable(L, i); + /* space.name */ lua_pushstring(L, "name"); lua_pushstring(L, space_name(space)); diff --git a/src/box/lua/xlog.c b/src/box/lua/xlog.c index 2271c829..5e270f16 100644 --- a/src/box/lua/xlog.c +++ b/src/box/lua/xlog.c @@ -211,6 +211,11 @@ lbox_xlog_parser_iterate(struct lua_State *L) lua_pushinteger(L, row.replica_id); lua_settable(L, -3); /* replica_id */ } + if (row.is_local) { + lbox_xlog_pushkey(L, iproto_key_name(IPROTO_IS_LOCAL)); + lua_pushboolean(L, row.is_local); + lua_settable(L, -3); /* is_local */ + } if (row.tm != 0) { lbox_xlog_pushkey(L, iproto_key_name(IPROTO_TIMESTAMP)); lua_pushnumber(L, row.tm); diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index de9fd1ba..60959402 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -492,19 +492,20 @@ checkpoint_write_row(struct xlog *l, struct xrow_header *row) } static int -checkpoint_write_tuple(struct xlog *l, uint32_t space_id, +checkpoint_write_tuple(struct xlog *l, struct space *space, const char *data, uint32_t size) { struct request_replace_body body; body.m_body = 0x82; /* map of two elements. */ body.k_space_id = IPROTO_SPACE_ID; body.m_space_id = 0xce; /* uint32 */ - body.v_space_id = mp_bswap_u32(space_id); + body.v_space_id = mp_bswap_u32(space_id(space)); body.k_tuple = IPROTO_TUPLE; struct xrow_header row; memset(&row, 0, sizeof(struct xrow_header)); row.type = IPROTO_INSERT; + row.is_local = space_is_local(space); row.bodycnt = 2; row.body[0].iov_base = &body; @@ -629,8 +630,7 @@ checkpoint_f(va_list ap) struct snapshot_iterator *it = entry->iterator; for (data = it->next(it, &size); data != NULL; data = it->next(it, &size)) { - if (checkpoint_write_tuple(&snap, - space_id(entry->space), + if (checkpoint_write_tuple(&snap, entry->space, data, size) != 0) { xlog_close(&snap, false); return -1; diff --git a/src/box/relay.cc b/src/box/relay.cc index a25cc540..322266fc 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -620,7 +620,12 @@ static void relay_send_initial_join_row(struct xstream *stream, struct xrow_header *row) { struct relay *relay = container_of(stream, struct relay, stream); - relay_send(relay, row); + /* + * Ignore replica local requests as we don't need to promote + * vclock while sending a snapshot. + */ + if (!row->is_local) + relay_send(relay, row); } /** Send a single row to the client. */ @@ -630,6 +635,16 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) struct relay *relay = container_of(stream, struct relay, stream); assert(iproto_type_is_dml(packet->type)); /* + * Transform replica local requests to IPROTO_NOP so as to + * promote vclock on the replica without actually modifying + * any data. + */ + if (packet->is_local) { + packet->type = IPROTO_NOP; + packet->is_local = false; + packet->bodycnt = 0; + } + /* * We're feeding a WAL, thus responding to SUBSCRIBE request. * In that case, only send a row if it is not from the same replica * (i.e. don't send replica's own rows back) or if this row is diff --git a/src/box/space.h b/src/box/space.h index a024fdc8..2a852ed3 100644 --- a/src/box/space.h +++ b/src/box/space.h @@ -193,7 +193,17 @@ space_name(const struct space *space) /** Return true if space is temporary. */ static inline bool -space_is_temporary(struct space *space) { return space->def->opts.temporary; } +space_is_temporary(struct space *space) +{ + return space->def->opts.temporary; +} + +/** Return true if space is replica local. */ +static inline bool +space_is_local(struct space *space) +{ + return space->def->opts.local; +} void space_run_triggers(struct space *space, bool yesno); diff --git a/src/box/space_def.c b/src/box/space_def.c index 7349c214..0b6a6d59 100644 --- a/src/box/space_def.c +++ b/src/box/space_def.c @@ -35,10 +35,12 @@ const struct space_opts space_opts_default = { /* .temporary = */ false, + /* .local = */ false, }; const struct opt_def space_opts_reg[] = { OPT_DEF("temporary", OPT_BOOL, struct space_opts, temporary), + OPT_DEF("is_local", OPT_BOOL, struct space_opts, local), OPT_END, }; diff --git a/src/box/space_def.h b/src/box/space_def.h index 97c7e138..26e60d03 100644 --- a/src/box/space_def.h +++ b/src/box/space_def.h @@ -48,6 +48,11 @@ struct space_opts { * - changes are not part of a snapshot */ bool temporary; + /** + * The space is replica local: changes + * done to it are not replicated. + */ + bool local; }; extern const struct space_opts space_opts_default; diff --git a/src/box/txn.c b/src/box/txn.c index 362030a9..82248e34 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -64,6 +64,7 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request) row->lsn = 0; row->sync = 0; row->tm = 0; + row->is_local = false; row->bodycnt = xrow_encode_dml(request, row->body); if (row->bodycnt < 0) return -1; @@ -214,6 +215,8 @@ txn_commit_stmt(struct txn *txn, struct request *request) if (!space_is_temporary(stmt->space)) { if (txn_add_redo(stmt, request) != 0) goto fail; + if (space_is_local(stmt->space)) + stmt->row->is_local = true; ++txn->n_rows; } /* diff --git a/src/box/vinyl.c b/src/box/vinyl.c index d2e3da7e..d99fe220 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -595,6 +595,11 @@ vinyl_engine_check_space_def(struct space_def *def) def->name, "engine does not support temporary flag"); return -1; } + if (def->opts.local) { + diag_set(ClientError, ER_ALTER_SPACE, + def->name, "engine does not support local flag"); + return -1; + } return 0; } diff --git a/src/box/xrow.c b/src/box/xrow.c index 64d845f7..e5d8d045 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -106,6 +106,9 @@ error: case IPROTO_SCHEMA_VERSION: header->schema_version = mp_decode_uint(pos); break; + case IPROTO_IS_LOCAL: + header->is_local = mp_decode_bool(pos); + break; default: /* unknown header */ mp_next(pos); @@ -178,6 +181,12 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync, map_size++; } + if (header->is_local) { + d = mp_encode_uint(d, IPROTO_IS_LOCAL); + d = mp_encode_bool(d, header->is_local); + map_size++; + } + if (header->lsn) { d = mp_encode_uint(d, IPROTO_LSN); d = mp_encode_uint(d, header->lsn); diff --git a/src/box/xrow.h b/src/box/xrow.h index 1bb5f103..36211220 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -60,6 +60,7 @@ struct xrow_header { uint64_t sync; int64_t lsn; /* LSN must be signed for correct comparison */ double tm; + bool is_local; /* set for replica-local records */ int bodycnt; uint32_t schema_version; diff --git a/test/engine/iterator.result b/test/engine/iterator.result index 1bde10ea..91ad325f 100644 --- a/test/engine/iterator.result +++ b/test/engine/iterator.result @@ -4211,7 +4211,7 @@ s:replace{35} ... state, value = gen(param,state) --- -- error: 'builtin/box/schema.lua:1032: usage: next(param, state)' +- error: 'builtin/box/schema.lua:1035: usage: next(param, state)' ... value --- diff --git a/test/replication/local_spaces.result b/test/replication/local_spaces.result new file mode 100644 index 00000000..2ef74edd --- /dev/null +++ b/test/replication/local_spaces.result @@ -0,0 +1,159 @@ +env = require('test_run') +--- +... +test_run = env.new() +--- +... +-- +-- gh-3443: Check that changes done to spaces marked as local +-- are not replicated, but vclock is still promoted. +-- +s1 = box.schema.space.create('test1') +--- +... +_ = s1:create_index('pk') +--- +... +s2 = box.schema.space.create('test2', {is_local = true}) +--- +... +_ = s2:create_index('pk') +--- +... +s1.is_local +--- +- false +... +s2.is_local +--- +- true +... +_ = s1:insert{1} +--- +... +_ = s2:insert{1} +--- +... +box.snapshot() +--- +- ok +... +_ = s1:insert{2} +--- +... +_ = s2:insert{2} +--- +... +box.schema.user.grant('guest', 'replication') +--- +... +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +--- +- true +... +test_run:cmd("start server replica") +--- +- true +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test1.is_local +--- +- false +... +box.space.test2.is_local +--- +- true +... +box.space.test1:select() +--- +- - [1] + - [2] +... +box.space.test2:select() +--- +- [] +... +for i = 1, 3 do box.space.test2:insert{i, i} end +--- +... +test_run:cmd("switch default") +--- +- true +... +_ = s1:insert{3} +--- +... +_ = s2:insert{3} +--- +... +vclock = test_run:get_vclock('default') +--- +... +_ = test_run:wait_vclock('replica', vclock) +--- +... +test_run:cmd("switch replica") +--- +- true +... +box.space.test1:select() +--- +- - [1] + - [2] + - [3] +... +box.space.test2:select() +--- +- - [1, 1] + - [2, 2] + - [3, 3] +... +test_run:cmd("restart server replica") +box.space.test1:select() +--- +- - [1] + - [2] + - [3] +... +box.space.test2:select() +--- +- - [1, 1] + - [2, 2] + - [3, 3] +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server replica") +--- +- true +... +test_run:cmd("cleanup server replica") +--- +- true +... +box.schema.user.revoke('guest', 'replication') +--- +... +s1:select() +--- +- - [1] + - [2] + - [3] +... +s2:select() +--- +- - [1] + - [2] + - [3] +... +s1:drop() +--- +... +s2:drop() +--- +... diff --git a/test/replication/local_spaces.test.lua b/test/replication/local_spaces.test.lua new file mode 100644 index 00000000..804c507b --- /dev/null +++ b/test/replication/local_spaces.test.lua @@ -0,0 +1,54 @@ +env = require('test_run') +test_run = env.new() + +-- +-- gh-3443: Check that changes done to spaces marked as local +-- are not replicated, but vclock is still promoted. +-- + +s1 = box.schema.space.create('test1') +_ = s1:create_index('pk') +s2 = box.schema.space.create('test2', {is_local = true}) +_ = s2:create_index('pk') +s1.is_local +s2.is_local +_ = s1:insert{1} +_ = s2:insert{1} +box.snapshot() +_ = s1:insert{2} +_ = s2:insert{2} + +box.schema.user.grant('guest', 'replication') +test_run:cmd("create server replica with rpl_master=default, script='replication/replica.lua'") +test_run:cmd("start server replica") + +test_run:cmd("switch replica") +box.space.test1.is_local +box.space.test2.is_local +box.space.test1:select() +box.space.test2:select() +for i = 1, 3 do box.space.test2:insert{i, i} end + +test_run:cmd("switch default") +_ = s1:insert{3} +_ = s2:insert{3} +vclock = test_run:get_vclock('default') +_ = test_run:wait_vclock('replica', vclock) + +test_run:cmd("switch replica") +box.space.test1:select() +box.space.test2:select() +test_run:cmd("restart server replica") +box.space.test1:select() +box.space.test2:select() + +test_run:cmd("switch default") +test_run:cmd("stop server replica") +test_run:cmd("cleanup server replica") +box.schema.user.revoke('guest', 'replication') + +s1:select() +s2:select() + +s1:drop() +s2:drop() diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg index 95e94e5a..283edcad 100644 --- a/test/replication/suite.cfg +++ b/test/replication/suite.cfg @@ -6,6 +6,7 @@ "wal_off.test.lua": {}, "hot_standby.test.lua": {}, "rebootstrap.test.lua": {}, + "local_spaces.test.lua": {}, "*": { "memtx": {"engine": "memtx"}, "vinyl": {"engine": "vinyl"} diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result index 16ee7097..c9498adb 100644 --- a/test/vinyl/ddl.result +++ b/test/vinyl/ddl.result @@ -44,6 +44,11 @@ space:create_index('pk', {bloom_fpr = 1.1}) space:drop() --- ... +-- vinyl does not support replica local spaces +space = box.schema.space.create('test', {engine = 'vinyl', is_local = true}) +--- +- error: 'Can''t modify space ''test'': engine does not support local flag' +... -- space secondary index create space = box.schema.space.create('test', { engine = 'vinyl' }) --- diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua index 95dd5a11..a02622fc 100644 --- a/test/vinyl/ddl.test.lua +++ b/test/vinyl/ddl.test.lua @@ -12,6 +12,9 @@ space:create_index('pk', {bloom_fpr = 0}) space:create_index('pk', {bloom_fpr = 1.1}) space:drop() +-- vinyl does not support replica local spaces +space = box.schema.space.create('test', {engine = 'vinyl', is_local = true}) + -- space secondary index create space = box.schema.space.create('test', { engine = 'vinyl' }) index1 = space:create_index('primary') -- 2.11.0