[PATCH 04/10] ddl: synchronize sequence cache with actual data state

Vladimir Davydov vdavydov.dev at gmail.com
Wed Jul 3 22:30:06 MSK 2019


To implement transactional DDL, we must make sure that the in-memory
schema is updated synchronously with system space updates, i.e. in
on_replace triggers rather than in on_commit triggers.

Note, to do this in case of the sequence cache, we have to rework the
way sequences are exported to Lua - make on_alter_sequence similar to
how on_alter_space and on_alter_func triggers are implemented.
---
 src/box/alter.cc           | 114 ++++++++++++++++++++++++---------------------
 src/box/lua/load_cfg.lua   |   1 -
 src/box/lua/schema.lua     |  48 +++----------------
 src/box/lua/sequence.c     | 110 +++++++++++++++++++++++++++++++++++++------
 src/box/schema.cc          |  39 +++++-----------
 src/box/schema.h           |   8 ++--
 src/box/sequence.c         |  21 +++++++++
 src/box/sequence.h         |  27 +++++++++++
 test/box/sequence.result   |  42 +++++++++++++++++
 test/box/sequence.test.lua |  15 ++++++
 10 files changed, 282 insertions(+), 143 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 062cf9f8..7aeed834 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -3337,50 +3337,52 @@ sequence_def_new_from_tuple(struct tuple *tuple, uint32_t errcode)
 	return def;
 }
 
-/** Argument passed to on_commit_dd_sequence() trigger. */
-struct alter_sequence {
-	/** Trigger invoked on commit in the _sequence space. */
-	struct trigger on_commit;
-	/** Trigger invoked on rollback in the _sequence space. */
-	struct trigger on_rollback;
-	/** Old sequence definition or NULL if create. */
-	struct sequence_def *old_def;
-	/** New sequence defitition or NULL if drop. */
-	struct sequence_def *new_def;
-};
-
-/**
- * Trigger invoked on commit in the _sequence space.
- */
 static void
-on_commit_dd_sequence(struct trigger *trigger, void *event)
+on_create_sequence_rollback(struct trigger *trigger, void * /* event */)
 {
-	struct txn *txn = (struct txn *) event;
-	struct alter_sequence *alter = (struct alter_sequence *) trigger->data;
+	/* Remove the new sequence from the cache and delete it. */
+	struct sequence *seq = (struct sequence *)trigger->data;
+	sequence_cache_delete(seq->def->id);
+	trigger_run_xc(&on_alter_sequence, seq);
+	sequence_delete(seq);
+}
 
-	if (alter->new_def != NULL && alter->old_def != NULL) {
-		/* Alter a sequence. */
-		sequence_cache_replace(alter->new_def);
-	} else if (alter->new_def == NULL) {
-		/* Drop a sequence. */
-		sequence_cache_delete(alter->old_def->id);
-	}
+static void
+on_drop_sequence_commit(struct trigger *trigger, void * /* event */)
+{
+	/* Delete the old sequence. */
+	struct sequence *seq = (struct sequence *)trigger->data;
+	sequence_delete(seq);
+}
 
-	trigger_run_xc(&on_alter_sequence, txn_last_stmt(txn));
+static void
+on_drop_sequence_rollback(struct trigger *trigger, void * /* event */)
+{
+	/* Insert the old sequence back into the cache. */
+	struct sequence *seq = (struct sequence *)trigger->data;
+	sequence_cache_insert(seq);
+	trigger_run_xc(&on_alter_sequence, seq);
 }
 
-/**
- * Trigger invoked on rollback in the _sequence space.
- */
+
 static void
-on_rollback_dd_sequence(struct trigger *trigger, void * /* event */)
+on_alter_sequence_commit(struct trigger *trigger, void * /* event */)
 {
-	struct alter_sequence *alter = (struct alter_sequence *) trigger->data;
+	/* Delete the old sequence definition. */
+	struct sequence_def *def = (struct sequence_def *)trigger->data;
+	free(def);
+}
 
