From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 0973641D0BD for ; Thu, 17 Oct 2019 17:32:56 +0300 (MSK) From: Kirill Shcherbatov Date: Thu, 17 Oct 2019 17:32:54 +0300 Message-Id: MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [Tarantool-patches] [PATCH v1 1/1] sql: fix fk violation for autoincremented field List-Id: Tarantool development List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@freelists.org, tarantool-patches@dev.tarantool.org, korablev@tarantool.org FK constraints used to ignore autoincremented fields, which are represented by NULL placeholders in a tuple during FK checks in the SQL frontend. The actual value is inserted into the tuple a bit later, during the DML 'insert' operation on the server side. To work around this issue a new OP_NextSequenceValue opcode is introduced. This operation retrieves the next sequence value (the one that will actually be inserted into the tuple later) so that it can be tested for FK constraint compliance. 
Closes #4565 --- Branch: http://github.com/tarantool/tarantool/tree/kshch/gh-4546-fk-autoincrement Issue: https://github.com/tarantool/tarantool/issues/4546 src/box/sequence.c | 54 ++++++++----- src/box/sequence.h | 11 +++ src/box/sql/fk_constraint.c | 30 +++++++- src/box/sql/vdbe.c | 21 +++++ test/sql/gh-4565-fk-autoincrement.result | 90 ++++++++++++++++++++++ test/sql/gh-4565-fk-autoincrement.test.lua | 26 +++++++ 6 files changed, 211 insertions(+), 21 deletions(-) create mode 100644 test/sql/gh-4565-fk-autoincrement.result create mode 100644 test/sql/gh-4565-fk-autoincrement.test.lua diff --git a/src/box/sequence.c b/src/box/sequence.c index 5ebfa2747..1b3302a86 100644 --- a/src/box/sequence.c +++ b/src/box/sequence.c @@ -201,25 +201,23 @@ sequence_update(struct sequence *seq, int64_t value) } int -sequence_next(struct sequence *seq, int64_t *result) +sequence_next_value(struct sequence *seq, int64_t *result, uint32_t *key_hash, + bool *is_start) { - int64_t value; struct sequence_def *def = seq->def; - struct sequence_data new_data, old_data; - uint32_t key = seq->def->id; - uint32_t hash = sequence_hash(key); - uint32_t pos = light_sequence_find_key(&sequence_data_index, hash, key); + uint32_t key = def->id; + *key_hash = sequence_hash(key); + uint32_t pos = light_sequence_find_key(&sequence_data_index, + *key_hash, key); if (pos == light_sequence_end) { - new_data.id = key; - new_data.value = def->start; - if (light_sequence_insert(&sequence_data_index, hash, - new_data) == light_sequence_end) - return -1; + *is_start = true; *result = def->start; return 0; } - old_data = light_sequence_get(&sequence_data_index, pos); - value = old_data.value; + *is_start = false; + struct sequence_data old_data = + light_sequence_get(&sequence_data_index, pos); + int64_t value = old_data.value; if (def->step > 0) { if (value < def->min) { value = def->min; @@ -244,11 +242,6 @@ sequence_next(struct sequence *seq, int64_t *result) } done: assert(value >= def->min && value <= 
def->max); - new_data.id = key; - new_data.value = value; - if (light_sequence_replace(&sequence_data_index, hash, - new_data, &old_data) == light_sequence_end) - unreachable(); *result = value; return 0; overflow: @@ -260,6 +253,31 @@ overflow: goto done; } +int +sequence_next(struct sequence *seq, int64_t *result) +{ + uint32_t key_hash; + bool is_start; + if (sequence_next_value(seq, result, &key_hash, &is_start) != 0) { + assert(is_start == false); + return -1; + } + uint32_t key = seq->def->id; + struct sequence_data old_data, new_data; + new_data.id = key; + new_data.value = *result; + if (is_start) { + if (light_sequence_insert(&sequence_data_index, key_hash, + new_data) == light_sequence_end) + return -1; + } else { + if (light_sequence_replace(&sequence_data_index, key_hash, + new_data, &old_data) == light_sequence_end) + unreachable(); + } + return 0; +} + int access_check_sequence(struct sequence *seq) { diff --git a/src/box/sequence.h b/src/box/sequence.h index 976020a25..fee39e1ab 100644 --- a/src/box/sequence.h +++ b/src/box/sequence.h @@ -137,6 +137,17 @@ sequence_set(struct sequence *seq, int64_t value); int sequence_update(struct sequence *seq, int64_t value); + +/** + * Compute the next sequence value without advancing the sequence. + * The value is stored in @a result. + * + * @retval 0 on success, -1 on overflow (diag is set). + */ +int +sequence_next_value(struct sequence *seq, int64_t *result, uint32_t *key_hash, + bool *is_start); + /** * Advance a sequence. 
* diff --git a/src/box/sql/fk_constraint.c b/src/box/sql/fk_constraint.c index 482220a95..1eb52a51a 100644 --- a/src/box/sql/fk_constraint.c +++ b/src/box/sql/fk_constraint.c @@ -37,6 +37,7 @@ #include "sqlInt.h" #include "box/fk_constraint.h" #include "box/schema.h" +#include "box/sequence.h" /* * Deferred and Immediate FKs @@ -199,6 +200,8 @@ fk_constraint_lookup_parent(struct Parse *parse_context, struct space *parent, struct Vdbe *v = sqlGetVdbe(parse_context); int cursor = parse_context->nTab - 1; int ok_label = sqlVdbeMakeLabel(v); + struct space *child = space_by_id(fk_def->child_id); + assert(child != NULL); /* * If incr_count is less than zero, then check at runtime * if there are any outstanding constraints to resolve. @@ -216,6 +219,15 @@ fk_constraint_lookup_parent(struct Parse *parse_context, struct space *parent, } struct field_link *link = fk_def->links; for (uint32_t i = 0; i < fk_def->field_count; ++i, ++link) { + if (child->sequence != NULL && + child->sequence_fieldno == link->child_field) { + /* + * For an autoincremented field the NULL + * shortcut is not applicable: the FK + * constraint must still be evaluated. + */ + continue; + } int reg = link->child_field + reg_data + 1; sqlVdbeAddOp2(v, OP_IsNull, reg, ok_label); } @@ -258,9 +270,21 @@ fk_constraint_lookup_parent(struct Parse *parse_context, struct space *parent, parent); link = fk_def->links; for (uint32_t i = 0; i < field_count; ++i, ++link) { - sqlVdbeAddOp2(v, OP_Copy, - link->child_field + 1 + reg_data, - temp_regs + i); + if (child->sequence != NULL && + child->sequence_fieldno == link->child_field) { + /* + * Retrieve the next sequence value + * for an autoincremented field + * and validate it instead of the + * NULL placeholder. 
+ */ + sqlVdbeAddOp2(v, OP_NextSequenceValue, + fk_def->child_id, temp_regs + i); + } else { + sqlVdbeAddOp2(v, OP_Copy, + link->child_field + 1 + reg_data, + temp_regs + i); + } } struct index *idx = space_index(parent, referenced_idx); assert(idx != NULL); diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c index f881a732e..94d41461f 100644 --- a/src/box/sql/vdbe.c +++ b/src/box/sql/vdbe.c @@ -3731,6 +3731,27 @@ case OP_NextSequenceId: { break; } +/* Opcode: NextSequenceValue P1 P2 * * * + * Synopsis: r[P2]=sequence_get_value(space_by_id(p1)) + * + * Get a next value of the sequence registered for a space + * with given id. Store result in P2 memory register. + */ +case OP_NextSequenceValue: { + struct space *space = space_by_id(pOp->p1); + assert(space != NULL); + assert(space->sequence != NULL); + bool is_start; + uint32_t dummy; + int64_t seq_val; + if (sequence_next_value(space->sequence, &seq_val, &dummy, + &is_start) != 0) + goto abort_due_to_error; + pOut = vdbe_prepare_null_out(p, pOp->p2); + mem_set_i64(pOut, seq_val); + break; +} + /* Opcode: NextIdEphemeral P1 P2 * * * * Synopsis: r[P2]=get_next_rowid(space[P1]) * diff --git a/test/sql/gh-4565-fk-autoincrement.result b/test/sql/gh-4565-fk-autoincrement.result new file mode 100644 index 000000000..00c147035 --- /dev/null +++ b/test/sql/gh-4565-fk-autoincrement.result @@ -0,0 +1,90 @@ +-- test-run result file version 2 +env = require('test_run') + | --- + | ... +test_run = env.new() + | --- + | ... +engine = test_run:get_cfg('engine') + | --- + | ... +box.execute('pragma sql_default_engine=\''..engine..'\'') + | --- + | - row_count: 0 + | ... + +-- +-- gh-4565: Missing foreign key check in case of +-- autoincremented field +-- +box.execute("CREATE TABLE t1 (s1 INTEGER PRIMARY KEY);") + | --- + | - row_count: 1 + | ... +box.execute("CREATE TABLE t2 (s1 INTEGER PRIMARY KEY AUTOINCREMENT, FOREIGN KEY (s1) REFERENCES t1);") + | --- + | - row_count: 1 + | ... 
+box.execute("INSERT INTO t2 VALUES (NULL);") + | --- + | - null + | - 'Failed to execute SQL statement: FOREIGN KEY constraint failed' + | ... +box.space.T1:count() == 0 + | --- + | - true + | ... +box.space.T2:count() == 0 + | --- + | - true + | ... +box.execute("INSERT INTO t1 VALUES (1);") + | --- + | - row_count: 1 + | ... +box.execute("INSERT INTO t2 VALUES (NULL);") + | --- + | - autoincrement_ids: + | - 1 + | row_count: 1 + | ... +box.space.T1:count() == 1 + | --- + | - true + | ... +box.space.T2:count() == 1 + | --- + | - true + | ... +box.execute("INSERT INTO t2 VALUES (NULL);") + | --- + | - null + | - 'Failed to execute SQL statement: FOREIGN KEY constraint failed' + | ... +box.space.T1:count() == 1 + | --- + | - true + | ... +box.space.T2:count() == 1 + | --- + | - true + | ... +box.sequence.T2:set(box.sequence.T2.max) + | --- + | ... +box.sequence.T2:next() + | --- + | - error: Sequence 'T2' has overflowed + | ... +box.execute("INSERT INTO t2 VALUES (NULL);") + | --- + | - null + | - Sequence 'T2' has overflowed + | ... +box.space.T1:drop() + | --- + | - error: 'Can''t modify space ''T1'': can not drop a referenced index' + | ... +box.space.T2:drop() + | --- + | ... 
diff --git a/test/sql/gh-4565-fk-autoincrement.test.lua b/test/sql/gh-4565-fk-autoincrement.test.lua new file mode 100644 index 000000000..4e39f6420 --- /dev/null +++ b/test/sql/gh-4565-fk-autoincrement.test.lua @@ -0,0 +1,26 @@ +env = require('test_run') +test_run = env.new() +engine = test_run:get_cfg('engine') +box.execute('pragma sql_default_engine=\''..engine..'\'') + +-- +-- gh-4565: Missing foreign key check in case of +-- autoincremented field +-- +box.execute("CREATE TABLE t1 (s1 INTEGER PRIMARY KEY);") +box.execute("CREATE TABLE t2 (s1 INTEGER PRIMARY KEY AUTOINCREMENT, FOREIGN KEY (s1) REFERENCES t1);") +box.execute("INSERT INTO t2 VALUES (NULL);") +box.space.T1:count() == 0 +box.space.T2:count() == 0 +box.execute("INSERT INTO t1 VALUES (1);") +box.execute("INSERT INTO t2 VALUES (NULL);") +box.space.T1:count() == 1 +box.space.T2:count() == 1 +box.execute("INSERT INTO t2 VALUES (NULL);") +box.space.T1:count() == 1 +box.space.T2:count() == 1 +box.sequence.T2:set(box.sequence.T2.max) +box.sequence.T2:next() +box.execute("INSERT INTO t2 VALUES (NULL);") +box.space.T1:drop() +box.space.T2:drop() -- 2.23.0