[Tarantool-patches] [PATCH v2 2/2] sql: make constraint operations transactional

Roman Khabibov roman.habibov at tarantool.org
Fri Nov 29 10:38:43 MSK 2019


Forgot to mention: I can’t put the space name into ER_CONSTRAINT_EXISTS (build.c), because I can’t get any info
about the space in build.c during ALTER TABLE ADD CONSTRAINT.

> On Nov 28, 2019, at 21:34, Roman Khabibov <roman.habibov at tarantool.org> wrote:
> 
> Put constraint names into the space's hash table and drop them on
> replace in corresponding system spaces (_index, _fk_constraint,
> _ck_constraint).
> 
> Closes #3503
> ---
> src/box/alter.cc             | 427 +++++++++++++++++++++++++++++++----
> test/sql/constraint.result   | 190 ++++++++++++++++
> test/sql/constraint.test.lua |  84 +++++++
> 3 files changed, 654 insertions(+), 47 deletions(-)
> create mode 100644 test/sql/constraint.result
> create mode 100755 test/sql/constraint.test.lua
> 
> diff --git a/src/box/alter.cc b/src/box/alter.cc
> index 4a3241a79..4bb8b8556 100644
> --- a/src/box/alter.cc
> +++ b/src/box/alter.cc
> @@ -57,6 +57,7 @@
> #include "version.h"
> #include "sequence.h"
> #include "sql.h"
> +#include "constraint_def.h"
> 
> /* {{{ Auxiliary functions and methods. */
> 
> @@ -1765,6 +1766,38 @@ MoveCkConstraints::rollback(struct alter_space *alter)
> 	space_swap_ck_constraint(alter->new_space, alter->old_space);
> }
> 
> +/**
> + * Move constraint names hash table from old space to a new one.
> + */
> +class MoveConstraints: public AlterSpaceOp
> +{
> +	inline void space_swap_constraint(struct space *old_space,
> +				   struct space *new_space);
> +public:
> +	MoveConstraints(struct alter_space *alter) : AlterSpaceOp(alter) {}
> +	virtual void alter(struct alter_space *alter);
> +	virtual void rollback(struct alter_space *alter);
> +};
> +
> +inline void
> +MoveConstraints::space_swap_constraint(struct space *old_space,
> +				       struct space *new_space)
> +{
> +	SWAP(new_space->constraint_names, old_space->constraint_names);
> +}
> +
> +void
> +MoveConstraints::alter(struct alter_space *alter)
> +{
> +	space_swap_constraint(alter->old_space, alter->new_space);
> +}
> +
> +void
> +MoveConstraints::rollback(struct alter_space *alter)
> +{
> +	space_swap_constraint(alter->new_space, alter->old_space);
> +}
> +
> /* }}} */
> 
> /**
> @@ -2307,6 +2340,7 @@ on_replace_dd_space(struct trigger * /* trigger */, void *event)
> 		def_guard.is_active = false;
> 		/* Create MoveIndex ops for all space indexes. */
> 		alter_space_move_indexes(alter, 0, old_space->index_id_max + 1);
> +		(void) new MoveConstraints(alter);
> 		/* Remember to update schema_version. */
> 		(void) new UpdateSchemaVersion(alter);
> 		try {
> @@ -2341,6 +2375,150 @@ index_is_used_by_fk_constraint(struct rlist *fk_list, uint32_t iid)
> 	return false;
> }
> 
> +/**
> + * Bring back the node of a unique index to the constraint hash
> + * table of a space.
> + *
> + * Rollback DROP index.
> + */
> +static int
> +on_drop_index_rollback(struct trigger *trigger, void * /* event */)
> +{
> +	struct constraint_def *def = (struct constraint_def *)trigger->data;
> +	struct space *space = space_by_id(def->space_id);
> +	assert(space != NULL);
> +	if (space_put_constraint(space, def) != 0) {
> +		panic("Can't recover after index drop rollback (out of "
> +		      "memory)");
> +	}
> +	return 0;
> +}
> +
> +/**
> + * Drop the node of a unique index from the constraint hash table
> + * of a space and clean the unnecessary constraint def memory.
> + *
> + * Rollback CREATE index.
> + */
> +static int
> +on_create_index_rollback(struct trigger *trigger, void * /* event */)
> +{
> +	struct constraint_def *def = (struct constraint_def *)trigger->data;
> +	struct space *space = space_by_id(def->space_id);
> +	assert(space != NULL);
> +	space_drop_constraint(space, def->name);
> +	constraint_def_free(def);
> +	return 0;
> +}
> +
> +/**
> + * Clean the unnecessary constraint def memory.
> + *
> + * Commit REPLACE index.
> + */
> +static int
> +on_replace_index_commit(struct trigger *trigger, void * /* event */)
> +{
> +	struct constraint_def **defs = (struct constraint_def **)trigger->data;
> +	constraint_def_free(defs[0]);
> +	return 0;
> +}
> +
> +/**
> + * Drop the new node of a unique index from the constraint hash
> + * table of a space and bring back the old node. Clean the
> + * unnecessary constraint def memory.
> + *
> + * Rollback REPLACE index.
> + */
> +static int
> +on_replace_index_rollback(struct trigger *trigger, void * /* event */)
> +{
> +	struct constraint_def **defs = (struct constraint_def **)trigger->data;
> +	struct constraint_def *old_def = defs[0];
> +	struct constraint_def *new_def = defs[1];
> +	struct space *space = space_by_id(old_def->space_id);
> +	assert(space != NULL);
> +	space_drop_constraint(space, new_def->name);
> +	constraint_def_free(new_def);
> +	if (space_put_constraint(space, old_def) != 0) {
> +		panic("Can't recover after index replace rollback (out of "
> +		      "memory)");
> +	}
> +	return 0;
> +}
> +
> +/**
> + * Put the node of a unique index to the constraint hash table of
> + * @a space and create a trigger on rollback.
> + *
> + * This function is needed to wrap the duplicated piece of code
> + * inside on_replace_dd_index().
> + *
> + * @param stmt  Statement.
> + * @param space Space.
> + * @param def   Index def.
> + *
> + * @retval 0  Success.
> + * @retval -1 Constraint already exists or out of memory.
> + */
> +static int
> +create_index_as_constraint(struct txn_stmt *stmt, struct space *space,
> +			   struct index_def *def) {
> +	assert(def->opts.is_unique);
> +	if (space_constraint_def_by_name(space, def->name) != NULL) {
> +		diag_set(ClientError, ER_CONSTRAINT_EXISTS, def->name);
> +		return -1;
> +	}
> +	struct constraint_def *constr_def =
> +		constraint_def_new(space->def->id, def->iid == 0 ?
> +				   CONSTRAINT_TYPE_PK : CONSTRAINT_TYPE_UNIQUE,
> +				   def->name);
> +	if (constr_def == NULL)
> +		return -1;
> +	struct trigger *on_rollback = txn_alter_trigger_new(NULL, NULL);
> +	if (space_put_constraint(space, constr_def) != 0 ||
> +	    on_rollback == NULL) {
> +		constraint_def_free(constr_def);
> +		return -1;
> +	}
> +	on_rollback->data = constr_def;
> +	on_rollback->run = on_create_index_rollback;
> +	txn_stmt_on_rollback(stmt, on_rollback);
> +	return 0;
> +}
> +
> +/**
> + * Drop the node of a unique index from the constraint hash table
> + * of @a space and create a trigger on rollback.
> + *
> + * This function is needed to wrap the duplicated piece of code
> + * inside on_replace_dd_index().
> + *
> + * @param stmt  Statement.
> + * @param space Space.
> + * @param def   Index def.
> + *
> + * @retval 0  Success.
> + * @retval -1 Out of memory.
> + */
> +static int
> +drop_index_as_constraint(struct txn_stmt *stmt, struct space *space,
> +			 struct index_def *def) {
> +	assert(def->opts.is_unique);
> +	struct constraint_def *constr_def =
> +		space_constraint_def_by_name(space, def->name);
> +	assert(constr_def != NULL);
> +	space_drop_constraint(space, def->name);
> +	struct trigger *on_rollback = txn_alter_trigger_new(NULL, NULL);
> +	if (on_rollback == NULL)
> +		return -1;
> +	on_rollback->data = constr_def;
> +	on_rollback->run = on_drop_index_rollback;
> +	txn_stmt_on_rollback(stmt, on_rollback);
> +	return 0;
> +}
> +
> /**
>  * Just like with _space, 3 major cases:
>  *
> @@ -2391,21 +2569,21 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
> 	if (tuple_field_u32(old_tuple ? old_tuple : new_tuple,
> 			    BOX_INDEX_FIELD_ID, &iid) != 0)
> 		return -1;
> -	struct space *old_space = space_cache_find(id);
> -	if (old_space == NULL)
> +	struct space *space = space_cache_find(id);
> +	if (space == NULL)
> 		return -1;
> -	if (old_space->def->opts.is_view) {
> -		diag_set(ClientError, ER_ALTER_SPACE, space_name(old_space),
> +	if (space->def->opts.is_view) {
> +		diag_set(ClientError, ER_ALTER_SPACE, space_name(space),
> 			  "can not add index on a view");
> 		return -1;
> 	}
> 	enum priv_type priv_type = new_tuple ? PRIV_C : PRIV_D;
> 	if (old_tuple && new_tuple)
> 		priv_type = PRIV_A;
> -	if (access_check_ddl(old_space->def->name, old_space->def->id,
> -			 old_space->def->uid, SC_SPACE, priv_type) != 0)
> +	if (access_check_ddl(space->def->name, space->def->id, space->def->uid,
> +			     SC_SPACE, priv_type) != 0)
> 		return -1;
> -	struct index *old_index = space_index(old_space, iid);
> +	struct index *old_index = space_index(space, iid);
> 
> 	/*
> 	 * Deal with various cases of dropping of the primary key.
> @@ -2414,43 +2592,41 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
> 		/*
> 		 * Dropping the primary key in a system space: off limits.
> 		 */
> -		if (space_is_system(old_space)) {
> +		if (space_is_system(space)) {
> 			diag_set(ClientError, ER_LAST_DROP,
> -				  space_name(old_space));
> +				 space_name(space));
> 			return -1;
> 		}
> 		/*
> 		 * Can't drop primary key before secondary keys.
> 		 */
> -		if (old_space->index_count > 1) {
> +		if (space->index_count > 1) {
> 			diag_set(ClientError, ER_DROP_PRIMARY_KEY,
> -				  space_name(old_space));
> +				 space_name(space));
> 			return -1;
> 		}
> 		/*
> 		 * Can't drop primary key before space sequence.
> 		 */
> -		if (old_space->sequence != NULL) {
> -			diag_set(ClientError, ER_ALTER_SPACE,
> -				  space_name(old_space),
> +		if (space->sequence != NULL) {
> +			diag_set(ClientError, ER_ALTER_SPACE, space_name(space),
> 				  "can not drop primary key while "
> 				  "space sequence exists");
> 			return -1;
> 		}
> 	}
> 
> -	if (iid != 0 && space_index(old_space, 0) == NULL) {
> +	if (iid != 0 && space_index(space, 0) == NULL) {
> 		/*
> 		 * A secondary index can not be created without
> 		 * a primary key.
> 		 */
> -		diag_set(ClientError, ER_ALTER_SPACE,
> -			  space_name(old_space),
> +		diag_set(ClientError, ER_ALTER_SPACE, space_name(space),
> 			  "can not add a secondary key before primary");
> 		return -1;
> 	}
> 
> -	struct alter_space *alter = alter_space_new(old_space);
> +	struct alter_space *alter = alter_space_new(space);
> 	if (alter == NULL)
> 		return -1;
> 	auto scoped_guard =
> @@ -2469,35 +2645,111 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
> 		 * Can't drop index if foreign key constraints
> 		 * references this index.
> 		 */
> -		if (index_is_used_by_fk_constraint(&old_space->parent_fk_constraint,
> +		if (index_is_used_by_fk_constraint(&space->parent_fk_constraint,
> 						   iid)) {
> -			diag_set(ClientError, ER_ALTER_SPACE,
> -				  space_name(old_space),
> +			diag_set(ClientError, ER_ALTER_SPACE, space_name(space),
> 				  "can not drop a referenced index");
> 			return -1;
> 		}
> 		alter_space_move_indexes(alter, 0, iid);
> 		(void) new DropIndex(alter, old_index);
> +		if (old_index->def->opts.is_unique &&
> +		    drop_index_as_constraint(stmt, space, old_index->def) != 0)
> +			return -1;
> 	}
> 	/* Case 2: create an index, if it is simply created. */
> 	if (old_index == NULL && new_tuple != NULL) {
> +		struct index_def *def =
> +			index_def_new_from_tuple(new_tuple, space);
> +		if (def == NULL)
> +			return -1;
> +		if (def->opts.is_unique) {
> +			if (create_index_as_constraint(stmt, space, def) != 0) {
> +				index_def_delete(def);
> +				return -1;
> +			}
> +		}
> 		alter_space_move_indexes(alter, 0, iid);
> 		CreateIndex *create_index = new CreateIndex(alter);
> -		create_index->new_index_def =
> -			index_def_new_from_tuple(new_tuple, old_space);
> -		if (create_index->new_index_def == NULL)
> -			return -1;
> -		index_def_update_optionality(create_index->new_index_def,
> -					     alter->new_min_field_count);
> +		create_index->new_index_def = def;
> +		index_def_update_optionality(def, alter->new_min_field_count);
> 	}
> 	/* Case 3 and 4: check if we need to rebuild index data. */
> 	if (old_index != NULL && new_tuple != NULL) {
> 		struct index_def *index_def;
> -		index_def = index_def_new_from_tuple(new_tuple, old_space);
> +		index_def = index_def_new_from_tuple(new_tuple, space);
> 		if (index_def == NULL)
> 			return -1;
> 		auto index_def_guard =
> 			make_scoped_guard([=] { index_def_delete(index_def); });
> +		struct index_def *old_def = old_index->def;
> +		/**
> +		 * We put a new name either when an index is becoming
> +		 * unique (i.e. a constraint), or when a unique
> +		 * index's name is being changed.
> +		 */
> +		if (!old_def->opts.is_unique && index_def->opts.is_unique) {
> +			if (create_index_as_constraint(stmt, space, index_def)
> +			    != 0)
> +				return -1;
> +		}
> +		if (old_def->opts.is_unique && index_def->opts.is_unique &&
> +		    strcmp(index_def->name, old_def->name) != 0) {
> +			if (space_constraint_def_by_name(space, index_def->name)
> +			    != NULL) {
> +				diag_set(ClientError, ER_CONSTRAINT_EXISTS,
> +					 index_def->name);
> +				return -1;
> +			}
> +			struct constraint_def *old_constr_def =
> +				space_constraint_def_by_name(space,
> +							     old_def->name);
> +			assert(old_constr_def != NULL);
> +			struct constraint_def *new_constr_def =
> +				constraint_def_new(old_constr_def->space_id,
> +						   old_constr_def->type,
> +						   index_def->name);
> +			if (new_constr_def == NULL)
> +				return -1;
> +			auto new_constraint_def_guard =
> +				make_scoped_guard([=] { constraint_def_free(new_constr_def); });
> +			if (space_put_constraint(space, new_constr_def) != 0)
> +				return -1;
> +			space_drop_constraint(space, old_def->name);
> +			/**
> +			 * Array with a pair of
> +			 * constraint_def structures: old
> +			 * and new.
> +			 */
> +			uint32_t size = sizeof(struct constraint_def *) * 2;
> +			struct region *r = &fiber()->gc;
> +			struct constraint_def **defs =
> +				(struct constraint_def **)region_alloc(r, size);
> +			if (defs == NULL) {
> +				diag_set(OutOfMemory, size, "region",
> +					 "new slab");
> +				return -1;
> +			}
> +			defs[0] = old_constr_def;
> +			defs[1] = new_constr_def;
> +			struct trigger *on_commit =
> +				txn_alter_trigger_new(NULL, NULL);
> +			struct trigger *on_rollback =
> +				txn_alter_trigger_new(NULL, NULL);
> +			if (on_commit == NULL || on_rollback == NULL)
> +				return -1;
> +			on_commit->data = defs;
> +			on_commit->run = on_replace_index_commit;
> +			on_rollback->data = defs;
> +			on_rollback->run = on_replace_index_rollback;
> +			txn_stmt_on_commit(stmt, on_commit);
> +			txn_stmt_on_rollback(stmt, on_rollback);
> +			new_constraint_def_guard.is_active = false;
> +		}
> +		if (old_def->opts.is_unique && !index_def->opts.is_unique) {
> +			if (drop_index_as_constraint(stmt, space, old_def) != 0)
> +				return -1;
> +		}
> 		/*
> 		 * To detect which key parts are optional,
> 		 * min_field_count is required. But
> @@ -2508,30 +2760,29 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
> 		 * index on a non-nullable second field. Min field
> 		 * count here is 2. Now alter the secondary index
> 		 * to make its part be nullable. In the
> -		 * 'old_space' min_field_count is still 2, but
> +		 * 'space' min_field_count is still 2, but
> 		 * actually it is already 1. Actual
> 		 * min_field_count must be calculated using old
> 		 * unchanged indexes, NEW definition of an updated
> 		 * index and a space format, defined by a user.
> 		 */
> 		struct key_def **keys;
> -		size_t bsize = old_space->index_count * sizeof(keys[0]);
> +		size_t bsize = space->index_count * sizeof(keys[0]);
> 		keys = (struct key_def **) region_alloc(&fiber()->gc, bsize);
> 		if (keys == NULL) {
> 			diag_set(OutOfMemory, bsize, "region", "new slab");
> 			return -1;
> 		}
> -		for (uint32_t i = 0, j = 0; i < old_space->index_count; ++i) {
> -			struct index_def *d = old_space->index[i]->def;
> +		for (uint32_t i = 0, j = 0; i < space->index_count; ++i) {
> +			struct index_def *d = space->index[i]->def;
> 			if (d->iid != index_def->iid)
> 				keys[j++] = d->key_def;
> 			else
> 				keys[j++] = index_def->key_def;
> 		}
> -		struct space_def *def = old_space->def;
> +		struct space_def *def = space->def;
> 		alter->new_min_field_count =
> -			tuple_format_min_field_count(keys,
> -						     old_space->index_count,
> +			tuple_format_min_field_count(keys, space->index_count,
> 						     def->fields,
> 						     def->field_count);
> 		index_def_update_optionality(index_def,
> @@ -2542,10 +2793,10 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
> 			(void) new MoveIndex(alter, old_index->def->iid);
> 		} else if (index_def_change_requires_rebuild(old_index,
> 							     index_def)) {
> -			if (index_is_used_by_fk_constraint(&old_space->parent_fk_constraint,
> +			if (index_is_used_by_fk_constraint(&space->parent_fk_constraint,
> 							   iid)) {
> 				diag_set(ClientError, ER_ALTER_SPACE,
> -					  space_name(old_space),
> +					  space_name(space),
> 					  "can not alter a referenced index");
> 				return -1;
> 			}
> @@ -2570,8 +2821,9 @@ on_replace_dd_index(struct trigger * /* trigger */, void *event)
> 	 * Create MoveIndex ops for the remaining indexes in the
> 	 * old space.
> 	 */
> -	alter_space_move_indexes(alter, iid + 1, old_space->index_id_max + 1);
> +	alter_space_move_indexes(alter, iid + 1, space->index_id_max + 1);
> 	(void) new MoveCkConstraints(alter);
> +	(void) new MoveConstraints(alter);
> 	/* Add an op to update schema_version on commit. */
> 	(void) new UpdateSchemaVersion(alter);
> 	try {
> @@ -2663,6 +2915,7 @@ on_replace_dd_truncate(struct trigger * /* trigger */, void *event)
> 	}
> 
> 	(void) new MoveCkConstraints(alter);
> +	(void) new MoveConstraints(alter);
> 	try {
> 		alter_space_do(stmt, alter);
> 	} catch (Exception *e) {
> @@ -4951,8 +5204,15 @@ on_create_fk_constraint_rollback(struct trigger *trigger, void *event)
> 	struct fk_constraint *fk = (struct fk_constraint *)trigger->data;
> 	rlist_del_entry(fk, in_parent_space);
> 	rlist_del_entry(fk, in_child_space);
> +	struct space *child = space_by_id(fk->def->child_id);
> +	const char *name = fk->def->name;
> +	struct constraint_def *constr_def =
> +		space_constraint_def_by_name(child, name);
> +	assert(constr_def != NULL);
> +	space_drop_constraint(child, name);
> +	constraint_def_free(constr_def);
> 	space_reset_fk_constraint_mask(space_by_id(fk->def->parent_id));
> -	space_reset_fk_constraint_mask(space_by_id(fk->def->child_id));
> +	space_reset_fk_constraint_mask(child);
> 	fk_constraint_delete(fk);
> 	return 0;
> }
> @@ -4986,11 +5246,24 @@ on_drop_fk_constraint_rollback(struct trigger *trigger, void *event)
> 	struct space *child = space_by_id(old_fk->def->child_id);
> 	rlist_add_entry(&child->child_fk_constraint, old_fk, in_child_space);
> 	rlist_add_entry(&parent->parent_fk_constraint, old_fk, in_parent_space);
> +	const char *name = old_fk->def->name;
> +	struct constraint_def *constr_def =
> +		constraint_def_new(child->def->id, CONSTRAINT_TYPE_FK, name);
> +	if (constr_def == NULL)
> +		goto panic;
> +	if (space_put_constraint(child, constr_def) != 0) {
> +		constraint_def_free(constr_def);
> +		goto panic;
> +	}
> 	fk_constraint_set_mask(old_fk, &child->fk_constraint_mask,
> 			       FIELD_LINK_CHILD);
> 	fk_constraint_set_mask(old_fk, &parent->fk_constraint_mask,
> 			       FIELD_LINK_PARENT);
> 	return 0;
> +
> +panic:
> +	panic("Can't recover after FK constraint drop rollback (out of "
> +	      "memory)");
> }
> 
> /**
> @@ -5165,15 +5438,31 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
> 		fk->def = fk_def;
> 		fk->index_id = fk_index->def->iid;
> 		if (old_tuple == NULL) {
> +			if (space_constraint_def_by_name(child_space,
> +							 fk_def->name)
> +			    != NULL) {
> +				diag_set(ClientError, ER_CONSTRAINT_EXISTS,
> +					 fk_def->name);
> +				return -1;
> +			}
> 			rlist_add_entry(&child_space->child_fk_constraint,
> 					fk, in_child_space);
> 			rlist_add_entry(&parent_space->parent_fk_constraint,
> 					fk, in_parent_space);
> +			struct constraint_def *constr_def =
> +				constraint_def_new(child_space->def->id,
> +						   CONSTRAINT_TYPE_FK,
> +						   fk_def->name);
> +			if (constr_def == NULL)
> +				return -1;
> 			struct trigger *on_rollback =
> 				txn_alter_trigger_new(on_create_fk_constraint_rollback,
> 						      fk);
> -			if (on_rollback == NULL)
> +			if (space_put_constraint(child_space, constr_def) != 0
> +			    || on_rollback == NULL) {
> +				constraint_def_free(constr_def);
> 				return -1;
> +			}
> 			txn_stmt_on_rollback(stmt, on_rollback);
> 			fk_constraint_set_mask(fk,
> 					       &parent_space->fk_constraint_mask,
> @@ -5221,6 +5510,12 @@ on_replace_dd_fk_constraint(struct trigger * /* trigger*/, void *event)
> 		struct fk_constraint *old_fk=
> 			fk_constraint_remove(&child_space->child_fk_constraint,
> 					     fk_def->name);
> +		const char *name = fk_def->name;
> +		struct constraint_def *constr_def =
> +			space_constraint_def_by_name(child_space, name);
> +		assert(constr_def != NULL);
> +		space_drop_constraint(child_space, name);
> +		constraint_def_free(constr_def);
> 		struct trigger *on_commit =
> 			txn_alter_trigger_new(on_drop_or_replace_fk_constraint_commit,
> 					      old_fk);
> @@ -5297,9 +5592,13 @@ on_create_ck_constraint_rollback(struct trigger *trigger, void * /* event */)
> 	assert(ck != NULL);
> 	struct space *space = space_by_id(ck->def->space_id);
> 	assert(space != NULL);
> -	assert(space_ck_constraint_by_name(space, ck->def->name,
> -					   strlen(ck->def->name)) != NULL);
> +	const char *name = ck->def->name;
> +	assert(space_ck_constraint_by_name(space, name, strlen(name)) != NULL);
> 	space_remove_ck_constraint(space, ck);
> +	struct constraint_def *constr_def =
> +		space_constraint_def_by_name(space, name);
> +	space_drop_constraint(space, name);
> +	constraint_def_free(constr_def);
> 	ck_constraint_delete(ck);
> 	if (trigger_run(&on_alter_space, space) != 0)
> 		return -1;
> @@ -5324,13 +5623,23 @@ on_drop_ck_constraint_rollback(struct trigger *trigger, void * /* event */)
> 	assert(ck != NULL);
> 	struct space *space = space_by_id(ck->def->space_id);
> 	assert(space != NULL);
> -	assert(space_ck_constraint_by_name(space, ck->def->name,
> -					   strlen(ck->def->name)) == NULL);
> -	if (space_add_ck_constraint(space, ck) != 0)
> -		panic("Can't recover after CK constraint drop rollback");
> +	const char *name = ck->def->name;
> +	assert(space_ck_constraint_by_name(space, name, strlen(name)) == NULL);
> +	struct constraint_def *constr_def =
> +		constraint_def_new(space->def->id, CONSTRAINT_TYPE_CK, name);
> +	if (constr_def == NULL)
> +		goto panic;
> +	if (space_add_ck_constraint(space, ck) != 0 ||
> +	    space_put_constraint(space, constr_def) != 0) {
> +		constraint_def_free(constr_def);
> +		goto panic;
> +	}
> 	if (trigger_run(&on_alter_space, space) != 0)
> 		return -1;
> 	return 0;
> +
> +panic:
> +	panic("Can't recover after CK constraint drop rollback");
> }
> 
> /** Commit REPLACE check constraint. */
> @@ -5443,11 +5752,28 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
> 		auto ck_guard = make_scoped_guard([=] {
> 			ck_constraint_delete(new_ck_constraint);
> 		});
> -		if (old_ck_constraint != NULL)
> +
> +		if (old_ck_constraint != NULL) {
> 			rlist_del_entry(old_ck_constraint, link);
> +		} else if (space_constraint_def_by_name(space, name) != NULL) {
> +			diag_set(ClientError, ER_CONSTRAINT_EXISTS,
> +				 name);
> +			return -1;
> +		}
> 		if (space_add_ck_constraint(space, new_ck_constraint) != 0)
> 			return -1;
> 		ck_guard.is_active = false;
> +		if (old_ck_constraint == NULL) {
> +			struct constraint_def *constr_def =
> +				constraint_def_new(space->def->id,
> +						   CONSTRAINT_TYPE_CK, name);
> +			if (constr_def == NULL)
> +				return -1;
> +			if (space_put_constraint(space, constr_def) != 0) {
> +				constraint_def_free(constr_def);
> +				return -1;
> +			}
> +		}
> 		if (old_tuple != NULL) {
> 			on_rollback->data = old_ck_constraint;
> 			on_rollback->run = on_replace_ck_constraint_rollback;
> @@ -5467,7 +5793,13 @@ on_replace_dd_ck_constraint(struct trigger * /* trigger*/, void *event)
> 			return -1;
> 		struct ck_constraint *old_ck_constraint =
> 			space_ck_constraint_by_name(space, name, name_len);
> +		struct constraint_def *constr_def =
> +			space_constraint_def_by_name(space,
> +						     old_ck_constraint->def->name);
> +		assert(constr_def != NULL);
> 		assert(old_ck_constraint != NULL);
> +		space_drop_constraint(space, old_ck_constraint->def->name);
> +		constraint_def_free(constr_def);
> 		space_remove_ck_constraint(space, old_ck_constraint);
> 		on_commit->data = old_ck_constraint;
> 		on_commit->run = on_drop_ck_constraint_commit;
> @@ -5567,6 +5899,7 @@ on_replace_dd_func_index(struct trigger *trigger, void *event)
> 	alter_space_move_indexes(alter, index->def->iid + 1,
> 				 space->index_id_max + 1);
> 	(void) new MoveCkConstraints(alter);
> +	(void) new MoveConstraints(alter);
> 	(void) new UpdateSchemaVersion(alter);
> 	try {
> 		alter_space_do(stmt, alter);
> diff --git a/test/sql/constraint.result b/test/sql/constraint.result
> new file mode 100644
> index 000000000..d8a4bc1c4
> --- /dev/null
> +++ b/test/sql/constraint.result
> @@ -0,0 +1,190 @@
> +-- test-run result file version 2
> +test_run = require('test_run').new()
> + | ---
> + | ...
> +engine = test_run:get_cfg('engine')
> + | ---
> + | ...
> +box.execute('pragma sql_default_engine=\''..engine..'\'')
> + | ---
> + | - row_count: 0
> + | ...
> +test_run:cmd("setopt delimiter ';'")
> + | ---
> + | - true
> + | ...
> +
> +--
> +-- Check a constraint name for duplicate within a single
> +-- <CREATE TABLE> statement.
> +--
> +box.execute('CREATE TABLE t1 (i INT PRIMARY KEY);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT, CONSTRAINT c CHECK (i > 0),
> +                               CONSTRAINT c PRIMARY KEY (i));]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT,
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i),
> +                               CONSTRAINT c PRIMARY KEY (i));]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c CHECK (i > 0),
> +                               CONSTRAINT c UNIQUE (i));]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i),
> +                               CONSTRAINT c UNIQUE (i));]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c CHECK (i > 0),
> +                               CONSTRAINT c CHECK (i < 0));]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i),
> +                               CONSTRAINT c CHECK (i > 0));]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY CONSTRAINT c REFERENCES t1(i),
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i))]]);
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +
> +--
> +-- Check a constraint name for duplicate using
> +-- <ALTER TABLE> statement.
> +--
> +box.execute('CREATE TABLE t2 (i INT CONSTRAINT c PRIMARY KEY);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT c CHECK(i > 0);');
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT c UNIQUE(i);');
> + | ---
> + | - null
> + | - Index 'C' already exists in space 'T2'
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i);');
> + | ---
> + | - null
> + | - Constraint C already exists
> + | ...
> +
> +--
> +-- Make sure that a constraint's name isn't kept after the
> +-- constraint drop.
> +--
> +box.execute('CREATE UNIQUE INDEX d ON t2(i);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.execute('DROP INDEX d ON t2;');
> + | ---
> + | - row_count: 1
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d CHECK(i > 0);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.space.T2.ck_constraint.D:drop();
> + | ---
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d FOREIGN KEY(i) REFERENCES t1(i);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.execute('ALTER TABLE t2 DROP CONSTRAINT d;');
> + | ---
> + | - row_count: 1
> + | ...
> +
> +--
> +-- The same inside a transaction.
> +--
> +box.begin()
> +box.execute('CREATE UNIQUE INDEX d ON t2(i);')
> +box.execute('DROP INDEX d ON t2;')
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d CHECK(i > 0);')
> +box.space.T2.ck_constraint.D:drop()
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d FOREIGN KEY(i) REFERENCES t1(i);')
> +box.execute('ALTER TABLE t2 DROP CONSTRAINT d;')
> +box.commit();
> + | ---
> + | ...
> +
> +--
> +-- Make sure, that altering of an index name affect to its record
> +-- in the space's constraint hash table.
> +--
> +box.execute('CREATE UNIQUE INDEX d ON t2(i);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.space.T2.index.D:alter({name = 'E'});
> + | ---
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT e CHECK(i > 0);');
> + | ---
> + | - null
> + | - Constraint E already exists
> + | ...
> +
> +--
> +-- Make sure, that altering of an index uniqueness puts/drops
> +-- its name to/from the space's constraint hash table.
> +--
> +box.space.T2.index.E:alter({unique = false});
> + | ---
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT e CHECK(i > 0);');
> + | ---
> + | - row_count: 1
> + | ...
> +box.space.T2.index.E:alter({unique = true});
> + | ---
> + | - error: Constraint E already exists
> + | ...
> +box.space.T2.ck_constraint.E:drop();
> + | ---
> + | ...
> +box.space.T2.index.E:alter({unique = true});
> + | ---
> + | ...
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT e CHECK(i > 0);');
> + | ---
> + | - null
> + | - Constraint E already exists
> + | ...
> +
> +-- Alter name and uniqueness of an unique index simultaneously.
> +box.space.T2.index.E:alter({name = 'D', unique = false});
> + | ---
> + | ...
> +box.execute('CREATE UNIQUE INDEX e ON t2(i);');
> + | ---
> + | - row_count: 1
> + | ...
> diff --git a/test/sql/constraint.test.lua b/test/sql/constraint.test.lua
> new file mode 100755
> index 000000000..e80aa4387
> --- /dev/null
> +++ b/test/sql/constraint.test.lua
> @@ -0,0 +1,84 @@
> +test_run = require('test_run').new()
> +engine = test_run:get_cfg('engine')
> +box.execute('pragma sql_default_engine=\''..engine..'\'')
> +test_run:cmd("setopt delimiter ';'")
> +
> +--
> +-- Check a constraint name for duplicate within a single
> +-- <CREATE TABLE> statement.
> +--
> +box.execute('CREATE TABLE t1 (i INT PRIMARY KEY);');
> +box.execute([[CREATE TABLE t2 (i INT, CONSTRAINT c CHECK (i > 0),
> +                               CONSTRAINT c PRIMARY KEY (i));]]);
> +box.execute([[CREATE TABLE t2 (i INT,
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i),
> +                               CONSTRAINT c PRIMARY KEY (i));]]);
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c CHECK (i > 0),
> +                               CONSTRAINT c UNIQUE (i));]]);
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i),
> +                               CONSTRAINT c UNIQUE (i));]]);
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c CHECK (i > 0),
> +                               CONSTRAINT c CHECK (i < 0));]]);
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY,
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i),
> +                               CONSTRAINT c CHECK (i > 0));]]);
> +box.execute([[CREATE TABLE t2 (i INT PRIMARY KEY CONSTRAINT c REFERENCES t1(i),
> +                               CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i))]]);
> +
> +--
> +-- Check a constraint name for duplicate using
> +-- <ALTER TABLE> statement.
> +--
> +box.execute('CREATE TABLE t2 (i INT CONSTRAINT c PRIMARY KEY);');
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT c CHECK(i > 0);');
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT c UNIQUE(i);');
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT c FOREIGN KEY(i) REFERENCES t1(i);');
> +
> +--
> +-- Make sure that a constraint's name isn't kept after the
> +-- constraint drop.
> +--
> +box.execute('CREATE UNIQUE INDEX d ON t2(i);');
> +box.execute('DROP INDEX d ON t2;');
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d CHECK(i > 0);');
> +box.space.T2.ck_constraint.D:drop();
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d FOREIGN KEY(i) REFERENCES t1(i);');
> +box.execute('ALTER TABLE t2 DROP CONSTRAINT d;');
> +
> +--
> +-- The same inside a transaction.
> +--
> +box.begin()
> +box.execute('CREATE UNIQUE INDEX d ON t2(i);')
> +box.execute('DROP INDEX d ON t2;')
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d CHECK(i > 0);')
> +box.space.T2.ck_constraint.D:drop()
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT d FOREIGN KEY(i) REFERENCES t1(i);')
> +box.execute('ALTER TABLE t2 DROP CONSTRAINT d;')
> +box.commit();
> +
> +--
> +-- Make sure, that altering of an index name affect to its record
> +-- in the space's constraint hash table.
> +--
> +box.execute('CREATE UNIQUE INDEX d ON t2(i);');
> +box.space.T2.index.D:alter({name = 'E'});
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT e CHECK(i > 0);');
> +
> +--
> +-- Make sure, that altering of an index uniqueness puts/drops
> +-- its name to/from the space's constraint hash table.
> +--
> +box.space.T2.index.E:alter({unique = false});
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT e CHECK(i > 0);');
> +box.space.T2.index.E:alter({unique = true});
> +box.space.T2.ck_constraint.E:drop();
> +box.space.T2.index.E:alter({unique = true});
> +box.execute('ALTER TABLE t2 ADD CONSTRAINT e CHECK(i > 0);');
> +
> +-- Alter name and uniqueness of an unique index simultaneously.
> +box.space.T2.index.E:alter({name = 'D', unique = false});
> +box.execute('CREATE UNIQUE INDEX e ON t2(i);');
> -- 
> 2.21.0 (Apple Git-122)
> 



More information about the Tarantool-patches mailing list