-	if (alter->new_def != NULL && alter->old_def == NULL) {
-		/* Rollback creation of a sequence. */
-		sequence_cache_delete(alter->new_def->id);
-	}
+static void
+on_alter_sequence_rollback(struct trigger *trigger, void * /* event */)
+{
+	/* Restore the old sequence definition. */
+	struct sequence_def *def = (struct sequence_def *)trigger->data;
+	struct sequence *seq = sequence_by_id(def->id);
+	assert(seq != NULL);
+	free(seq->def);
+	seq->def = def;
+	trigger_run_xc(&on_alter_sequence, seq);
 }
 
 /**
@@ -3396,24 +3398,25 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
 	struct tuple *old_tuple = stmt->old_tuple;
 	struct tuple *new_tuple = stmt->new_tuple;
 
-	struct alter_sequence *alter =
-		region_calloc_object_xc(&txn->region, struct alter_sequence);
-
 	struct sequence_def *new_def = NULL;
 	auto def_guard = make_scoped_guard([=] { free(new_def); });
 
+	struct sequence *seq;
 	if (old_tuple == NULL && new_tuple != NULL) {		/* INSERT */
 		new_def = sequence_def_new_from_tuple(new_tuple,
 						      ER_CREATE_SEQUENCE);
-		assert(sequence_by_id(new_def->id) == NULL);
 		access_check_ddl(new_def->name, new_def->id, new_def->uid,
 				 SC_SEQUENCE, PRIV_C);
-		sequence_cache_replace(new_def);
-		alter->new_def = new_def;
+		struct trigger *on_rollback =
+			txn_alter_trigger_new(on_create_sequence_rollback, NULL);
+		seq = sequence_new_xc(new_def);
+		sequence_cache_insert(seq);
+		on_rollback->data = seq;
+		txn_on_rollback(txn, on_rollback);
 	} else if (old_tuple != NULL && new_tuple == NULL) {	/* DELETE */
 		uint32_t id = tuple_field_u32_xc(old_tuple,
 						 BOX_SEQUENCE_DATA_FIELD_ID);
-		struct sequence *seq = sequence_by_id(id);
+		seq = sequence_by_id(id);
 		assert(seq != NULL);
 		access_check_ddl(seq->def->name, seq->def->id, seq->def->uid,
 				 SC_SEQUENCE, PRIV_D);
@@ -3426,26 +3429,31 @@ on_replace_dd_sequence(struct trigger * /* trigger */, void *event)
 		if (schema_find_grants("sequence", seq->def->id))
 			tnt_raise(ClientError, ER_DROP_SEQUENCE,
 				  seq->def->name, "the sequence has grants");
-		alter->old_def = seq->def;
+		struct trigger *on_commit =
+			txn_alter_trigger_new(on_drop_sequence_commit, seq);
+		struct trigger *on_rollback =
+			txn_alter_trigger_new(on_drop_sequence_rollback, seq);
+		sequence_cache_delete(seq->def->id);
+		txn_on_commit(txn, on_commit);
+		txn_on_rollback(txn, on_rollback);
 	} else {						/* UPDATE */
 		new_def = sequence_def_new_from_tuple(new_tuple,
 						      ER_ALTER_SEQUENCE);
-		struct sequence *seq = sequence_by_id(new_def->id);
+		seq = sequence_by_id(new_def->id);
 		assert(seq != NULL);
 		access_check_ddl(seq->def->name, seq->def->id, seq->def->uid,
 				 SC_SEQUENCE, PRIV_A);
-		alter->old_def = seq->def;
-		alter->new_def = new_def;
+		struct trigger *on_commit =
+			txn_alter_trigger_new(on_alter_sequence_commit, seq->def);
+		struct trigger *on_rollback =
+			txn_alter_trigger_new(on_alter_sequence_rollback, seq->def);
+		seq->def = new_def;
+		txn_on_commit(txn, on_commit);
+		txn_on_rollback(txn, on_rollback);
 	}
 
 	def_guard.is_active = false;
