From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH 04/10] ddl: synchronize sequence cache with actual data state Date: Wed, 3 Jul 2019 22:30:06 +0300 Message-Id: <521653a5de84b07109f22ec024909e991eb06104.1562181197.git.vdavydov.dev@gmail.com> In-Reply-To: References: In-Reply-To: References: To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: To implement transactional DDL, we must make sure that in-memory schema is updated synchronously with system space updates, i.e. on_replace, not on_commit. Note, to do this in case of the sequence cache, we have to rework the way sequences are exported to Lua - make on_alter_sequence similar to how on_alter_space and on_alter_func triggers are implemented. --- src/box/alter.cc | 114 ++++++++++++++++++++++++--------------------- src/box/lua/load_cfg.lua | 1 - src/box/lua/schema.lua | 48 +++---------------- src/box/lua/sequence.c | 110 +++++++++++++++++++++++++++++++++++++------ src/box/schema.cc | 39 +++++----------- src/box/schema.h | 8 ++-- src/box/sequence.c | 21 +++++++++ src/box/sequence.h | 27 +++++++++++ test/box/sequence.result | 42 +++++++++++++++++ test/box/sequence.test.lua | 15 ++++++ 10 files changed, 282 insertions(+), 143 deletions(-) diff --git a/src/box/alter.cc b/src/box/alter.cc index 062cf9f8..7aeed834 100644 --- a/src/box/alter.cc +++ b/src/box/alter.cc @@ -3337,50 +3337,52 @@ sequence_def_new_from_tuple(struct tuple *tuple, uint32_t errcode) return def; } -/** Argument passed to on_commit_dd_sequence() trigger. */ -struct alter_sequence { - /** Trigger invoked on commit in the _sequence space. */ - struct trigger on_commit; - /** Trigger invoked on rollback in the _sequence space. */ - struct trigger on_rollback; - /** Old sequence definition or NULL if create. */ - struct sequence_def *old_def; - /** New sequence defitition or NULL if drop. */ - struct sequence_def *new_def; -}; - -/** - * Trigger invoked on commit in the _sequence space. 
- */ static void -on_commit_dd_sequence(struct trigger *trigger, void *event) +on_create_sequence_rollback(struct trigger *trigger, void * /* event */) { - struct txn *txn = (struct txn *) event; - struct alter_sequence *alter = (struct alter_sequence *) trigger->data; + /* Remove the new sequence from the cache and delete it. */ + struct sequence *seq = (struct sequence *)trigger->data; + sequence_cache_delete(seq->def->id); + trigger_run_xc(&on_alter_sequence, seq); + sequence_delete(seq); +} - if (alter->new_def != NULL && alter->old_def != NULL) { - /* Alter a sequence. */ - sequence_cache_replace(alter->new_def); - } else if (alter->new_def == NULL) { - /* Drop a sequence. */ - sequence_cache_delete(alter->old_def->id); - } +static void +on_drop_sequence_commit(struct trigger *trigger, void * /* event */) +{ + /* Delete the old sequence. */ + struct sequence *seq = (struct sequence *)trigger->data; + sequence_delete(seq); +} - trigger_run_xc(&on_alter_sequence, txn_last_stmt(txn)); +static void +on_drop_sequence_rollback(struct trigger *trigger, void * /* event */) +{ + /* Insert the old sequence back into the cache. */ + struct sequence *seq = (struct sequence *)trigger->data; + sequence_cache_insert(seq); + trigger_run_xc(&on_alter_sequence, seq); } -/** - * Trigger invoked on rollback in the _sequence space. - */ + static void -on_rollback_dd_sequence(struct trigger *trigger, void * /* event */) +on_alter_sequence_commit(struct trigger *trigger, void * /* event */) { - struct alter_sequence *alter = (struct alter_sequence *) trigger->data; + /* Delete the old sequence definition. */ + struct sequence_def *def = (struct sequence_def *)trigger->data; + free(def); +} - if (alter->new_def != NULL && alter->old_def == NULL) { - /* Rollback creation of a sequence. */ - sequence_cache_delete(alter->new_def->id); - } +static void +on_alter_sequence_rollback(struct trigger *trigger, void * /* event */) +{ + /* Restore the old sequence definition. 
*/ + struct sequence_def *def = (struct sequence_def *)trigger->data; + struct sequence *seq = sequence_by_id(def->id); + assert(seq != NULL); + free(seq->def); + seq->def = def; + trigger_run_xc(&on_alter_sequence, seq); } /** @@ -3396,24 +3398,25 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event) struct tuple *old_tuple = stmt->old_tuple; struct tuple *new_tuple = stmt->new_tuple; - struct alter_sequence *alter = - region_calloc_object_xc(&txn->region, struct alter_sequence); - struct sequence_def *new_def = NULL; auto def_guard = make_scoped_guard([=] { free(new_def); }); + struct sequence *seq; if (old_tuple == NULL && new_tuple != NULL) { /* INSERT */ new_def = sequence_def_new_from_tuple(new_tuple, ER_CREATE_SEQUENCE); - assert(sequence_by_id(new_def->id) == NULL); access_check_ddl(new_def->name, new_def->id, new_def->uid, SC_SEQUENCE, PRIV_C); - sequence_cache_replace(new_def); - alter->new_def = new_def; + struct trigger *on_rollback = + txn_alter_trigger_new(on_create_sequence_rollback, NULL); + seq = sequence_new_xc(new_def); + sequence_cache_insert(seq); + on_rollback->data = seq; + txn_on_rollback(txn, on_rollback); } else if (old_tuple != NULL && new_tuple == NULL) { /* DELETE */ uint32_t id = tuple_field_u32_xc(old_tuple, BOX_SEQUENCE_DATA_FIELD_ID); - struct sequence *seq = sequence_by_id(id); + seq = sequence_by_id(id); assert(seq != NULL); access_check_ddl(seq->def->name, seq->def->id, seq->def->uid, SC_SEQUENCE, PRIV_D); @@ -3426,26 +3429,31 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event) if (schema_find_grants("sequence", seq->def->id)) tnt_raise(ClientError, ER_DROP_SEQUENCE, seq->def->name, "the sequence has grants"); - alter->old_def = seq->def; + struct trigger *on_commit = + txn_alter_trigger_new(on_drop_sequence_commit, seq); + struct trigger *on_rollback = + txn_alter_trigger_new(on_drop_sequence_rollback, seq); + sequence_cache_delete(seq->def->id); + txn_on_commit(txn, on_commit); + 
txn_on_rollback(txn, on_rollback); } else { /* UPDATE */ new_def = sequence_def_new_from_tuple(new_tuple, ER_ALTER_SEQUENCE); - struct sequence *seq = sequence_by_id(new_def->id); + seq = sequence_by_id(new_def->id); assert(seq != NULL); access_check_ddl(seq->def->name, seq->def->id, seq->def->uid, SC_SEQUENCE, PRIV_A); - alter->old_def = seq->def; - alter->new_def = new_def; + struct trigger *on_commit = + txn_alter_trigger_new(on_alter_sequence_commit, seq->def); + struct trigger *on_rollback = + txn_alter_trigger_new(on_alter_sequence_rollback, seq->def); + seq->def = new_def; + txn_on_commit(txn, on_commit); + txn_on_rollback(txn, on_rollback); } def_guard.is_active = false; - - trigger_create(&alter->on_commit, - on_commit_dd_sequence, alter, NULL); - txn_on_commit(txn, &alter->on_commit); - trigger_create(&alter->on_rollback, - on_rollback_dd_sequence, alter, NULL); - txn_on_rollback(txn, &alter->on_rollback); + trigger_run_xc(&on_alter_sequence, seq); } /** diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 9f3344da..d1be0f39 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -467,7 +467,6 @@ setmetatable(box, { }) local function load_cfg(cfg) - box.internal.schema.init() cfg = upgrade_cfg(cfg, translate_cfg) cfg = prepare_cfg(cfg, default_cfg, template_cfg, modify_cfg) apply_default_cfg(cfg, default_cfg); diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index 9c3ee063..084addc2 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -1806,42 +1806,11 @@ sequence_mt.drop = function(self) box.schema.sequence.drop(self.id) end -local function sequence_tuple_decode(seq, tuple) - seq.id, seq.uid, seq.name, seq.step, seq.min, seq.max, - seq.start, seq.cache, seq.cycle = tuple:unpack() -end - -local function sequence_new(tuple) - local seq = setmetatable({}, sequence_mt) - sequence_tuple_decode(seq, tuple) - return seq -end - -local function sequence_on_alter(old_tuple, new_tuple) - if old_tuple and 
not new_tuple then - local old_name = old_tuple.name - box.sequence[old_name] = nil - elseif not old_tuple and new_tuple then - local seq = sequence_new(new_tuple) - box.sequence[seq.name] = seq - else - local old_name = old_tuple.name - local seq = box.sequence[old_name] - if not seq then - seq = sequence_new(seq, new_tuple) - else - sequence_tuple_decode(seq, new_tuple) - end - box.sequence[old_name] = nil - box.sequence[seq.name] = seq - end -end - box.sequence = {} -local function box_sequence_init() - -- Install a trigger that will update Lua objects on - -- _sequence space modifications. - internal.sequence.on_alter(sequence_on_alter) +box.schema.sequence = {} + +function box.schema.sequence.bless(seq) + setmetatable(seq, {__index = sequence_mt}) end local sequence_options = { @@ -1859,7 +1828,6 @@ create_sequence_options.if_not_exists = 'boolean' local alter_sequence_options = table.deepcopy(sequence_options) alter_sequence_options.name = 'string' -box.schema.sequence = {} box.schema.sequence.create = function(name, opts) opts = opts or {} check_param(name, 'name', 'string') @@ -1897,7 +1865,8 @@ box.schema.sequence.alter = function(name, opts) return end local seq = {} - sequence_tuple_decode(seq, tuple) + seq.id, seq.uid, seq.name, seq.step, seq.min, seq.max, + seq.start, seq.cache, seq.cycle = tuple:unpack() opts = update_param_table(opts, seq) local _sequence = box.space[box.schema.SEQUENCE_ID] _sequence:replace{seq.id, seq.uid, opts.name, opts.step, opts.min, @@ -2689,11 +2658,6 @@ end setmetatable(box.space, { __serialize = box_space_mt }) -box.internal.schema = {} -box.internal.schema.init = function() - box_sequence_init() -end - box.feedback = {} box.feedback.save = function(file_name) if type(file_name) ~= "string" then diff --git a/src/box/lua/sequence.c b/src/box/lua/sequence.c index 2fead2eb..bd9ec758 100644 --- a/src/box/lua/sequence.c +++ b/src/box/lua/sequence.c @@ -31,11 +31,11 @@ #include "box/lua/sequence.h" #include "box/lua/tuple.h" 
#include "lua/utils.h" -#include "lua/trigger.h" #include "diag.h" #include "box/box.h" #include "box/schema.h" +#include "box/sequence.h" #include "box/txn.h" static int @@ -68,28 +68,104 @@ lbox_sequence_reset(struct lua_State *L) return 0; } -static int -lbox_sequence_push_on_alter_event(struct lua_State *L, void *event) +static void +lbox_sequence_new(struct lua_State *L, struct sequence *seq) { - struct txn_stmt *stmt = (struct txn_stmt *) event; - if (stmt->old_tuple) { - luaT_pushtuple(L, stmt->old_tuple); + lua_getfield(L, LUA_GLOBALSINDEX, "box"); + lua_getfield(L, -1, "sequence"); + lua_rawgeti(L, -1, seq->def->id); + if (lua_isnil(L, -1)) { + /* + * If the sequence already exists, modify it, + * rather than create a new one -- to not + * invalidate Lua variable references to old + * sequence outside the box.sequence[]. + */ + lua_pop(L, 1); + lua_newtable(L); + lua_rawseti(L, -2, seq->def->id); + lua_rawgeti(L, -1, seq->def->id); } else { + /* Clear the reference to old sequence by old name. 
*/ + lua_getfield(L, -1, "name"); lua_pushnil(L); + lua_settable(L, -4); } - if (stmt->new_tuple) { - luaT_pushtuple(L, stmt->new_tuple); - } else { + int top = lua_gettop(L); + lua_pushstring(L, "id"); + lua_pushnumber(L, seq->def->id); + lua_settable(L, top); + lua_pushstring(L, "uid"); + lua_pushnumber(L, seq->def->uid); + lua_settable(L, top); + lua_pushstring(L, "name"); + lua_pushstring(L, seq->def->name); + lua_settable(L, top); + lua_pushstring(L, "step"); + luaL_pushint64(L, seq->def->step); + lua_settable(L, top); + lua_pushstring(L, "min"); + luaL_pushint64(L, seq->def->min); + lua_settable(L, top); + lua_pushstring(L, "max"); + luaL_pushint64(L, seq->def->max); + lua_settable(L, top); + lua_pushstring(L, "start"); + luaL_pushint64(L, seq->def->start); + lua_settable(L, top); + lua_pushstring(L, "cache"); + luaL_pushint64(L, seq->def->cache); + lua_settable(L, top); + lua_pushstring(L, "cycle"); + lua_pushboolean(L, seq->def->cycle); + lua_settable(L, top); + + /* Bless sequence object. 
*/ + lua_getfield(L, LUA_GLOBALSINDEX, "box"); + lua_pushstring(L, "schema"); + lua_gettable(L, -2); + lua_pushstring(L, "sequence"); + lua_gettable(L, -2); + lua_pushstring(L, "bless"); + lua_gettable(L, -2); + + lua_pushvalue(L, top); + lua_call(L, 1, 0); + lua_pop(L, 3); + + lua_setfield(L, -2, seq->def->name); + + lua_pop(L, 2); +} + +static void +lbox_sequence_delete(struct lua_State *L, struct sequence *seq) +{ + lua_getfield(L, LUA_GLOBALSINDEX, "box"); + lua_getfield(L, -1, "sequence"); + lua_rawgeti(L, -1, seq->def->id); + if (!lua_isnil(L, -1)) { + lua_getfield(L, -1, "name"); lua_pushnil(L); + lua_rawset(L, -4); + lua_pop(L, 1); /* pop sequence */ + lua_pushnil(L); + lua_rawseti(L, -2, seq->def->id); + } else { + lua_pop(L, 1); } - return 2; + lua_pop(L, 2); /* box, sequence */ } -static int -lbox_sequence_on_alter(struct lua_State *L) +static void +lbox_sequence_new_or_delete(struct trigger *trigger, void *event) { - return lbox_trigger_reset(L, 2, &on_alter_sequence, - lbox_sequence_push_on_alter_event, NULL); + struct lua_State *L = trigger->data; + struct sequence *seq = event; + if (sequence_by_id(seq->def->id) != NULL) + lbox_sequence_new(L, seq); + else + lbox_sequence_delete(L, seq); } void @@ -99,9 +175,13 @@ box_lua_sequence_init(struct lua_State *L) {"next", lbox_sequence_next}, {"set", lbox_sequence_set}, {"reset", lbox_sequence_reset}, - {"on_alter", lbox_sequence_on_alter}, {NULL, NULL} }; luaL_register(L, "box.internal.sequence", sequence_internal_lib); lua_pop(L, 1); + + static struct trigger on_alter_sequence_in_lua; + trigger_create(&on_alter_sequence_in_lua, + lbox_sequence_new_or_delete, L, NULL); + trigger_add(&on_alter_sequence, &on_alter_sequence_in_lua); } diff --git a/src/box/schema.cc b/src/box/schema.cc index 64412fac..3f90b8d4 100644 --- a/src/box/schema.cc +++ b/src/box/schema.cc @@ -635,41 +635,24 @@ sequence_cache_find(uint32_t id) } void -sequence_cache_replace(struct sequence_def *def) +sequence_cache_insert(struct 
sequence *seq) { - struct sequence *seq = sequence_by_id(def->id); - if (seq == NULL) { - /* Create a new sequence. */ - seq = (struct sequence *) calloc(1, sizeof(*seq)); - if (seq == NULL) - goto error; - struct mh_i32ptr_node_t node = { def->id, seq }; - if (mh_i32ptr_put(sequences, &node, NULL, NULL) == - mh_end(sequences)) - goto error; - } else { - /* Update an existing sequence. */ - free(seq->def); + assert(sequence_by_id(seq->def->id) == NULL); + + struct mh_i32ptr_node_t node = { seq->def->id, seq }; + mh_int_t k = mh_i32ptr_put(sequences, &node, NULL, NULL); + if (k == mh_end(sequences)) { + panic_syserror("Out of memory for the data " + "dictionary cache (sequence)."); } - seq->def = def; - return; -error: - panic_syserror("Out of memory for the data " - "dictionary cache (sequence)."); } void sequence_cache_delete(uint32_t id) { - struct sequence *seq = sequence_by_id(id); - if (seq != NULL) { - /* Delete sequence data. */ - sequence_reset(seq); - mh_i32ptr_del(sequences, seq->def->id, NULL); - free(seq->def); - TRASH(seq); - free(seq); - } + mh_int_t k = mh_i32ptr_find(sequences, id, NULL); + if (k != mh_end(sequences)) + mh_i32ptr_del(sequences, k, NULL); } const char * diff --git a/src/box/schema.h b/src/box/schema.h index 30366382..84f0d33f 100644 --- a/src/box/schema.h +++ b/src/box/schema.h @@ -205,12 +205,12 @@ struct sequence * sequence_cache_find(uint32_t id); /** - * Insert a new sequence object into the cache or update - * an existing one if there's already a sequence with - * the given id in the cache. + * Insert a new sequence object into the cache. + * There must not be a sequence with the same id + * in the cache. */ void -sequence_cache_replace(struct sequence_def *def); +sequence_cache_insert(struct sequence *seq); /** Delete a sequence from the sequence cache. 
*/ void diff --git a/src/box/sequence.c b/src/box/sequence.c index c9828c0d..2bf38f99 100644 --- a/src/box/sequence.c +++ b/src/box/sequence.c @@ -127,6 +127,27 @@ sequence_free(void) mempool_destroy(&sequence_data_extent_pool); } +struct sequence * +sequence_new(struct sequence_def *def) +{ + struct sequence *seq = calloc(1, sizeof(*seq)); + if (seq == NULL) { + diag_set(OutOfMemory, sizeof(*seq), "malloc", "sequence"); + return NULL; + } + seq->def = def; + return seq; +} + +void +sequence_delete(struct sequence *seq) +{ + /* Delete sequence data. */ + sequence_reset(seq); + free(seq->def); + free(seq); +} + void sequence_reset(struct sequence *seq) { diff --git a/src/box/sequence.h b/src/box/sequence.h index 44194279..9a745ad5 100644 --- a/src/box/sequence.h +++ b/src/box/sequence.h @@ -36,6 +36,7 @@ #include #include +#include "diag.h" #include "user_def.h" #if defined(__cplusplus) @@ -99,6 +100,22 @@ sequence_init(void); void sequence_free(void); +/** + * Create a new sequence object with the given definition. + * Note, on success the sequence definition is assigned to + * the new sequence and will be freed automatically when + * the sequence is destroyed so it must be allocated with + * malloc(). + */ +struct sequence * +sequence_new(struct sequence_def *def); + +/** + * Destroy a sequence and its definition. + */ +void +sequence_delete(struct sequence *seq); + /** Reset a sequence. 
*/ void sequence_reset(struct sequence *seq); @@ -155,6 +172,16 @@ sequence_data_iterator_create(void); #if defined(__cplusplus) } /* extern "C" */ + +static inline struct sequence * +sequence_new_xc(struct sequence_def *def) +{ + struct sequence *seq = sequence_new(def); + if (seq == NULL) + diag_raise(); + return seq; +} + #endif /* defined(__cplusplus) */ #endif /* INCLUDES_TARANTOOL_BOX_SEQUENCE_H */ diff --git a/test/box/sequence.result b/test/box/sequence.result index 990d15db..2c0c0a96 100644 --- a/test/box/sequence.result +++ b/test/box/sequence.result @@ -2153,3 +2153,45 @@ s:insert{{a = 3, b = box.NULL}} s:drop() --- ... +-- +-- Check that sequence cache is updated synchronously with _sequence changes. +-- +box.begin() box.schema.sequence.create('test') sq = box.sequence.test box.rollback() +--- +... +sq ~= nil +--- +- true +... +box.sequence.test == nil +--- +- true +... +sq = box.schema.sequence.create('test') +--- +... +box.begin() sq:alter{step = 10} step = sq.step box.rollback() +--- +... +step -- 10 +--- +- 10 +... +sq.step -- 1 +--- +- 1 +... +box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollback() +--- +... +sq == nil +--- +- true +... +box.sequence.test ~= nil +--- +- true +... +box.sequence.test:drop() +--- +... diff --git a/test/box/sequence.test.lua b/test/box/sequence.test.lua index 3375572e..3c8140e8 100644 --- a/test/box/sequence.test.lua +++ b/test/box/sequence.test.lua @@ -727,3 +727,18 @@ pk:alter{parts = {{'y.b', 'unsigned'}}, sequence = {field = 'y.a'}} -- error pk:alter{parts = {{'y.b', 'unsigned'}}, sequence = {field = 'y.b'}} -- ok s:insert{{a = 3, b = box.NULL}} s:drop() + +-- +-- Check that sequence cache is updated synchronously with _sequence changes. 
+-- +box.begin() box.schema.sequence.create('test') sq = box.sequence.test box.rollback() +sq ~= nil +box.sequence.test == nil +sq = box.schema.sequence.create('test') +box.begin() sq:alter{step = 10} step = sq.step box.rollback() +step -- 10 +sq.step -- 1 +box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollback() +sq == nil +box.sequence.test ~= nil +box.sequence.test:drop() -- 2.11.0