[PATCH 05/10] ddl: fix _space_sequence rollback

Vladimir Davydov vdavydov.dev at gmail.com
Wed Jul 3 22:30:07 MSK 2019


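Changes to _space_sequence are not rolled back properly: the space's
sequence binding is updated in the on_replace trigger, but nothing
restores it if the transaction is rolled back. Fix this, keeping in mind
that our ultimate goal is to implement transactional DDL, which implies
that all changes to the schema should be applied synchronously, i.e. in
the on_replace trigger rather than in an on_commit trigger, and undone
by an on_rollback trigger.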
---
 src/box/alter.cc           | 138 +++++++++++++++++++++++++++++++--------------
 test/box/sequence.result   |  54 ++++++++++++++++++
 test/box/sequence.test.lua |  19 +++++++
 3 files changed, 170 insertions(+), 41 deletions(-)

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 7aeed834..89ab9c65 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -126,7 +126,8 @@ access_check_ddl(const char *name, uint32_t object_id, uint32_t owner_uid,
  */
 static void
 index_def_check_sequence(struct index_def *index_def, uint32_t sequence_fieldno,
-			 const char *sequence_path, const char *space_name)
+			 const char *sequence_path, uint32_t sequence_path_len,
+			 const char *space_name)
 {
 	struct key_def *key_def = index_def->key_def;
 	struct key_part *sequence_part = NULL;
@@ -137,7 +138,7 @@ index_def_check_sequence(struct index_def *index_def, uint32_t sequence_fieldno,
 		if ((part->path == NULL && sequence_path == NULL) ||
 		    (part->path != NULL && sequence_path != NULL &&
 		     json_path_cmp(part->path, part->path_len,
-				   sequence_path, strlen(sequence_path),
+				   sequence_path, sequence_path_len,
 				   TUPLE_INDEX_BASE) == 0)) {
 			sequence_part = part;
 			break;
@@ -302,7 +303,10 @@ index_def_new_from_tuple(struct tuple *tuple, struct space *space)
 	space_check_index_def_xc(space, index_def);
 	if (index_def->iid == 0 && space->sequence != NULL)
 		index_def_check_sequence(index_def, space->sequence_fieldno,
-					 space->sequence_path, space_name(space));
+					 space->sequence_path,
+					 space->sequence_path != NULL ?
+					 strlen(space->sequence_path) : 0,
+					 space_name(space));
 	index_def_guard.is_active = false;
 	return index_def;
 }
@@ -3484,12 +3488,83 @@ on_replace_dd_sequence_data(struct trigger * /* trigger */, void *event)
 }
 
 /**
- * Run the triggers registered on commit of a change in _space.
+ * Extract field number and path from _space_sequence tuple.
+ * The path is allocated using malloc().
  */
+static uint32_t
+sequence_field_from_tuple(struct space *space, struct tuple *tuple,
+			  char **path_ptr)
+{
+	struct index *pk = index_find_xc(space, 0);
+	struct key_part *part = &pk->def->key_def->parts[0];
+	uint32_t fieldno = part->fieldno;
+	const char *path_raw = part->path;
+	uint32_t path_len = part->path_len;
+
+	/* Sequence field was added in 2.2.1. */
+	if (tuple_field_count(tuple) > BOX_SPACE_SEQUENCE_FIELD_FIELDNO) {
+		fieldno = tuple_field_u32_xc(tuple,
+				BOX_SPACE_SEQUENCE_FIELD_FIELDNO);
+		path_raw = tuple_field_str_xc(tuple,
+				BOX_SPACE_SEQUENCE_FIELD_PATH, &path_len);
+		if (path_len == 0)
+			path_raw = NULL;
+	}
+	index_def_check_sequence(pk->def, fieldno, path_raw, path_len,
+				 space_name(space));
+	char *path = NULL;
+	if (path_raw != NULL) {
+		path = (char *)malloc(path_len + 1);
+		if (path == NULL)
+			tnt_raise(OutOfMemory, path_len + 1,
+				  "malloc", "sequence path");
+		memcpy(path, path_raw, path_len);
+		path[path_len] = 0;
+	}
+	*path_ptr = path;
+	return fieldno;
+}
+
+/** Attach a sequence to a space on rollback in _space_sequence. */
+static void
+set_space_sequence(struct trigger *trigger, void * /* event */)
+{
+	struct tuple *tuple = (struct tuple *)trigger->data;
+	uint32_t space_id = tuple_field_u32_xc(tuple,
+			BOX_SPACE_SEQUENCE_FIELD_ID);
+	uint32_t sequence_id = tuple_field_u32_xc(tuple,
+			BOX_SPACE_SEQUENCE_FIELD_SEQUENCE_ID);
+	bool is_generated = tuple_field_bool_xc(tuple,
+			BOX_SPACE_SEQUENCE_FIELD_IS_GENERATED);
+	struct space *space = space_by_id(space_id);
+	assert(space != NULL);
+	struct sequence *seq = sequence_by_id(sequence_id);
+	assert(seq != NULL);
+	char *path;
+	uint32_t fieldno = sequence_field_from_tuple(space, tuple, &path);
+	seq->is_generated = is_generated;
+	space->sequence = seq;
+	space->sequence_fieldno = fieldno;
+	free(space->sequence_path);
+	space->sequence_path = path;
+	trigger_run_xc(&on_alter_space, space);
+}
+
+/** Detach a sequence from a space on rollback in _space_sequence. */
 static void
-on_commit_dd_space_sequence(struct trigger *trigger, void * /* event */)
+clear_space_sequence(struct trigger *trigger, void * /* event */)
 {
-	struct space *space = (struct space *) trigger->data;
+	struct tuple *tuple = (struct tuple *)trigger->data;
+	uint32_t space_id = tuple_field_u32_xc(tuple,
+			BOX_SPACE_SEQUENCE_FIELD_ID);
+	struct space *space = space_by_id(space_id);
+	assert(space != NULL);
+	assert(space->sequence != NULL);
+	space->sequence->is_generated = false;
+	space->sequence = NULL;
+	space->sequence_fieldno = 0;
+	free(space->sequence_path);
+	space->sequence_path = NULL;
 	trigger_run_xc(&on_alter_space, space);
 }
 
@@ -3537,65 +3612,46 @@ on_replace_dd_space_sequence(struct trigger * /* trigger */, void *event)
 	access_check_ddl(space->def->name, space->def->id, space->def->uid,
 			 SC_SPACE, PRIV_A);
 
-	struct trigger *on_commit =
-		txn_alter_trigger_new(on_commit_dd_space_sequence, space);
-	txn_on_commit(txn, on_commit);
-
 	if (stmt->new_tuple != NULL) {			/* INSERT, UPDATE */
-		struct index *pk = index_find_xc(space, 0);
-
-		/* Sequence field was added in 2.2.1. */
+		char *sequence_path;
 		uint32_t sequence_fieldno;
-		const char *sequence_path_raw;
-		uint32_t sequence_path_len;
-		if (tuple_field_count(tuple) > BOX_SPACE_SEQUENCE_FIELD_FIELDNO) {
-			sequence_fieldno = tuple_field_u32_xc(tuple,
-					BOX_SPACE_SEQUENCE_FIELD_FIELDNO);
-			sequence_path_raw = tuple_field_str_xc(tuple,
-					BOX_SPACE_SEQUENCE_FIELD_PATH,
-					&sequence_path_len);
-		} else {
-			struct key_part *part = &pk->def->key_def->parts[0];
-			sequence_fieldno = part->fieldno;
-			sequence_path_raw = part->path;
-			sequence_path_len = part->path_len;
-		}
-		char *sequence_path = NULL;
-		if (sequence_path_len > 0) {
-			sequence_path = (char *)malloc(sequence_path_len + 1);
-			if (sequence_path == NULL)
-				tnt_raise(OutOfMemory, sequence_path_len + 1,
-					  "malloc", "sequence path");
-			memcpy(sequence_path, sequence_path_raw,
-			       sequence_path_len);
-			sequence_path[sequence_path_len] = 0;
-		}
+		sequence_fieldno = sequence_field_from_tuple(space, tuple,
+							     &sequence_path);
 		auto sequence_path_guard = make_scoped_guard([=] {
 			free(sequence_path);
 		});
-
-		index_def_check_sequence(pk->def, sequence_fieldno,
-					 sequence_path,
-					 space_name(space));
 		if (seq->is_generated) {
 			tnt_raise(ClientError, ER_ALTER_SPACE,
 				  space_name(space),
 				  "can not attach generated sequence");
 		}
+		struct trigger *on_rollback;
+		if (stmt->old_tuple != NULL)
+			on_rollback = txn_alter_trigger_new(set_space_sequence,
+							    stmt->old_tuple);
+		else
+			on_rollback = txn_alter_trigger_new(clear_space_sequence,
+							    stmt->new_tuple);
 		seq->is_generated = is_generated;
 		space->sequence = seq;
 		space->sequence_fieldno = sequence_fieldno;
 		free(space->sequence_path);
 		space->sequence_path = sequence_path;
 		sequence_path_guard.is_active = false;
+		txn_on_rollback(txn, on_rollback);
 	} else {					/* DELETE */
+		struct trigger *on_rollback;
+		on_rollback = txn_alter_trigger_new(set_space_sequence,
+						    stmt->old_tuple);
 		assert(space->sequence == seq);
 		seq->is_generated = false;
 		space->sequence = NULL;
 		space->sequence_fieldno = 0;
 		free(space->sequence_path);
 		space->sequence_path = NULL;
+		txn_on_rollback(txn, on_rollback);
 	}
+	trigger_run_xc(&on_alter_space, space);
 }
 
 /* }}} sequence */
diff --git a/test/box/sequence.result b/test/box/sequence.result
index 2c0c0a96..99eed0ec 100644
--- a/test/box/sequence.result
+++ b/test/box/sequence.result
@@ -2195,3 +2195,57 @@ box.sequence.test ~= nil
 box.sequence.test:drop()
 ---
 ...
+--
+-- Check that changes to _space_sequence are rolled back properly.
+--
+s = box.schema.space.create('test')
+---
+...
+_ = s:create_index('pk')
+---
+...
+sq = box.schema.sequence.create('test')
+---
+...
+box.begin() box.space._space_sequence:insert{s.id, sq.id, false, 0, ''} id = s.index.pk.sequence_id box.rollback()
+---
+...
+id == sq.id
+---
+- true
+...
+s.index.pk.sequence_id == nil
+---
+- true
+...
+s:insert{box.NULL} -- error
+---
+- error: 'Tuple field 1 type does not match one required by operation: expected unsigned'
+...
+s.index.pk:alter{sequence = sq}
+---
+...
+box.begin() box.space._space_sequence:delete{s.id} id = s.index.pk.sequence_id box.rollback()
+---
+...
+id == nil
+---
+- true
+...
+s.index.pk.sequence_id == sq.id
+---
+- true
+...
+s:insert{box.NULL} -- ok
+---
+- [1]
+...
+s.index.pk:alter{sequence = false}
+---
+...
+sq:drop()
+---
+...
+s:drop()
+---
+...
diff --git a/test/box/sequence.test.lua b/test/box/sequence.test.lua
index 3c8140e8..9615c447 100644
--- a/test/box/sequence.test.lua
+++ b/test/box/sequence.test.lua
@@ -742,3 +742,22 @@ box.begin() box.space._sequence:delete{sq.id} sq = box.sequence.test box.rollbac
 sq == nil
 box.sequence.test ~= nil
 box.sequence.test:drop()
+
+--
+-- Check that changes to _space_sequence are rolled back properly.
+--
+s = box.schema.space.create('test')
+_ = s:create_index('pk')
+sq = box.schema.sequence.create('test')
+box.begin() box.space._space_sequence:insert{s.id, sq.id, false, 0, ''} id = s.index.pk.sequence_id box.rollback()
+id == sq.id
+s.index.pk.sequence_id == nil
+s:insert{box.NULL} -- error
+s.index.pk:alter{sequence = sq}
+box.begin() box.space._space_sequence:delete{s.id} id = s.index.pk.sequence_id box.rollback()
+id == nil
+s.index.pk.sequence_id == sq.id
+s:insert{box.NULL} -- ok
+s.index.pk:alter{sequence = false}
+sq:drop()
+s:drop()
-- 
2.11.0




More information about the Tarantool-patches mailing list