-
-	trigger_create(&alter->on_commit,
-		       on_commit_dd_sequence, alter, NULL);
-	txn_on_commit(txn, &alter->on_commit);
-	trigger_create(&alter->on_rollback,
-		       on_rollback_dd_sequence, alter, NULL);
-	txn_on_rollback(txn, &alter->on_rollback);
+	trigger_run_xc(&on_alter_sequence, seq);
 }
 
 /**
diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua
index 9f3344da..d1be0f39 100644
--- a/src/box/lua/load_cfg.lua
+++ b/src/box/lua/load_cfg.lua
@@ -467,7 +467,6 @@ setmetatable(box, {
 })
 
 local function load_cfg(cfg)
-    box.internal.schema.init()
     cfg = upgrade_cfg(cfg, translate_cfg)
     cfg = prepare_cfg(cfg, default_cfg, template_cfg, modify_cfg)
     apply_default_cfg(cfg, default_cfg);
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 9c3ee063..084addc2 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -1806,42 +1806,11 @@ sequence_mt.drop = function(self)
     box.schema.sequence.drop(self.id)
 end
 
-local function sequence_tuple_decode(seq, tuple)
-    seq.id, seq.uid, seq.name, seq.step, seq.min, seq.max,
-        seq.start, seq.cache, seq.cycle = tuple:unpack()
-end
-
-local function sequence_new(tuple)
-    local seq = setmetatable({}, sequence_mt)
-    sequence_tuple_decode(seq, tuple)
-    return seq
-end
-
-local function sequence_on_alter(old_tuple, new_tuple)
-    if old_tuple and not new_tuple then
-        local old_name = old_tuple.name
-        box.sequence[old_name] = nil
-    elseif not old_tuple and new_tuple then
-        local seq = sequence_new(new_tuple)
-        box.sequence[seq.name] = seq
-    else
-        local old_name = old_tuple.name
-        local seq = box.sequence[old_name]
-        if not seq then
-            seq = sequence_new(seq, new_tuple)
-        else
-            sequence_tuple_decode(seq, new_tuple)
-        end
-        box.sequence[old_name] = nil
-        box.sequence[seq.name] = seq
-    end
-end
-
 box.sequence = {}
-local function box_sequence_init()
-    -- Install a trigger that will update Lua objects on
-    -- _sequence space modifications.
-    internal.sequence.on_alter(sequence_on_alter)
+box.schema.sequence = {}
+
+function box.schema.sequence.bless(seq)
+    setmetatable(seq, {__index = sequence_mt})
 end
 
 local sequence_options = {
@@ -1859,7 +1828,6 @@ create_sequence_options.if_not_exists = 'boolean'
 local alter_sequence_options = table.deepcopy(sequence_options)
 alter_sequence_options.name = 'string'
 
-box.schema.sequence = {}
 box.schema.sequence.create = function(name, opts)
     opts = opts or {}
     check_param(name, 'name', 'string')
@@ -1897,7 +1865,8 @@ box.schema.sequence.alter = function(name, opts)
         return
     end
     local seq = {}
-    sequence_tuple_decode(seq, tuple)
+    seq.id, seq.uid, seq.name, seq.step, seq.min, seq.max,
+        seq.start, seq.cache, seq.cycle = tuple:unpack()
     opts = update_param_table(opts, seq)
     local _sequence = box.space[box.schema.SEQUENCE_ID]
     _sequence:replace{seq.id, seq.uid, opts.name, opts.step, opts.min,
@@ -2689,11 +2658,6 @@ end
 
 setmetatable(box.space, { __serialize = box_space_mt })
 
-box.internal.schema = {}
-box.internal.schema.init = function()
-    box_sequence_init()
-end
-
 box.feedback = {}
 box.feedback.save = function(file_name)
     if type(file_name) ~= "string" then
diff --git a/src/box/lua/sequence.c b/src/box/lua/sequence.c
index 2fead2eb..bd9ec758 100644
--- a/src/box/lua/sequence.c
+++ b/src/box/lua/sequence.c
@@ -31,11 +31,11 @@
 #include "box/lua/sequence.h"
 #include "box/lua/tuple.h"
 #include "lua/utils.h"
-#include "lua/trigger.h"
 
 #include "diag.h"
 #include "box/box.h"
 #include "box/schema.h"
+#include "box/sequence.h"
 #include "box/txn.h"
 
 static int
@@ -68,28 +68,104 @@ lbox_sequence_reset(struct lua_State *L)
 	return 0;
 }
 
-static int
-lbox_sequence_push_on_alter_event(struct lua_State *L, void *event)
+static void
+lbox_sequence_new(struct lua_State *L, struct sequence *seq)
 {
-	struct txn_stmt *stmt = (struct txn_stmt *) event;
-	if (stmt->old_tuple) {
-		luaT_pushtuple(L, stmt->old_tuple);
+	lua_getfield(L, LUA_GLOBALSINDEX, "box");
+	lua_getfield(L, -1, "sequence");
+	lua_rawgeti(L, -1, seq->def->id);
+	if (lua_isnil(L, -1)) {
+		/*
+		 * No Lua object for this sequence yet: create
+		 * one. An existing object is instead updated in
+		 * place (see below), so that Lua references to
+		 * it held outside box.sequence[] stay valid.
+		 */
+		lua_pop(L, 1);
+		lua_newtable(L);
+		lua_rawseti(L, -2, seq->def->id);
+		lua_rawgeti(L, -1, seq->def->id);
 	} else {
+		/* Clear the reference to old sequence by old name. */
+		lua_getfield(L, -1, "name");
 		lua_pushnil(L);
+		lua_settable(L, -4);
 	}
