[tarantool-patches] [PATCH v2 2/3] sql: clean-up in case constraint creation failed

imeevma at tarantool.org imeevma at tarantool.org
Fri Jun 28 15:07:55 MSK 2019


Thank you for review! My answers and new patch below.

On 6/26/19 7:11 PM, n.pettik wrote:
>
>
>> On 25 Jun 2019, at 18:31, imeevma at tarantool.org wrote:
>>
>> This patch makes VDBE run destructors before halting in case
>> constraint creation failed.
>>
>> Closes #4183
>
> tarantool> box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(if > 0), CONSTRAINT ck1 FOREIGN KEY(id) REFERENCES t1, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t1, CONSTRAINT ck1 CHECK(id < 0));")
> ---
> - error: 'Supplied key type of part 0 does not match index part type: expected string'
> ...
>
> tarantool> box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(if > 0), CONSTRAINT ck1 FOREIGN KEY(id) REFERENCES t1, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t1, CONSTRAINT ck1 CHECK(id < 0));")
> ---
> - error: Space 'T1' already exists
> ...
>
> Fix this bug. Also apply this diff containing explanations to the patch:
>
Fixed in next patch.

> diff --git a/src/box/sql/build.c b/src/box/sql/build.c
> index d05ca0bc3..0b907b167 100644
> --- a/src/box/sql/build.c
> +++ b/src/box/sql/build.c
> @@ -74,21 +74,29 @@ struct saved_record
>         /** The number of the operation. */
>         int op_number;
>         /** Flag to show that operation is SInsert. */
> -       bool is_sinsert;
> +       bool is_insert;
>  };
>  
>  /**
> - * Save inserted in system space record in list.
> + * Save inserted in system space record in list. This procedure is
> + * called after generation of either OP_SInsert or OP_NoColflict +
> + * OP_Error. In the first case, record inserted to the system space
> + * is supposed to be deleted; in the latter - jump target specified
> + * in OP_Error should be adjusted to the start of clean-up routines
> + * (current entry isn't inserted to the space yet, so there's no
> + * need to delete it).
>   *
>   * @param parser SQL Parser object.
>   * @param space_id Id of table in which record is inserted.
>   * @param reg_key Register that contains first field of the key.
>   * @param reg_key_count Exact number of fields of the key.
> - * @param op_number Number of the operation.
> + * @param op_addr Address of opcode (OP_Error or OP_SInter). Used
> + *                to fix jump target (see sql_finish_coding()).
> + * @param is_op_insert Whether opcode is OP_SInsert or not.
>   */
>  static inline void
>  save_record(struct Parse *parser, uint32_t space_id, int reg_key,
> -           int reg_key_count, int op_number, bool is_sinsert)
> +           int reg_key_count, int op_number, bool is_insert_op)
>  {
>         struct saved_record *record =
>                 region_alloc(&parser->region, sizeof(*record));
> @@ -102,7 +110,7 @@ save_record(struct Parse *parser, uint32_t space_id, int reg_key,
>         record->reg_key = reg_key;
>         record->reg_key_count = reg_key_count;
>         record->op_number = op_number;
> -       record->is_sinsert = is_sinsert;
> +       record->is_insert = is_insert_op;
>         rlist_add_entry(&parser->record_list, record, link);
>  }
>  
> @@ -121,11 +129,12 @@ sql_finish_coding(struct Parse *parse_context)
>          * it won't be created. This code works the same way for
>          * other "CREATE ..." statements but it won't delete
>          * anything as these statements create no more than one
> -        * record.
> -        *
> -        * Here it was assumed that after OP_Error there is at
> -        * least one OP_SInsert. That means that the last opcode
> -        * in the current list is OP_SInsert.
> +        * record. Hence for processed insertions we should remove
> +        * entries from corresponding system spaces alongside
> +        * with fixing jump address for OP_SInsert opcode in
> +        * case it fails during VDBE runtime; for OP_Error only
> +        * adjust jump target to the start of clean-up program
> +        * for already inserted entries.
>          */
>         if (!rlist_empty(&parse_context->record_list)) {
>                 struct saved_record *record =
> @@ -136,7 +145,7 @@ sql_finish_coding(struct Parse *parse_context)
>                 MAYBE_UNUSED const char *comment =
>                         "Delete entry from %s if CREATE TABLE fails";
>                 rlist_foreach_entry(record, &parse_context->record_list, link) {
> -                       if (record->is_sinsert) {
> +                       if (record->is_insert) {
>                                 int rec_reg = ++parse_context->nMem;
>                                 sqlVdbeAddOp3(v, OP_MakeRecord, record->reg_key,
>                                               record->reg_key_count, rec_reg);
> @@ -146,7 +155,7 @@ sql_finish_coding(struct Parse *parse_context)
>                                         space_by_id(record->space_id);
>                                 VdbeComment((v, comment, space_name(space)));
>                         }
> -                       /* Set P2 of operation. */
> +                       /* Set jump target for OP_Error and OP_SInsert. */
>                         sqlVdbeChangeP2(v, record->op_number, v->nOp);
>                 }
>                 sqlVdbeAddOp1(v, OP_Halt, -1);
> diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h
> index 445b58a62..73dc6e4d7 100644
> --- a/src/box/sql/sqlInt.h
> +++ b/src/box/sql/sqlInt.h
> @@ -4521,9 +4521,6 @@ extern int sqlSubProgramsRemaining;
>   * The function allocates error and name resources for VDBE
>   * itself.
>   *
> - * This function should not be used after the OP_SInsert, since it
> - * can lead to garbage in the case of an error.
> - *
>   * @param parser Parsing context.
>   * @param space_id Space to lookup identifier.
>   * @param index_id Index identifier of key.
>
>
Thank you. Fixed.

>> diff --git a/test/sql/clear.test.lua b/test/sql/clear.test.lua
>> index c67a447..1495134 100644
>> --- a/test/sql/clear.test.lua
>> +++ b/test/sql/clear.test.lua
>> @@ -27,5 +27,16 @@ box.execute("SELECT * from zoobar")
>> box.execute("DROP INDEX zoobar2 ON zoobar")
>> box.execute("DROP TABLE zoobar")
>>
>> +--
>> +-- gh-4183: Garbage in case of failed constraint creation.
>> +--
>> +box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(if > 0), CONSTRAINT ck1 CHECK(id < 0));”)
>
> if > 0 -> id > 0
> Please, use correct condition in check constraint.
>
Sorry, fixed.


New patch:

>From 34cdfac3f2acbfd77cd394481616041c863c1300 Mon Sep 17 00:00:00 2001
Date: Thu, 27 Jun 2019 16:59:42 +0300
Subject: [PATCH] sql: clean-up in case constraint creation failed

This patch makes VDBE run destructors before halting in case
constraint creation failed.

Part of #4183

diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index aab60dd..15b8162 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -71,22 +71,33 @@ struct saved_record
 	int reg_key;
 	/** Number of registers the key consists of. */
 	int reg_key_count;
-	/** The number of the OP_SInsert operation. */
-	int insertion_opcode;
+	/** The address of the opcode. */
+	int op_addr;
+	/** Flag to show that operation is SInsert. */
+	bool is_insert;
 };
 
 /**
- * Save inserted in system space record in list.
+ * Save inserted in system space record in list. This procedure is
+ * called after generation of either OP_SInsert or OP_NoColflict +
+ * OP_SetDiag. In the first case, record inserted to the system
+ * space is supposed to be deleted on error; in the latter - jump
+ * target specified in OP_SetDiag should be adjusted to the start
+ * of clean-up routines (current entry isn't inserted to the space
+ * yet, so there's no need to delete it).
  *
  * @param parser SQL Parser object.
  * @param space_id Id of table in which record is inserted.
  * @param reg_key Register that contains first field of the key.
  * @param reg_key_count Exact number of fields of the key.
- * @param insertion_opcode Number of OP_SInsert opcode.
+ * @param op_addr Address of opcode (OP_SetDiag or OP_SInsert).
+ *                Used to fix jump target (see
+ *                sql_finish_coding()).
+ * @param is_insert_op Whether opcode is OP_SInsert or not.
  */
 static inline void
 save_record(struct Parse *parser, uint32_t space_id, int reg_key,
-	    int reg_key_count, int insertion_opcode)
+	    int reg_key_count, int op_addr, bool is_insert_op)
 {
 	struct saved_record *record =
 		region_alloc(&parser->region, sizeof(*record));
@@ -99,7 +110,8 @@ save_record(struct Parse *parser, uint32_t space_id, int reg_key,
 	record->space_id = space_id;
 	record->reg_key = reg_key;
 	record->reg_key_count = reg_key_count;
-	record->insertion_opcode = insertion_opcode;
+	record->op_addr = op_addr;
+	record->is_insert = is_insert_op;
 	rlist_add_entry(&parser->record_list, record, link);
 }
 
@@ -118,28 +130,39 @@ sql_finish_coding(struct Parse *parse_context)
 	 * it won't be created. This code works the same way for
 	 * other "CREATE ..." statements but it won't delete
 	 * anything as these statements create no more than one
-	 * record.
+	 * record. Hence for processed insertions we should remove
+	 * entries from corresponding system spaces alongside
+	 * with fixing jump address for OP_SInsert opcode in
+	 * case it fails during VDBE runtime; for OP_SetDiag only
+	 * adjust jump target to the start of clean-up program
+	 * for already inserted entries.
 	 */
 	if (!rlist_empty(&parse_context->record_list)) {
 		struct saved_record *record =
 			rlist_shift_entry(&parse_context->record_list,
 					  struct saved_record, link);
-		/* Set P2 of SInsert. */
-		sqlVdbeChangeP2(v, record->insertion_opcode, v->nOp);
+		/*
+		 * Set jump target for OP_SetDiag and OP_SInsert.
+		 */
+		sqlVdbeChangeP2(v, record->op_addr, v->nOp);
 		MAYBE_UNUSED const char *comment =
 			"Delete entry from %s if CREATE TABLE fails";
 		rlist_foreach_entry(record, &parse_context->record_list, link) {
-			int record_reg = ++parse_context->nMem;
-			sqlVdbeAddOp3(v, OP_MakeRecord, record->reg_key,
-					  record->reg_key_count, record_reg);
-			sqlVdbeAddOp2(v, OP_SDelete, record->space_id,
-					  record_reg);
-			MAYBE_UNUSED struct space *space =
-				space_by_id(record->space_id);
-			VdbeComment((v, comment, space_name(space)));
-			/* Set P2 of SInsert. */
-			sqlVdbeChangeP2(v, record->insertion_opcode,
-					    v->nOp);
+			if (record->is_insert) {
+				int rec_reg = ++parse_context->nMem;
+				sqlVdbeAddOp3(v, OP_MakeRecord, record->reg_key,
+					      record->reg_key_count, rec_reg);
+				sqlVdbeAddOp2(v, OP_SDelete, record->space_id,
+					      rec_reg);
+				MAYBE_UNUSED struct space *space =
+					space_by_id(record->space_id);
+				VdbeComment((v, comment, space_name(space)));
+			}
+			/*
+			 * Set jump target for OP_SetDiag and
+			 * OP_SInsert.
+			 */
+			sqlVdbeChangeP2(v, record->op_addr, v->nOp);
 		}
 		sqlVdbeAddOp1(v, OP_Halt, -1);
 		VdbeComment((v,
@@ -891,7 +914,7 @@ vdbe_emit_create_index(struct Parse *parse, struct space_def *def,
 			  SQL_SUBTYPE_MSGPACK, index_parts, P4_STATIC);
 	sqlVdbeAddOp3(v, OP_MakeRecord, entry_reg, 6, tuple_reg);
 	sqlVdbeAddOp3(v, OP_SInsert, BOX_INDEX_ID, 0, tuple_reg);
-	save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1);
+	save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1, true);
 	return;
 error:
 	parse->is_aborted = true;
@@ -950,7 +973,7 @@ vdbe_emit_space_create(struct Parse *pParse, int space_id_reg,
 	sqlVdbeAddOp3(v, OP_MakeRecord, iFirstCol, 7, tuple_reg);
 	sqlVdbeAddOp3(v, OP_SInsert, BOX_SPACE_ID, 0, tuple_reg);
 	sqlVdbeChangeP5(v, OPFLAG_NCHANGE);
-	save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1);
+	save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1, true);
 	return;
 error:
 	pParse->is_aborted = true;
@@ -1072,12 +1095,12 @@ vdbe_emit_ck_constraint_create(struct Parse *parser,
 	if (vdbe_emit_halt_with_presence_test(parser, BOX_CK_CONSTRAINT_ID, 0,
 					      ck_constraint_reg, 2,
 					      ER_CONSTRAINT_EXISTS, error_msg,
-					      false, OP_NoConflict) != 0)
+					      false, OP_NoConflict, true) != 0)
 		return;
 	sqlVdbeAddOp3(v, OP_SInsert, BOX_CK_CONSTRAINT_ID, 0,
 		      ck_constraint_reg + 5);
 	save_record(parser, BOX_CK_CONSTRAINT_ID, ck_constraint_reg, 2,
-		    v->nOp - 1);
+		    v->nOp - 1, true);
 	VdbeComment((v, "Create CK constraint %s", ck_def->name));
 	sqlReleaseTempRange(parser, ck_constraint_reg, 5);
 }
