[PATCH 04/10] ddl: synchronize sequence cache with actual data state
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Jul 3 22:30:06 MSK 2019
To implement transactional DDL, we must make sure that the in-memory
schema is updated synchronously with system space updates, i.e. in the
on_replace trigger rather than in the on_commit trigger.
Note, to do this for the sequence cache, we have to rework the way
sequences are exported to Lua: on_alter_sequence is now implemented
similarly to the on_alter_space and on_alter_func triggers.
---
src/box/alter.cc | 114 ++++++++++++++++++++++++---------------------
src/box/lua/load_cfg.lua | 1 -
src/box/lua/schema.lua | 48 +++----------------
src/box/lua/sequence.c | 110 +++++++++++++++++++++++++++++++++++++------
src/box/schema.cc | 39 +++++-----------
src/box/schema.h | 8 ++--
src/box/sequence.c | 21 +++++++++
src/box/sequence.h | 27 +++++++++++
test/box/sequence.result | 42 +++++++++++++++++
test/box/sequence.test.lua | 15 ++++++
10 files changed, 282 insertions(+), 143 deletions(-)
diff --git a/src/box/alter.cc b/src/box/alter.cc
index 062cf9f8..7aeed834 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3337,50 +3337,52 @@ sequence_def_new_from_tuple(struct tuple *tuple, uint32_t errcode)
return def;
}
-/** Argument passed to on_commit_dd_sequence() trigger. */
-struct alter_sequence {
- /** Trigger invoked on commit in the _sequence space. */
- struct trigger on_commit;
- /** Trigger invoked on rollback in the _sequence space. */
- struct trigger on_rollback;
- /** Old sequence definition or NULL if create. */
- struct sequence_def *old_def;
- /** New sequence defitition or NULL if drop. */
- struct sequence_def *new_def;
-};
-
-/**
- * Trigger invoked on commit in the _sequence space.
- */
static void
-on_commit_dd_sequence(struct trigger *trigger, void *event)
+on_create_sequence_rollback(struct trigger *trigger, void * /* event */)
{
- struct txn *txn = (struct txn *) event;
- struct alter_sequence *alter = (struct alter_sequence *) trigger->data;
+ /* Remove the new sequence from the cache and delete it. */
+ struct sequence *seq = (struct sequence *)trigger->data;
+ sequence_cache_delete(seq->def->id);
+ trigger_run_xc(&on_alter_sequence, seq);
+ sequence_delete(seq);
+}
- if (alter->new_def != NULL && alter->old_def != NULL) {
- /* Alter a sequence. */
- sequence_cache_replace(alter->new_def);
- } else if (alter->new_def == NULL) {
- /* Drop a sequence. */
- sequence_cache_delete(alter->old_def->id);
- }
+static void
+on_drop_sequence_commit(struct trigger *trigger, void * /* event */)
+{
+ /* Delete the old sequence. */
+ struct sequence *seq = (struct sequence *)trigger->data;
+ sequence_delete(seq);
+}
- trigger_run_xc(&on_alter_sequence, txn_last_stmt(txn));
+static void
+on_drop_sequence_rollback(struct trigger *trigger, void * /* event */)
+{
+ /* Insert the old sequence back into the cache. */
+ struct sequence *seq = (struct sequence *)trigger->data;
+ sequence_cache_insert(seq);
+ trigger_run_xc(&on_alter_sequence, seq);
}
-/**
- * Trigger invoked on rollback in the _sequence space.
- */
+
static void
-on_rollback_dd_sequence(struct trigger *trigger, void * /* event */)
+on_alter_sequence_commit(struct trigger *trigger, void * /* event */)
{
- struct alter_sequence *alter = (struct alter_sequence *) trigger->data;
+ /* Delete the old sequence definition. */
+ struct sequence_def *def = (struct sequence_def *)trigger->data;
+ free(def);
+}
- if (alter->new_def != NULL && alter->old_def == NULL) {
- /* Rollback creation of a sequence. */
- sequence_cache_delete(alter->new_def->id);
- }
+static void
+on_alter_sequence_rollback(struct trigger *trigger, void * /* event */)
+{
+ /* Restore the old sequence definition. */
+ struct sequence_def *def = (struct sequence_def *)trigger->data;
+ struct sequence *seq = sequence_by_id(def->id);
+ assert(seq != NULL);
+ free(seq->def);
+ seq->def = def;
+ trigger_run_xc(&on_alter_sequence, seq);
}
/**
@@ -3396,24 +3398,25 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
struct tuple *old_tuple = stmt->old_tuple;
struct tuple *new_tuple = stmt->new_tuple;
- struct alter_sequence *alter =
- region_calloc_object_xc(&txn->region, struct alter_sequence);
-
struct sequence_def *new_def = NULL;
auto def_guard = make_scoped_guard([=] { free(new_def); });
+ struct sequence *seq;
if (old_tuple == NULL && new_tuple != NULL) { /* INSERT */
new_def = sequence_def_new_from_tuple(new_tuple,
ER_CREATE_SEQUENCE);
- assert(sequence_by_id(new_def->id) == NULL);
access_check_ddl(new_def->name, new_def->id, new_def->uid,
SC_SEQUENCE, PRIV_C);
- sequence_cache_replace(new_def);
- alter->new_def = new_def;
+ struct trigger *on_rollback =
+ txn_alter_trigger_new(on_create_sequence_rollback, NULL);
+ seq = sequence_new_xc(new_def);
+ sequence_cache_insert(seq);
+ on_rollback->data = seq;
+ txn_on_rollback(txn, on_rollback);
} else if (old_tuple != NULL && new_tuple == NULL) { /* DELETE */
uint32_t id = tuple_field_u32_xc(old_tuple,
BOX_SEQUENCE_DATA_FIELD_ID);
- struct sequence *seq = sequence_by_id(id);
+ seq = sequence_by_id(id);
assert(seq != NULL);
access_check_ddl(seq->def->name, seq->def->id, seq->def->uid,
SC_SEQUENCE, PRIV_D);
@@ -3426,26 +3429,31 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
if (schema_find_grants("sequence", seq->def->id))
tnt_raise(ClientError, ER_DROP_SEQUENCE,
seq->def->name, "the sequence has grants");
- alter->old_def = seq->def;
+ struct trigger *on_commit =
+ txn_alter_trigger_new(on_drop_sequence_commit, seq);
+ struct trigger *on_rollback =
+ txn_alter_trigger_new(on_drop_sequence_rollback, seq);
+ sequence_cache_delete(seq->def->id);
+ txn_on_commit(txn, on_commit);
+ txn_on_rollback(txn, on_rollback);
} else { /* UPDATE */
new_def = sequence_def_new_from_tuple(new_tuple,
ER_ALTER_SEQUENCE);
- struct sequence *seq = sequence_by_id(new_def->id);
+ seq = sequence_by_id(new_def->id);
assert(seq != NULL);
access_check_ddl(seq->def->name, seq->def->id, seq->def->uid,
SC_SEQUENCE, PRIV_A);
- alter->old_def = seq->def;
- alter->new_def = new_def;
+ struct trigger *on_commit =
+ txn_alter_trigger_new(on_alter_sequence_commit, seq->def);
+ struct trigger *on_rollback =
+ txn_alter_trigger_new(on_alter_sequence_rollback, seq->def);
+ seq->def = new_def;
+ txn_on_commit(txn, on_commit);
+ txn_on_rollback(txn, on_rollback);
}
def_guard.is_active = false;
-
- trigger_create(&alter->on_commit,
- on_commit_dd_sequence, alter, NULL);
- txn_on_commit(txn, &alter->on_commit);
- trigger_create(&alter->on_rollback,
- on_rollback_dd_sequence, alter, NULL);
- txn_on_rollback(txn, &alter->on_rollback);
+ trigger_run_xc(&on_alter_sequence, seq);
}
/**
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 9f3344da..d1be0f39 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -467,7 +467,6 @@ setmetatable(box, {
})
local function load_cfg(cfg)
- box.internal.schema.init()
cfg = upgrade_cfg(cfg, translate_cfg)
cfg = prepare_cfg(cfg, default_cfg, template_cfg, modify_cfg)
apply_default_cfg(cfg, default_cfg);
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 9c3ee063..084addc2 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -1806,42 +1806,11 @@ sequence_mt.drop = function(self)
box.schema.sequence.drop(self.id)
end
-local function sequence_tuple_decode(seq, tuple)
- seq.id, seq.uid, seq.name, seq.step, seq.min, seq.max,
- seq.start, seq.cache, seq.cycle = tuple:unpack()
-end
-
-local function sequence_new(tuple)
- local seq = setmetatable({}, sequence_mt)
- sequence_tuple_decode(seq, tuple)
- return seq
-end
-
-local function sequence_on_alter(old_tuple, new_tuple)
- if old_tuple and not new_tuple then
- local old_name = old_tuple.name
- box.sequence[old_name] = nil
- elseif not old_tuple and new_tuple then
- local seq = sequence_new(new_tuple)
- box.sequence[seq.name] = seq
- else
- local old_name = old_tuple.name
- local seq = box.sequence[old_name]
- if not seq then
- seq = sequence_new(seq, new_tuple)
- else
- sequence_tuple_decode(seq, new_tuple)
- end
- box.sequence[old_name] = nil
- box.sequence[seq.name] = seq
- end
-end
-
box.sequence = {}
-local function box_sequence_init()
- -- Install a trigger that will update Lua objects on
- -- _sequence space modifications.
- internal.sequence.on_alter(sequence_on_alter)
+box.schema.sequence = {}
+
+function box.schema.sequence.bless(seq)
+ setmetatable(seq, {__index = sequence_mt})
end
local sequence_options = {
@@ -1859,7 +1828,6 @@ create_sequence_options.if_not_exists = 'boolean'
local alter_sequence_options = table.deepcopy(sequence_options)
alter_sequence_options.name = 'string'
-box.schema.sequence = {}
box.schema.sequence.create = function(name, opts)
opts = opts or {}
check_param(name, 'name', 'string')
@@ -1897,7 +1865,8 @@ box.schema.sequence.alter = function(name, opts)
return
end
local seq = {}
- sequence_tuple_decode(seq, tuple)
+ seq.id, seq.uid, seq.name, seq.step, seq.min, seq.max,
+ seq.start, seq.cache, seq.cycle = tuple:unpack()
opts = update_param_table(opts, seq)
local _sequence = box.space[box.schema.SEQUENCE_ID]
_sequence:replace{seq.id, seq.uid, opts.name, opts.step, opts.min,
@@ -2689,11 +2658,6 @@ end
setmetatable(box.space, { __serialize = box_space_mt })
-box.internal.schema = {}
-box.internal.schema.init = function()
- box_sequence_init()
-end
-
box.feedback = {}
box.feedback.save = function(file_name)
if type(file_name) ~= "string" then
diff --git a/src/box/lua/sequence.c b/src/box/lua/sequence.c
index 2fead2eb..bd9ec758 100644
--- a/src/box/lua/sequence.c
+++ b/src/box/lua/sequence.c
@@ -31,11 +31,11 @@
#include "box/lua/sequence.h"
#include "box/lua/tuple.h"
#include "lua/utils.h"
-#include "lua/trigger.h"
#include "diag.h"
#include "box/box.h"
#include "box/schema.h"
+#include "box/sequence.h"
#include "box/txn.h"
static int
@@ -68,28 +68,104 @@ lbox_sequence_reset(struct lua_State *L)
return 0;
}
-static int
-lbox_sequence_push_on_alter_event(struct lua_State *L, void *event)
+static void
+lbox_sequence_new(struct lua_State *L, struct sequence *seq)
{
- struct txn_stmt *stmt = (struct txn_stmt *) event;
- if (stmt->old_tuple) {
- luaT_pushtuple(L, stmt->old_tuple);
+ lua_getfield(L, LUA_GLOBALSINDEX, "box");
+ lua_getfield(L, -1, "sequence");
+ lua_rawgeti(L, -1, seq->def->id);
+ if (lua_isnil(L, -1)) {
+ /*
+ * No Lua object for this id yet: create a new one.
+ * An existing object is modified in place instead,
+ * so that Lua variables referring to the sequence
+ * outside box.sequence[] are not invalidated.
+ */
+ lua_pop(L, 1);
+ lua_newtable(L);
+ lua_rawseti(L, -2, seq->def->id);
+ lua_rawgeti(L, -1, seq->def->id);
} else {
+ /* Clear the reference to old sequence by old name. */
+ lua_getfield(L, -1, "name");
lua_pushnil(L);
+ lua_settable(L, -4);
}
- if (stmt->new_tuple) {
- luaT_pushtuple(L, stmt->new_tuple);
- } else {
+ int top = lua_gettop(L);
+ lua_pushstring(L, "id");
+ lua_pushnumber(L, seq->def->id);
+ lua_settable(L, top);
+ lua_pushstring(L, "uid");
+ lua_pushnumber(L, seq->def->uid);
+ lua_settable(L, top);
+ lua_pushstring(L, "name");
+ lua_pushstring(L, seq->def->name);
+ lua_settable(L, top);
+ lua_pushstring(L, "step");
+ luaL_pushint64(L, seq->def->step);
+ lua_settable(L, top);
+ lua_pushstring(L, "min");
+ luaL_pushint64(L, seq->def->min);
+ lua_settable(L, top);
+ lua_pushstring(L, "max");
+ luaL_pushint64(L, seq->def->max);
+ lua_settable(L, top);
+ lua_pushstring(L, "start");
+ luaL_pushint64(L, seq->def->start);
+ lua_settable(L, top);
+ lua_pushstring(L, "cache");
+ luaL_pushint64(L, seq->def->cache);
+ lua_settable(L, top);
+ lua_pushstring(L, "cycle");
+ lua_pushboolean(L, seq->def->cycle);
+ lua_settable(L, top);
+
+ /* Bless sequence object. */
+ lua_getfield(L, LUA_GLOBALSINDEX, "box");
+ lua_pushstring(L, "schema");
+ lua_gettable(L, -2);
+ lua_pushstring(L, "sequence");
+ lua_gettable(L, -2);
+ lua_pushstring(L, "bless");
+ lua_gettable(L, -2);
+
+ lua_pushvalue(L, top);
+ lua_call(L, 1, 0);
+ lua_pop(L, 3);
+
+ lua_setfield(L, -2, seq->def->name);
+
+ lua_pop(L, 2);
+}
+
+static void
+lbox_sequence_delete(struct lua_State *L, struct sequence *seq)
+{
+ lua_getfield(L, LUA_GLOBALSINDEX, "box");
+ lua_getfield(L, -1, "sequence");
+ lua_rawgeti(L, -1, seq->def->id);
+ if (!lua_isnil(L, -1)) {
+ lua_getfield(L, -1, "name");
lua_pushnil(L);
+ lua_rawset(L, -4);
+ lua_pop(L, 1); /* pop sequence */
+ lua_pushnil(L);
+ lua_rawseti(L, -2, seq->def->id);
+ } else {
+ lua_pop(L, 1);
}
- return 2;
+ lua_pop(L, 2); /* box, sequence */
}
-static int
-lbox_sequence_on_alter(struct lua_State *L)
+static void
+lbox_sequence_new_or_delete(struct trigger *trigger, void *event)
{
- return lbox_trigger_reset(L, 2, &on_alter_sequence,
- lbox_sequence_push_on_alter_event, NULL);
+ struct lua_State *L = trigger->data;
+ struct sequence *seq = event;
+ if (sequence_by_id(seq->def->id) != NULL)
+ lbox_sequence_new(L, seq);
+ else
+ lbox_sequence_delete(L, seq);
}
void
@@ -99,9 +175,13 @@ box_lua_sequence_init(struct lua_State *L)
{"next", lbox_sequence_next},
{"set", lbox_sequence_set},
{"reset", lbox_sequence_reset},
- {"on_alter", lbox_sequence_on_alter},
{NULL, NULL}
};
luaL_register(L, "box.internal.sequence", sequence_internal_lib);
lua_pop(L, 1);
+
+ static struct trigger on_alter_sequence_in_lua;
+ trigger_create(&on_alter_sequence_in_lua,
+ lbox_sequence_new_or_delete, L, NULL);
+ trigger_add(&on_alter_sequence, &on_alter_sequence_in_lua);
}
diff --git a/src/box/schema.cc b/src/box/schema.cc
index 64412fac..3f90b8d4 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -635,41 +635,24 @@ sequence_cache_find(uint32_t id)
}
void
-sequence_cache_replace(struct sequence_def *def)
+sequence_cache_insert(struct sequence *seq)
{
- struct sequence *seq = sequence_by_id(def->id);
- if (seq == NULL) {
- /* Create a new sequence. */
- seq = (struct sequence *) calloc(1, sizeof(*seq));
- if (seq == NULL)
- goto error;
- struct mh_i32ptr_node_t node = { def->id, seq };
- if (mh_i32ptr_put(sequences, &node, NULL, NULL) ==
- mh_end(sequences))
- goto error;
- } else {
- /* Update an existing sequence. */
- free(seq->def);
+ assert(sequence_by_id(seq->def->id) == NULL);
+
+ struct mh_i32ptr_node_t node = { seq->def->id, seq };
+ mh_int_t k = mh_i32ptr_put(sequences, &node, NULL, NULL);
+ if (k == mh_end(sequences)) {
+ panic_syserror("Out of memory for the data "
+ "dictionary cache (sequence).");
}
- seq->def = def;
- return;
-error:
- panic_syserror("Out of memory for the data "
- "dictionary cache (sequence).");
}
void
sequence_cache_delete(uint32_t id)
{
- struct sequence *seq = sequence_by_id(id);
- if (seq != NULL) {
- /* Delete sequence data. */
- sequence_reset(seq);
- mh_i32ptr_del(sequences, seq->def->id, NULL);
- free(seq->def);
- TRASH(seq);
- free(seq);
- }
+ mh_int_t k = mh_i32ptr_find(sequences, id, NULL);
+ if (k != mh_end(sequences))
+ mh_i32ptr_del(sequences, k, NULL);
}
const char *
diff --git a/src/box/schema.h b/src/box/schema.h
index 30366382..84f0d33f 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -205,12 +205,12 @@ struct sequence *
sequence_cache_find(uint32_t id);
/**
- * Insert a new sequence object into the cache or update
- * an existing one if there's already a sequence with
- * the given id in the cache.
+ * Insert a new sequence object into the cache.
+ * There must not be a sequence with the same id
+ * in the cache.
*/
void
-sequence_cache_replace(struct sequence_def *def);
+sequence_cache_insert(struct sequence *seq);
/** Delete a sequence from the sequence cache. */
void
diff --git a/src/box/sequence.c b/src/box/sequence.c
index c9828c0d..2bf38f99 100644
--- a/src/box/sequence.c
+++ b/src/box/sequence.c
@@ -127,6 +127,27 @@ sequence_free(void)
mempool_destroy(&sequence_data_extent_pool);
}
+struct sequence *
+sequence_new(struct sequence_def *def)
+{
+ struct sequence *seq = calloc(1, sizeof(*seq));
+ if (seq == NULL) {
+ diag_set(OutOfMemory, sizeof(*seq), "malloc", "sequence");
+ return NULL;
+ }
+ seq->def = def;
+ return seq;
+}
+
+void
+sequence_delete(struct sequence *seq)
+{
+ /* Delete sequence data. */
+ sequence_reset(seq);
+ free(seq->def);
+ free(seq);
+}
+
void
sequence_reset(struct sequence *seq)
{
diff --git a/src/box/sequence.h b/src/box/sequence.h
index 44194279..9a745ad5 100644
--- a/src/box/sequence.h
+++ b/src/box/sequence.h
@@ -36,6 +36,7 @@
#include <stddef.h>
#include <stdint.h>
+#include "diag.h"
#include "user_def.h"
#if defined(__cplusplus)
@@ -99,6 +100,22 @@ sequence_init(void);
void
sequence_free(void);
+/**
+ * Create a new sequence object with the given definition.
+ * Note, on success the sequence definition is assigned to
+ * the new sequence and will be freed automatically when
+ * the sequence is destroyed so it must be allocated with
+ * malloc().
+ */
+struct sequence *
+sequence_new(struct sequence_def *def);
+
+/**
+ * Destroy a sequence and its definition.
+ */
+void
+sequence_delete(struct sequence *seq);
+
/** Reset a sequence. */
void
sequence_reset(struct sequence *seq);
@@ -155,6 +172,16 @@ sequence_data_iterator_create(void);
#if defined(__cplusplus)
} /* extern "C" */
+
+static inline struct sequence *
+sequence_new_xc(struct sequence_def *def)
+{
+ struct sequence *seq = sequence_new(def);
+ if (seq == NULL)
+ diag_raise();
+ return seq;
+}
+
#endif /* defined(__cplusplus) */
#endif /* INCLUDES_TARANTOOL_BOX_SEQUENCE_H */
diff --git a/test/box/sequence.result b/test/box/sequence.result
index 990d15db..2c0c0a96 100644
--- a/test/box/sequence.result
+++ b/test/box/sequence.result
@@ -2153,3 +2153,45 @@ s:insert{{a = 3, b = box.NULL}}
s:drop()
---
...
+--
+-- Check that sequence cache is updated synchronously with _sequence changes.
+--
+box.begin() box.schema.sequence.create('test') sq = box.sequence.test box.rollback()
+---
+...
+sq ~= nil
+---
+- true
+...
+box.sequence.test == nil
+---
+- true
+...
+sq = box.schema.sequence.create('test')
+---
+...
+box.begin() sq:alter{step = 10} step = sq.step box.rollback()
+---
+...
+step -- 10
+---
+- 10
+...
+sq.step -- 1
+---
+- 1
+...
+box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollback()
+---
+...
+sq == nil
+---
+- true
+...
+box.sequence.test ~= nil
+---
+- true
+...
+box.sequence.test:drop()
+---
+...
diff --git a/test/box/sequence.test.lua b/test/box/sequence.test.lua
index 3375572e..3c8140e8 100644
--- a/test/box/sequence.test.lua
+++ b/test/box/sequence.test.lua
@@ -727,3 +727,18 @@ pk:alter{parts = {{'y.b', 'unsigned'}}, sequence = {field = 'y.a'}} -- error
pk:alter{parts = {{'y.b', 'unsigned'}}, sequence = {field = 'y.b'}} -- ok
s:insert{{a = 3, b = box.NULL}}
s:drop()
+
+--
+-- Check that sequence cache is updated synchronously with _sequence changes.
+--
+box.begin() box.schema.sequence.create('test') sq = box.sequence.test box.rollback()
+sq ~= nil
+box.sequence.test == nil
+sq = box.schema.sequence.create('test')
+box.begin() sq:alter{step = 10} step = sq.step box.rollback()
+step -- 10
+sq.step -- 1
+box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollback()
+sq == nil
+box.sequence.test ~= nil
+box.sequence.test:drop()
--
2.11.0
More information about the Tarantool-patches
mailing list