-	if (stmt->new_tuple) {
-		luaT_pushtuple(L, stmt->new_tuple);
-	} else {
+	int top = lua_gettop(L);
+	lua_pushstring(L, "id");
+	lua_pushnumber(L, seq->def->id);
+	lua_settable(L, top);
+	lua_pushstring(L, "uid");
+	lua_pushnumber(L, seq->def->uid);
+	lua_settable(L, top);
+	lua_pushstring(L, "name");
+	lua_pushstring(L, seq->def->name);
+	lua_settable(L, top);
+	lua_pushstring(L, "step");
+	luaL_pushint64(L, seq->def->step);
+	lua_settable(L, top);
+	lua_pushstring(L, "min");
+	luaL_pushint64(L, seq->def->min);
+	lua_settable(L, top);
+	lua_pushstring(L, "max");
+	luaL_pushint64(L, seq->def->max);
+	lua_settable(L, top);
+	lua_pushstring(L, "start");
+	luaL_pushint64(L, seq->def->start);
+	lua_settable(L, top);
+	lua_pushstring(L, "cache");
+	luaL_pushint64(L, seq->def->cache);
+	lua_settable(L, top);
+	lua_pushstring(L, "cycle");
+	lua_pushboolean(L, seq->def->cycle);
+	lua_settable(L, top);
+
+	/* Bless sequence object. */
+	lua_getfield(L, LUA_GLOBALSINDEX, "box");
+	lua_pushstring(L, "schema");
+	lua_gettable(L, -2);
+	lua_pushstring(L, "sequence");
+	lua_gettable(L, -2);
+	lua_pushstring(L, "bless");
+	lua_gettable(L, -2);
+
+	lua_pushvalue(L, top);
+	lua_call(L, 1, 0);
+	lua_pop(L, 3);
+
+	lua_setfield(L, -2, seq->def->name);
+
+	lua_pop(L, 2);
+}
+
+static void
+lbox_sequence_delete(struct lua_State *L, struct sequence *seq)
+{
+	lua_getfield(L, LUA_GLOBALSINDEX, "box");
+	lua_getfield(L, -1, "sequence");
+	lua_rawgeti(L, -1, seq->def->id);
+	if (!lua_isnil(L, -1)) {
+		lua_getfield(L, -1, "name");
 		lua_pushnil(L);
+		lua_rawset(L, -4);
+		lua_pop(L, 1); /* pop sequence */
+		lua_pushnil(L);
+		lua_rawseti(L, -2, seq->def->id);
+	} else {
+		lua_pop(L, 1);
 	}
-	return 2;
+	lua_pop(L, 2); /* box, sequence */
 }
 
-static int
-lbox_sequence_on_alter(struct lua_State *L)
+static void
+lbox_sequence_new_or_delete(struct trigger *trigger, void *event)
 {
-	return lbox_trigger_reset(L, 2, &on_alter_sequence,
-				  lbox_sequence_push_on_alter_event, NULL);
+	struct lua_State *L = trigger->data;
+	struct sequence *seq = event;
+	if (sequence_by_id(seq->def->id) != NULL)
+		lbox_sequence_new(L, seq);
+	else
+		lbox_sequence_delete(L, seq);
 }
 
 void