@@ -1137,7 +1160,7 @@ vdbe_emit_fk_constraint_create(struct Parse *parse_context,
 					      BOX_FK_CONSTRAINT_ID, 0,
 					      constr_tuple_reg, 2,
 					      ER_CONSTRAINT_EXISTS, error_msg,
-					      false, OP_NoConflict) != 0)
+					      false, OP_NoConflict, true) != 0)
 		return;
 	sqlVdbeAddOp2(vdbe, OP_Bool, fk->is_deferred, constr_tuple_reg + 3);
 	sqlVdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 4, 0,
@@ -1187,7 +1210,7 @@ vdbe_emit_fk_constraint_create(struct Parse *parse_context,
 		sqlVdbeChangeP5(vdbe, OPFLAG_NCHANGE);
 	}
 	save_record(parse_context, BOX_FK_CONSTRAINT_ID, constr_tuple_reg, 2,
-		    vdbe->nOp - 1);
+		    vdbe->nOp - 1, true);
 	sqlReleaseTempRange(parse_context, constr_tuple_reg, 10);
 	return;
 error:
@@ -1284,7 +1307,7 @@ sqlEndTable(struct Parse *pParse)
 	if (vdbe_emit_halt_with_presence_test(pParse, BOX_SPACE_ID, 2,
 					      name_reg, 1, ER_SPACE_EXISTS,
 					      error_msg, (no_err != 0),
-					      OP_NoConflict) != 0)
+					      OP_NoConflict, false) != 0)
 		return;
 
 	int reg_space_id = getNewSpaceId(pParse);
