[Tarantool-patches] [PATCH v1 1/1] sql: fix fk violation for autoincremented field

Kirill Shcherbatov kshcherbatov at tarantool.org
Thu Oct 17 17:32:54 MSK 2019


Fk constraints used to ignore incremented fields that are
represented with NULL placeholders in a tuple during FK tests in
the SQL frontend. A factual value is inserted in a tuple a bit
later, during DML 'insert' operation on the server side.

To work around this issue a new OP_NextSequenceValue opcode is
introduced. This new operation retrieves a next sequence value
(that will be factually inserted in a tuple later) to test it
for fk constraint compatibility.

Closes #4565
---
Branch: http://github.com/tarantool/tarantool/tree/kshch/gh-4546-fk-autoincrement
Issue: https://github.com/tarantool/tarantool/issues/4546


 src/box/sequence.c                         | 54 ++++++++-----
 src/box/sequence.h                         | 11 +++
 src/box/sql/fk_constraint.c                | 30 +++++++-
 src/box/sql/vdbe.c                         | 21 +++++
 test/sql/gh-4565-fk-autoincrement.result   | 90 ++++++++++++++++++++++
 test/sql/gh-4565-fk-autoincrement.test.lua | 26 +++++++
 6 files changed, 211 insertions(+), 21 deletions(-)
 create mode 100644 test/sql/gh-4565-fk-autoincrement.result
 create mode 100644 test/sql/gh-4565-fk-autoincrement.test.lua

diff --git a/src/box/sequence.c b/src/box/sequence.c
index 5ebfa2747..1b3302a86 100644
--- a/src/box/sequence.c
+++ b/src/box/sequence.c
@@ -201,25 +201,23 @@ sequence_update(struct sequence *seq, int64_t value)
 }
 
 int
-sequence_next(struct sequence *seq, int64_t *result)
+sequence_next_value(struct sequence *seq, int64_t *result, uint32_t *key_hash,
+		    bool *is_start)
 {
-	int64_t value;
 	struct sequence_def *def = seq->def;
-	struct sequence_data new_data, old_data;
-	uint32_t key = seq->def->id;
-	uint32_t hash = sequence_hash(key);
-	uint32_t pos = light_sequence_find_key(&sequence_data_index, hash, key);
+	uint32_t key = def->id;
+	*key_hash = sequence_hash(key);
+	uint32_t pos = light_sequence_find_key(&sequence_data_index,
+					       *key_hash, key);
 	if (pos == light_sequence_end) {
-		new_data.id = key;
-		new_data.value = def->start;
-		if (light_sequence_insert(&sequence_data_index, hash,
-					  new_data) == light_sequence_end)
-			return -1;
+		*is_start = true;
 		*result = def->start;
 		return 0;
 	}
-	old_data = light_sequence_get(&sequence_data_index, pos);
-	value = old_data.value;
+	*is_start = false;
+	struct sequence_data old_data =
+		light_sequence_get(&sequence_data_index, pos);
+	int64_t value = old_data.value;
 	if (def->step > 0) {
 		if (value < def->min) {
 			value = def->min;
@@ -244,11 +242,6 @@ sequence_next(struct sequence *seq, int64_t *result)
 	}
 done:
 	assert(value >= def->min && value <= def->max);
-	new_data.id = key;
-	new_data.value = value;
-	if (light_sequence_replace(&sequence_data_index, hash,
-				   new_data, &old_data) == light_sequence_end)
-		unreachable();
 	*result = value;
 	return 0;
 overflow:
@@ -260,6 +253,31 @@ overflow:
 	goto done;
 }
 
+int
+sequence_next(struct sequence *seq, int64_t *result)
+{
+	uint32_t key_hash;
+	bool is_start;
+	if (sequence_next_value(seq, result, &key_hash, &is_start) != 0) {
+		assert(is_start == false);
+		return -1;
+	}
+	uint32_t key = seq->def->id;
+	struct sequence_data old_data, new_data;
+	new_data.id = key;
+	new_data.value = *result;
+	if (is_start) {
+		if (light_sequence_insert(&sequence_data_index, key_hash,
+					  new_data) == light_sequence_end)
+			return -1;
+	} else {
+		if (light_sequence_replace(&sequence_data_index, key_hash,
+				   new_data, &old_data) == light_sequence_end)
+			unreachable();
+	}
+	return 0;
+}
+
 int
 access_check_sequence(struct sequence *seq)
 {
diff --git a/src/box/sequence.h b/src/box/sequence.h
index 976020a25..fee39e1ab 100644
--- a/src/box/sequence.h
+++ b/src/box/sequence.h
@@ -137,6 +137,17 @@ sequence_set(struct sequence *seq, int64_t value);
 int
 sequence_update(struct sequence *seq, int64_t value);
 
+
+/**
+ * Return the next sequence value.
+ * In case of overflow, the diag error message is set.
+ *
+ * @result, otherwise return -1 and set diag.
+ */
+int
+sequence_next_value(struct sequence *seq, int64_t *result, uint32_t *key_hash,
+		    bool *is_start);
+
 /**
  * Advance a sequence.
  *
diff --git a/src/box/sql/fk_constraint.c b/src/box/sql/fk_constraint.c
index 482220a95..1eb52a51a 100644
--- a/src/box/sql/fk_constraint.c
+++ b/src/box/sql/fk_constraint.c
@@ -37,6 +37,7 @@
 #include "sqlInt.h"
 #include "box/fk_constraint.h"
 #include "box/schema.h"
+#include "box/sequence.h"
 
 /*
  * Deferred and Immediate FKs
@@ -199,6 +200,8 @@ fk_constraint_lookup_parent(struct Parse *parse_context, struct space *parent,
 	struct Vdbe *v = sqlGetVdbe(parse_context);
 	int cursor = parse_context->nTab - 1;
 	int ok_label = sqlVdbeMakeLabel(v);
+	struct space *child = space_by_id(fk_def->child_id);
+	assert(child != NULL);
 	/*
 	 * If incr_count is less than zero, then check at runtime
 	 * if there are any outstanding constraints to resolve.
@@ -216,6 +219,15 @@ fk_constraint_lookup_parent(struct Parse *parse_context, struct space *parent,
 	}
 	struct field_link *link = fk_def->links;
 	for (uint32_t i = 0; i < fk_def->field_count; ++i, ++link) {
+		if (child->sequence != NULL &&
+		    child->sequence_fieldno == link->child_field) {
+			/*
+			 * In case of auto incremented field
+			 * this heuristics is not applicable and
+			 * fk constraint should be evaluated.
+			 */
+			continue;
+		}
 		int reg = link->child_field + reg_data + 1;
 		sqlVdbeAddOp2(v, OP_IsNull, reg, ok_label);
 	}