@@ -99,9 +175,13 @@ box_lua_sequence_init(struct lua_State *L)
 		{"next", lbox_sequence_next},
 		{"set", lbox_sequence_set},
 		{"reset", lbox_sequence_reset},
-		{"on_alter", lbox_sequence_on_alter},
 		{NULL, NULL}
 	};
 	luaL_register(L, "box.internal.sequence", sequence_internal_lib);
 	lua_pop(L, 1);
+
+	static struct trigger on_alter_sequence_in_lua;
+	trigger_create(&on_alter_sequence_in_lua,
+		       lbox_sequence_new_or_delete, L, NULL);
+	trigger_add(&on_alter_sequence, &on_alter_sequence_in_lua);
 }
diff --git a/src/box/schema.cc b/src/box/schema.cc
index 64412fac..3f90b8d4 100644
--- a/src/box/schema.cc
+++ b/src/box/schema.cc
@@ -635,41 +635,24 @@ sequence_cache_find(uint32_t id)
 }
 
 void
-sequence_cache_replace(struct sequence_def *def)
+sequence_cache_insert(struct sequence *seq)
 {
-	struct sequence *seq = sequence_by_id(def->id);
-	if (seq == NULL) {
-		/* Create a new sequence. */
-		seq = (struct sequence *) calloc(1, sizeof(*seq));
-		if (seq == NULL)
-			goto error;
-		struct mh_i32ptr_node_t node = { def->id, seq };
-		if (mh_i32ptr_put(sequences, &node, NULL, NULL) ==
-		    mh_end(sequences))
-			goto error;
-	} else {
-		/* Update an existing sequence. */
-		free(seq->def);
+	assert(sequence_by_id(seq->def->id) == NULL);
+
+	struct mh_i32ptr_node_t node = { seq->def->id, seq };
+	mh_int_t k = mh_i32ptr_put(sequences, &node, NULL, NULL);
+	if (k == mh_end(sequences)) {
+		panic_syserror("Out of memory for the data "
+			       "dictionary cache (sequence).");
 	}
-	seq->def = def;
-	return;
-error:
-	panic_syserror("Out of memory for the data "
-		       "dictionary cache (sequence).");
 }
 
 void
 sequence_cache_delete(uint32_t id)
 {
-	struct sequence *seq = sequence_by_id(id);
-	if (seq != NULL) {
-		/* Delete sequence data. */
-		sequence_reset(seq);
-		mh_i32ptr_del(sequences, seq->def->id, NULL);
-		free(seq->def);
-		TRASH(seq);
-		free(seq);
-	}
+	mh_int_t k = mh_i32ptr_find(sequences, id, NULL);
+	if (k != mh_end(sequences))
+		mh_i32ptr_del(sequences, k, NULL);
 }
 
 const char *
diff --git a/src/box/schema.h b/src/box/schema.h
index 30366382..84f0d33f 100644
--- a/src/box/schema.h
+++ b/src/box/schema.h
@@ -205,12 +205,12 @@ struct sequence *
 sequence_cache_find(uint32_t id);
 
 /**
- * Insert a new sequence object into the cache or update
- * an existing one if there's already a sequence with
- * the given id in the cache.
+ * Insert a new sequence object into the cache.
+ * There must not be a sequence with the same id
+ * in the cache.
  */
 void
-sequence_cache_replace(struct sequence_def *def);
+sequence_cache_insert(struct sequence *seq);
 
 /** Delete a sequence from the sequence cache. */
 void
diff --git a/src/box/sequence.c b/src/box/sequence.c
index c9828c0d..2bf38f99 100644
--- a/src/box/sequence.c
+++ b/src/box/sequence.c
@@ -127,6 +127,27 @@ sequence_free(void)
 	mempool_destroy(&sequence_data_extent_pool);
 }
 