@@ -1310,7 +1333,7 @@ sqlEndTable(struct Parse *pParse)
 		sqlVdbeAddOp3(v, OP_SInsert, BOX_SEQUENCE_ID, 0,
 				  reg_seq_record);
 		save_record(pParse, BOX_SEQUENCE_ID, reg_seq_record + 1, 1,
-			    v->nOp - 1);
+			    v->nOp - 1, true);
 		/* Do an insertion into _space_sequence. */
 		int reg_space_seq_record = emitNewSysSpaceSequenceRecord(pParse,
 							reg_space_id, reg_seq_id,
@@ -1318,7 +1341,7 @@ sqlEndTable(struct Parse *pParse)
 		sqlVdbeAddOp3(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID, 0,
 				  reg_space_seq_record);
 		save_record(pParse, BOX_SPACE_SEQUENCE_ID,
-			    reg_space_seq_record + 1, 1, v->nOp - 1);
+			    reg_space_seq_record + 1, 1, v->nOp - 1, true);
 	}
 	/* Code creation of FK constraints, if any. */
 	struct fk_constraint_parse *fk_parse;
@@ -1445,7 +1468,7 @@ sql_create_view(struct Parse *parse_context)
 	if (vdbe_emit_halt_with_presence_test(parse_context, BOX_SPACE_ID, 2,
 					      name_reg, 1, ER_SPACE_EXISTS,
 					      error_msg, (no_err != 0),
-					      OP_NoConflict) != 0)
+					      OP_NoConflict, false) != 0)
 		goto create_view_fail;
 
 	vdbe_emit_space_create(parse_context, getNewSpaceId(parse_context),
@@ -1564,7 +1587,7 @@ vdbe_emit_fk_constraint_drop(struct Parse *parse_context, char *constraint_name,
 					      BOX_FK_CONSTRAINT_ID, 0,
 					      key_reg, 2, ER_NO_SUCH_CONSTRAINT,
 					      error_msg, false,
-					      OP_Found) != 0) {
+					      OP_Found, false) != 0) {
 		sqlDbFree(parse_context->db, constraint_name);
 		return;
 	}