@@ -258,9 +270,21 @@ fk_constraint_lookup_parent(struct Parse *parse_context, struct space *parent,
 				      parent);
 		link = fk_def->links;
 		for (uint32_t i = 0; i < field_count; ++i, ++link) {
-			sqlVdbeAddOp2(v, OP_Copy,
-					  link->child_field + 1 + reg_data,
-					  temp_regs + i);
+			if (child->sequence != NULL &&
+			    child->sequence_fieldno == link->child_field) {
+				/*
+				 * Retrieve a next sequence value
+				 * for an autoincremented field
+				 * and validate it instead of
+				 * NULL placeholder.
+				 */
+				sqlVdbeAddOp2(v, OP_NextSequenceValue,
+					      fk_def->child_id, temp_regs + i);
+			} else {
+				sqlVdbeAddOp2(v, OP_Copy,
+					      link->child_field + 1 + reg_data,
+					      temp_regs + i);
+			}
 		}
 		struct index *idx = space_index(parent, referenced_idx);
 		assert(idx != NULL);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index f881a732e..94d41461f 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -3731,6 +3731,27 @@ case OP_NextSequenceId: {
 	break;
 }
 
+/* Opcode: NextSequenceValue P1 P2 * * *
+ * Synopsis: r[P2]=sequence_get_value(space_by_id(p1))
+ *
+ * Get a next value of the sequence registered for a space
+ * with given id. Store result in P2 memory register.
+ */
+case OP_NextSequenceValue: {
+	struct space *space = space_by_id(pOp->p1);
+	assert(space != NULL);
+	assert(space->sequence != NULL);
+	bool is_start;
+	uint32_t dummy;
+	int64_t seq_val;
+	if (sequence_next_value(space->sequence, &seq_val, &dummy,
+				&is_start) != 0)
+		goto abort_due_to_error;
+	pOut = vdbe_prepare_null_out(p, pOp->p2);
+	mem_set_i64(pOut, seq_val);
+	break;
+}
+
 /* Opcode: NextIdEphemeral P1 P2 * * *
  * Synopsis: r[P2]=get_next_rowid(space[P1])
  *
diff --git a/test/sql/gh-4565-fk-autoincrement.result b/test/sql/gh-4565-fk-autoincrement.result
new file mode 100644
index 000000000..00c147035
--- /dev/null
+++ b/test/sql/gh-4565-fk-autoincrement.result
@@ -0,0 +1,90 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+engine = test_run:get_cfg('engine')
+ | ---
+ | ...
+box.execute('pragma sql_default_engine=\''..engine..'\'')
+ | ---
+ | - row_count: 0
+ | ...
+
+--
+-- gh-4565: Missing foreign key check in case of
+--          autoincremented field
+--
+box.execute("CREATE TABLE t1 (s1 INTEGER PRIMARY KEY);")
+ | ---
+ | - row_count: 1
+ | ...
+box.execute("CREATE TABLE t2 (s1 INTEGER PRIMARY KEY AUTOINCREMENT, FOREIGN KEY (s1) REFERENCES t1);")
+ | ---
+ | - row_count: 1
+ | ...
+box.execute("INSERT INTO t2 VALUES (NULL);")
+ | ---
+ | - null
+ | - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+ | ...
+box.space.T1:count() == 0
+ | ---
+ | - true
+ | ...
+box.space.T2:count() == 0
+ | ---
+ | - true
+ | ...
+box.execute("INSERT INTO t1 VALUES (1);")
+ | ---
+ | - row_count: 1
+ | ...
+box.execute("INSERT INTO t2 VALUES (NULL);")
+ | ---
+ | - autoincrement_ids:
+ |   - 1
+ |   row_count: 1
+ | ...
+box.space.T1:count() == 1
+ | ---
+ | - true
+ | ...
+box.space.T2:count() == 1
+ | ---
+ | - true
+ | ...
+box.execute("INSERT INTO t2 VALUES (NULL);")
+ | ---
+ | - null
+ | - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+ | ...
+box.space.T1:count() == 1
+ | ---
+ | - true
+ | ...
+box.space.T2:count() == 1
+ | ---
+ | - true
+ | ...
+box.sequence.T2:set(box.sequence.T2.max)
+ | ---
+ | ...
+box.sequence.T2:next()
+ | ---
+ | - error: Sequence 'T2' has overflowed
+ | ...
+box.execute("INSERT INTO t2 VALUES (NULL);")
+ | ---
+ | - null
+ | - Sequence 'T2' has overflowed
+ | ...
+box.space.T1:drop()
+ | ---
+ | - error: 'Can''t modify space ''T1'': can not drop a referenced index'
+ | ...
+box.space.T2:drop()
+ | ---
+ | ...
diff --git a/test/sql/gh-4565-fk-autoincrement.test.lua b/test/sql/gh-4565-fk-autoincrement.test.lua
new file mode 100644
index 000000000..4e39f6420
--- /dev/null
+++ b/test/sql/gh-4565-fk-autoincrement.test.lua
@@ -0,0 +1,26 @@
+env = require('test_run')
+test_run = env.new()
+engine = test_run:get_cfg('engine')
+box.execute('pragma sql_default_engine=\''..engine..'\'')
+
+--
+-- gh-4565: Missing foreign key check in case of
+--          autoincremented field
+--
+box.execute("CREATE TABLE t1 (s1 INTEGER PRIMARY KEY);")
+box.execute("CREATE TABLE t2 (s1 INTEGER PRIMARY KEY AUTOINCREMENT, FOREIGN KEY (s1) REFERENCES t1);")
+box.execute("INSERT INTO t2 VALUES (NULL);")
+box.space.T1:count() == 0
+box.space.T2:count() == 0
+box.execute("INSERT INTO t1 VALUES (1);")
+box.execute("INSERT INTO t2 VALUES (NULL);")
+box.space.T1:count() == 1
+box.space.T2:count() == 1
+box.execute("INSERT INTO t2 VALUES (NULL);")
+box.space.T1:count() == 1
+box.space.T2:count() == 1
+box.sequence.T2:set(box.sequence.T2.max)
+box.sequence.T2:next()
+box.execute("INSERT INTO t2 VALUES (NULL);")
+box.space.T1:drop()
+box.space.T2:drop()
-- 
2.23.0



More information about the Tarantool-patches mailing list