+struct sequence *
+sequence_new(struct sequence_def *def)
+{
+	struct sequence *seq = calloc(1, sizeof(*seq));
+	if (seq == NULL) {
+		diag_set(OutOfMemory, sizeof(*seq), "malloc", "sequence");
+		return NULL;
+	}
+	seq->def = def;
+	return seq;
+}
+
+void
+sequence_delete(struct sequence *seq)
+{
+	/* Delete sequence data. */
+	sequence_reset(seq);
+	free(seq->def);
+	free(seq);
+}
+
 void
 sequence_reset(struct sequence *seq)
 {
diff --git a/src/box/sequence.h b/src/box/sequence.h
index 44194279..9a745ad5 100644
--- a/src/box/sequence.h
+++ b/src/box/sequence.h
@@ -36,6 +36,7 @@
 #include <stddef.h>
 #include <stdint.h>
 
+#include "diag.h"
 #include "user_def.h"
 
 #if defined(__cplusplus)
@@ -99,6 +100,22 @@ sequence_init(void);
 void
 sequence_free(void);
 
+/**
+ * Create a new sequence object with the given definition.
+ * Note, on success the sequence definition is assigned to
+ * the new sequence and will be freed automatically when
+ * the sequence is destroyed so it must be allocated with
+ * malloc().
+ */
+struct sequence *
+sequence_new(struct sequence_def *def);
+
+/**
+ * Destroy a sequence and its definition.
+ */
+void
+sequence_delete(struct sequence *seq);
+
 /** Reset a sequence. */
 void
 sequence_reset(struct sequence *seq);
@@ -155,6 +172,16 @@ sequence_data_iterator_create(void);
 
 #if defined(__cplusplus)
 } /* extern "C" */
+
+static inline struct sequence *
+sequence_new_xc(struct sequence_def *def)
+{
+	struct sequence *seq = sequence_new(def);
+	if (seq == NULL)
+		diag_raise();
+	return seq;
+}
+
 #endif /* defined(__cplusplus) */
 
 #endif /* INCLUDES_TARANTOOL_BOX_SEQUENCE_H */
diff --git a/test/box/sequence.result b/test/box/sequence.result
index 990d15db..2c0c0a96 100644
--- a/test/box/sequence.result
+++ b/test/box/sequence.result
@@ -2153,3 +2153,45 @@ s:insert{{a = 3, b = box.NULL}}
 s:drop()
 ---
 ...
+--
+-- Check that sequence cache is updated synchronously with _sequence changes.
+--
+box.begin() box.schema.sequence.create('test') sq = box.sequence.test box.rollback()
+---
+...
+sq ~= nil
+---
+- true
+...
+box.sequence.test == nil
+---
+- true
+...
+sq = box.schema.sequence.create('test')
+---
+...
+box.begin() sq:alter{step = 10} step = sq.step box.rollback()
+---
+...
+step -- 10
+---
+- 10
+...
+sq.step -- 1
+---
+- 1
+...
+box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollback()
+---
+...
+sq == nil
+---
+- true
+...
+box.sequence.test ~= nil
+---
+- true
+...
+box.sequence.test:drop()
+---
+...
diff --git a/test/box/sequence.test.lua b/test/box/sequence.test.lua
index 3375572e..3c8140e8 100644
--- a/test/box/sequence.test.lua
+++ b/test/box/sequence.test.lua
@@ -727,3 +727,18 @@ pk:alter{parts = {{'y.b', 'unsigned'}}, sequence = {field = 'y.a'}} -- error
 pk:alter{parts = {{'y.b', 'unsigned'}}, sequence = {field = 'y.b'}} -- ok
 s:insert{{a = 3, b = box.NULL}}
 s:drop()
+
+--
+-- Check that sequence cache is updated synchronously with _sequence changes.
+--
+box.begin() box.schema.sequence.create('test') sq = box.sequence.test box.rollback()
+sq ~= nil
+box.sequence.test == nil
+sq = box.schema.sequence.create('test')
+box.begin() sq:alter{step = 10} step = sq.step box.rollback()
+step -- 10
+sq.step -- 1
+box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollback()
+sq == nil
+box.sequence.test ~= nil
+box.sequence.test:drop()
-- 
2.11.0




More information about the Tarantool-patches mailing list