@@ -1597,7 +1620,7 @@ vdbe_emit_ck_constraint_drop(struct Parse *parser, const char *ck_name,
 	if (vdbe_emit_halt_with_presence_test(parser, BOX_CK_CONSTRAINT_ID, 0,
 					      key_reg, 2, ER_NO_SUCH_CONSTRAINT,
 					      error_msg, false,
-					      OP_Found) != 0)
+					      OP_Found, false) != 0)
 		return;
 	sqlVdbeAddOp3(v, OP_MakeRecord, key_reg, 2, key_reg + 2);
 	sqlVdbeAddOp2(v, OP_SDelete, BOX_CK_CONSTRAINT_ID, key_reg + 2);
@@ -3263,7 +3286,7 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
 				  int index_id, int key_reg, uint32_t key_len,
 				  int tarantool_error_code,
 				  const char *error_src, bool no_error,
-				  int cond_opcode)
+				  int cond_opcode, bool is_clean_needed)
 {
 	assert(cond_opcode == OP_NoConflict || cond_opcode == OP_Found);
 	struct Vdbe *v = sqlGetVdbe(parser);
@@ -3283,6 +3306,8 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
 	} else {
 		sqlVdbeAddOp4(v, OP_SetDiag, tarantool_error_code, 0, 0, error,
 			      P4_DYNAMIC);
+		if (is_clean_needed)
+			save_record(parser, 0, 0, 0, v->nOp - 1, false);
 		sqlVdbeAddOp1(v, OP_Halt, -1);
 	}
 	sqlVdbeJumpHere(v, jmp);
diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h
index 73dc6e4..1d9408a 100644
--- a/src/box/sql/sqlInt.h
+++ b/src/box/sql/sqlInt.h
@@ -4540,7 +4540,7 @@ vdbe_emit_halt_with_presence_test(struct Parse *parser, int space_id,
 				  int index_id, int key_reg, uint32_t key_len,
 				  int tarantool_error_code,
 				  const char *error_src, bool no_error,
-				  int cond_opcode);
+				  int cond_opcode, bool is_clean_needed);
 
 /**
  * Generate VDBE code to delete records from system _sql_stat1 or
diff --git a/src/box/sql/trigger.c b/src/box/sql/trigger.c
index d746ef8..5627239 100644
--- a/src/box/sql/trigger.c
+++ b/src/box/sql/trigger.c
@@ -112,7 +112,8 @@ sql_trigger_begin(struct Parse *parse)
 						      name_reg, 1,
 						      ER_TRIGGER_EXISTS,
 						      error_msg, (no_err != 0),
-						      OP_NoConflict) != 0)
+						      OP_NoConflict,
+						      false) != 0)
 			goto trigger_cleanup;
 	}
 
@@ -420,7 +421,8 @@ sql_drop_trigger(struct Parse *parser)
 	sqlVdbeAddOp4(v, OP_String8, 0, name_reg, 0, name_copy, P4_DYNAMIC);
 	if (vdbe_emit_halt_with_presence_test(parser, BOX_TRIGGER_ID, 0,
 					      name_reg, 1, ER_NO_SUCH_TRIGGER,
-					      error_msg, no_err, OP_Found) != 0)
+					      error_msg, no_err, OP_Found,
+					      false) != 0)
 		goto drop_trigger_cleanup;
 
 	vdbe_code_drop_trigger(parser, trigger_name, true);
diff --git a/test/sql/checks.result b/test/sql/checks.result
index bdcf507..5279ae0 100644
--- a/test/sql/checks.result
+++ b/test/sql/checks.result
@@ -251,8 +251,9 @@ box.execute("CREATE TABLE T2(ID INT PRIMARY KEY, CONSTRAINT CK1 CHECK(ID > 0), C
 ---
 - error: Constraint CK1 already exists
 ...
-box.space.T2:drop()
+box.space.T2
 ---
+- null
 ...
 box.space._ck_constraint:select()
 ---
diff --git a/test/sql/checks.test.lua b/test/sql/checks.test.lua
index 50cb13c..cacba59 100644
--- a/test/sql/checks.test.lua
+++ b/test/sql/checks.test.lua
@@ -86,7 +86,7 @@ box.execute("INSERT INTO \"test\" VALUES(11, 11);")
 box.execute("INSERT INTO \"test\" VALUES(12, 11);")
 s:drop()
 box.execute("CREATE TABLE T2(ID INT PRIMARY KEY, CONSTRAINT CK1 CHECK(ID > 0), CONSTRAINT CK1 CHECK(ID < 0))")
-box.space.T2:drop()
+box.space.T2
 box.space._ck_constraint:select()
 
 --
diff --git a/test/sql/clear.result b/test/sql/clear.result
index 0a3baa3..4e10b0e 100644
--- a/test/sql/clear.result
+++ b/test/sql/clear.result
@@ -168,3 +168,30 @@ box.execute("DROP TABLE zoobar")
 ...
 -- Debug
 -- require("console").start()
+--
+-- gh-4183: Garbage in case of failed constraint creation.
+--
+box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(id > 0), CONSTRAINT ck1 CHECK(id < 0));")
+---
+- error: Constraint CK1 already exists
+...
+box.space.t1
+---
+- null
+...
+box.space._ck_constraint:select()
+---
+- []
+...
+box.execute("CREATE TABLE t2(id INT PRIMARY KEY, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2);")
+---
+- error: Constraint FK1 already exists
+...
+box.space.t2
+---
+- null
+...
+box.space._fk_constraint:select()
+---
+- []
+...
diff --git a/test/sql/clear.test.lua b/test/sql/clear.test.lua
index c67a447..4e212e3 100644
--- a/test/sql/clear.test.lua
+++ b/test/sql/clear.test.lua
@@ -29,3 +29,14 @@ box.execute("DROP TABLE zoobar")
 
 -- Debug
 -- require("console").start()
+
+--
+-- gh-4183: Garbage in case of failed constraint creation.
+--
+box.execute("CREATE TABLE t1(id INT PRIMARY KEY, CONSTRAINT ck1 CHECK(id > 0), CONSTRAINT ck1 CHECK(id < 0));")
+box.space.t1
+box.space._ck_constraint:select()
+
+box.execute("CREATE TABLE t2(id INT PRIMARY KEY, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2, CONSTRAINT fk1 FOREIGN KEY(id) REFERENCES t2);")
+box.space.t2
+box.space._fk_constraint:select()




More information about the Tarantool-patches mailing list