[tarantool-patches] Re: [PATCH 3/5] sql: introduce ADD CONSTRAINT statement
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Jul 18 00:05:00 MSK 2018
Thanks for the patch! See 32 comments below. Also see other
fixes on the branch in a separate commit.
1. Typos below: paren, refrencing.
On 13/07/2018 05:04, Nikita Pettik wrote:
> After introducing separate space for persisting foreign key
> constraints, nothing prevents us from adding ALTER TABLE statement to
> add or drop named constraints. According to ANSI syntax is following:
>
> ALTER TABLE <referencing table> ADD CONSTRAINT
> <referential constraint name> FOREIGN KEY
> <left parent> <referencing columns> <right paren> REFERENCES
2. Can you give an example of what <left/right parent> is? Or maybe you
meant parentheses?
> <referenced table> [ <referenced columns> ] [ MATCH <match type> ]
> [ <referential triggered action> ] [ <constraint check time> ]
>
> ALTER TABLE <referencing table> DROP CONSTRAINT <constrain name>
>
> In our terms it looks like:
>
> ALTER TABLE t1 ADD CONSTRAINT f1 FOREIGN KEY(id, a)
> REFERENCES t2 (id, b) MATCH FULL;
> ALTER TABLE t1 DROP CONSTRAINT f1;
>
> FK constraints which come with CREATE TABLE statement are also
> persisted with auto-generated name. They are coded after space and its
> indexes.
>
> Moreover, we don't use original SQLite foreign keys anymore: those
> obsolete structs have been removed alongside FK hash. Now FK constraints
> are stored only in space.
>
> Since types of refrencing and referenced fields must match, and now in
> SQL only PK is allowed to feature INT (other fields are always SCALAR),
> some tests have been corrected to obey this rule.
>
> Part of #3271
> ---
> extra/mkkeywordhash.c | 3 +
> src/box/fkey.h | 6 +
> src/box/sql.c | 16 +
> src/box/sql/alter.c | 7 -
> src/box/sql/build.c | 581 +++++++++++++++------
> src/box/sql/callback.c | 2 -
> src/box/sql/delete.c | 6 +-
> src/box/sql/expr.c | 10 +-
> src/box/sql/fkey.c | 974 +++++++++++------------------------
> src/box/sql/insert.c | 19 +-
> src/box/sql/main.c | 5 -
> src/box/sql/parse.y | 37 +-
> src/box/sql/pragma.c | 240 +--------
> src/box/sql/pragma.h | 7 -
> src/box/sql/prepare.c | 5 +
> src/box/sql/sqliteInt.h | 167 +++---
> src/box/sql/status.c | 5 +-
> src/box/sql/tarantoolInt.h | 12 +
> src/box/sql/update.c | 4 +-
> src/box/sql/vdbe.c | 16 +-
> test/sql-tap/alter.test.lua | 4 +-
> test/sql-tap/alter2.test.lua | 196 +++++++
> test/sql-tap/fkey1.test.lua | 51 +-
> test/sql-tap/fkey2.test.lua | 131 ++---
> test/sql-tap/fkey3.test.lua | 19 +-
> test/sql-tap/fkey4.test.lua | 2 +-
> test/sql-tap/orderby1.test.lua | 6 +-
> test/sql-tap/table.test.lua | 18 +-
> test/sql-tap/tkt-b1d3a2e531.test.lua | 6 +-
> test/sql-tap/triggerC.test.lua | 2 +-
> test/sql-tap/whereG.test.lua | 4 +-
> test/sql-tap/with1.test.lua | 2 +-
> 32 files changed, 1226 insertions(+), 1337 deletions(-)
> create mode 100755 test/sql-tap/alter2.test.lua
>
> diff --git a/src/box/fkey.h b/src/box/fkey.h
> index 1b6ea71d9..939773ef2 100644
> --- a/src/box/fkey.h
> +++ b/src/box/fkey.h
> @@ -141,6 +141,12 @@ fkey_is_self_referenced(const struct fkey_def *fkey)
> return fkey->child_id == fkey->parent_id;
> }
>
> +static inline bool
> +space_fkey_check_references(const struct space *space)
> +{
> + return space->parent_fkey != NULL;
> +}
3. Are you sure that you need this one-line function for a
single place of usage? Even in that place you can remove the check and
nothing would change. See the code:
if (space_fkey_check_references(space)) {
for (struct fkey *fk = space->parent_fkey; fk != NULL;
fk = fk->fkey_parent_next) {
If space->parent_fkey == NULL here, then the loop just won't start.
Isn't that so?
(I have fixed this comment in my commit).
> +
> /**
> * The second argument is a Trigger structure allocated by the
> * fkActionTrigger() routine.This function deletes the Trigger
> diff --git a/src/box/sql/alter.c b/src/box/sql/alter.c
> index fe54e5531..e81113f58 100644
> --- a/src/box/sql/alter.c
> +++ b/src/box/sql/alter.c
> @@ -189,12 +188,6 @@ sqlite3AlterFinishAddColumn(Parse * pParse, Token * pColDef)
> sqlite3ErrorMsg(pParse, "Cannot add a UNIQUE column");
> return;
> }
> - if ((user_session->sql_flags & SQLITE_ForeignKeys) && pNew->pFKey
> - && pDflt) {
> - sqlite3ErrorMsg(pParse,
> - "Cannot add a REFERENCES column with non-NULL default value");
> - return;
> - }
4. Why did you remove this?
> assert(pNew->def->fields[pNew->def->field_count - 1].is_nullable ==
> action_is_nullable(pNew->def->fields[
> pNew->def->field_count - 1].nullable_action));
> diff --git a/src/box/sql/build.c b/src/box/sql/build.c
> index 0c762fac9..c2d3cd035 100644
> --- a/src/box/sql/build.c
> +++ b/src/box/sql/build.c
> @@ -373,9 +374,6 @@ deleteTable(sqlite3 * db, Table * pTable)
> freeIndex(db, pIndex);
> }
>
> - /* Delete any foreign keys attached to this table. */
> - sqlite3FkDelete(db, pTable);
5. I still see sqlite3FkDelete in one comment. Please, remove. Maybe
the comment is obsolete.
> -
> /* Delete the Table structure itself.
> */
> sqlite3HashClear(&pTable->idxHash);
> @@ -1743,6 +1741,95 @@ emitNewSysSpaceSequenceRecord(Parse *pParse, int space_id, const char reg_seq_id
> return first_col;
> }
>
> +/**
> + * Generate opcodes to serialize foreign key into MgsPack and
> + * insert produced tuple into _fk_constraint space.
> + *
> + * @param parse_context Parsing context.
> + * @param fk Foreign key to be created.
> + */
> +static void
> +vdbe_fkey_code_creation(struct Parse *parse_context, const struct fkey_def *fk)
6. How about vdbe_emit_fkey_create? As I remember, we have decided to use
_emit for functions generating opcodes.
> +{
> + assert(parse_context != NULL);
> + assert(fk != NULL);
> + struct Vdbe *vdbe = sqlite3GetVdbe(parse_context);
> + assert(vdbe != NULL);
> + /*
> + * Occupy registers for 8 fields: each member in
> + * _constraint space plus one for final msgpack tuple.
> + */
> + int constr_tuple_reg = sqlite3GetTempRange(parse_context, 9);
> + const char *name_copy = sqlite3DbStrDup(parse_context->db, fk->name);
> + if (name_copy == NULL)
> + return;
> + sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg, 0, name_copy,
> + P4_DYNAMIC);
> + /*
> + * In case we are adding FK constraints during execution
> + * of <CREATE TABLE ...> statement, we don't have child
> + * id, but we know register where it will be stored.
> + * */
> + if (parse_context->pNewTable != NULL) {
> + sqlite3VdbeAddOp2(vdbe, OP_SCopy, fk->child_id,
> + constr_tuple_reg + 1);
> + } else {
> + sqlite3VdbeAddOp2(vdbe, OP_Integer, fk->child_id,
> + constr_tuple_reg + 1);
> + }
> + if (parse_context->pNewTable != NULL && fkey_is_self_referenced(fk)) {
> + sqlite3VdbeAddOp2(vdbe, OP_SCopy, fk->parent_id,
> + constr_tuple_reg + 2);
> + } else {
> + sqlite3VdbeAddOp2(vdbe, OP_Integer, fk->parent_id,
> + constr_tuple_reg + 2);
> + }
> + sqlite3VdbeAddOp2(vdbe, OP_Bool, 0, constr_tuple_reg + 3);
> + sqlite3VdbeChangeP4(vdbe, -1, (char*)&fk->is_deferred, P4_BOOL);
> + sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 4, 0,
> + fkey_match_strs[fk->match], P4_STATIC);
> + sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 5, 0,
> + fkey_action_strs[fk->on_delete], P4_STATIC);
> + sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 6, 0,
> + fkey_action_strs[fk->on_update], P4_STATIC);
> + size_t encoded_links_sz = fkey_encode_links(fk, NULL) + 1;
> + char *encoded_links = sqlite3DbMallocRaw(parse_context->db,
> + encoded_links_sz);
> + if (encoded_links == NULL) {
> + free((void *) name_copy);
7. name_copy is allocated on Db, but freed with libc. It is a path to
the dark side.
> + return;
> + }
> + size_t real_links_sz = fkey_encode_links(fk, encoded_links);
> + encoded_links[real_links_sz] = '\0';
8. Why do you need zero-termination? Encoded_links is MessagePack. It
can contain any number of zeros inside and can be non-terminated.
> + sqlite3VdbeAddOp4(vdbe, OP_Blob, real_links_sz, constr_tuple_reg + 7,
> + SQL_SUBTYPE_MSGPACK, encoded_links, P4_DYNAMIC);
> + sqlite3VdbeAddOp3(vdbe, OP_MakeRecord, constr_tuple_reg, 8,
> + constr_tuple_reg + 8);
> + sqlite3VdbeAddOp2(vdbe, OP_SInsert, BOX_FK_CONSTRAINT_ID,
> + constr_tuple_reg + 8);
> + sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
> + sqlite3ReleaseTempRange(parse_context, constr_tuple_reg, 9);
> +}
> +
> +static int
> +resolve_link(struct Parse *parse_context, const struct space_def *def,
> + const char *field_name, uint32_t *link)
> +{
> + assert(link != NULL);
> + uint32_t j;
> + for (j = 0; j < def->field_count; ++j) {
> + if (strcmp(field_name, def->fields[j].name) == 0) {
> + *link = j;
> + break;
> + }
> + }
> + if (j == def->field_count) {
> + sqlite3ErrorMsg(parse_context, "no such column %s", field_name);
> + return -1;
> + }
> + return 0;
> +}
9. How about creating a tuple_dictionary on CREATE TABLE in table->def and
using its method tuple_fieldno_by_name?
> +
> /*
> * This routine is called to report the final ")" that terminates
> * a CREATE TABLE statement.
> @@ -1913,6 +2000,39 @@ sqlite3EndTable(Parse * pParse, /* Parse context */
>
> /* Reparse everything to update our internal data structures */
> parseTableSchemaRecord(pParse, iSpaceId, zStmt); /* consumes zStmt */
> +
> + /* Code creation of FK constraints, if any. */
> + struct fkey_parse *fk_parse = pParse->new_fkey;
> + while (fk_parse != NULL) {
> + struct fkey_def *fk = fk_parse->fkey;
> + if (fk_parse->selfref_cols != NULL) {
> + struct ExprList *cols = fk_parse->selfref_cols;
> + for (uint32_t i = 0; i < fk->field_count; ++i) {
10. Why do you iterate for fk->field_count, but access cols->a? Is it
guaranteed that fk->field_count == cols->nExpr?
> + if (resolve_link(pParse, p->def,
> + cols->a[i].zName,
> + &fk->links[i].parent_field) != 0)
> + return;
> + }
> + fk->parent_id = iSpaceId;
> + } else if (fk_parse->is_self_referenced) {
> + struct Index *pk = sqlite3PrimaryKeyIndex(p);
> + if (pk->nColumn != fk->field_count) {
> + sqlite3ErrorMsg(pParse,
> + "number of columns in foreign key does "
> + "not match the number of columns in "
> + "the referenced table");
11. ER_CREATE_FK_CONSTRAINT? Or ER_CREATE_SPACE.
> + return;
> + }
> + for (uint32_t i = 0; i < fk->field_count; ++i) {
> + fk->links[i].parent_field =
> + pk->aiColumn[i];
> + }
> + fk->parent_id = iSpaceId;
> + }
> + fk->child_id = iSpaceId;
> + vdbe_fkey_code_creation(pParse, fk);
> + fk_parse = fk_parse->next;
12. You can use stailq/rlist to link fkey_parse objects and use
here (not only here) rlist_foreach_entry.
> + }
> }
>
> /* Add the table to the in-memory representation of the database.
> @@ -2085,6 +2205,32 @@ sql_clear_stat_spaces(Parse *parse, const char *table_name,
> }
> }
>
> +/**
> + * Generate VDBE program to remove entry from _fk_constraint space.
> + *
> + * @param parse_context Parsing context.
> + * @param constraint_name Name of FK constraint to be dropped.
> + * Must be allocated on head by sqlite3DbMalloc().
> + * It will be freed in VDBE.
> + * @param child_id Id of table which constraint belongs to.
> + */
> +static void
> +vdbe_fkey_code_drop(struct Parse *parse_context, const char *constraint_name,
> + uint32_t child_id)
13. vdbe_emit_fkey_drop?
> +{
> + struct Vdbe *vdbe = sqlite3GetVdbe(parse_context);
> + assert(vdbe != NULL);
> + int key_reg = sqlite3GetTempRange(parse_context, 3);
> + sqlite3VdbeAddOp4(vdbe, OP_String8, 0, key_reg, 0, constraint_name,
> + P4_DYNAMIC);
> + sqlite3VdbeAddOp2(vdbe, OP_Integer, child_id, key_reg + 1);
> + sqlite3VdbeAddOp3(vdbe, OP_MakeRecord, key_reg, 2, key_reg + 2);
> + sqlite3VdbeAddOp2(vdbe, OP_SDelete, BOX_FK_CONSTRAINT_ID, key_reg + 2);
> + sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
> + VdbeComment((vdbe, "Delete FK constraint %s", constraint_name));
> + sqlite3ReleaseTempRange(parse_context, key_reg, 3);
> +}
> +
> /**
> * Generate code to drop a table.
> * This routine includes dropping triggers, sequences,
> @@ -2270,176 +2429,276 @@ sql_drop_table(struct Parse *parse_context, struct SrcList *table_name_list,
> sqlite3SrcListDelete(db, table_name_list);
> }
>
> -/*
> - * This routine is called to create a new foreign key on the table
> - * currently under construction. pFromCol determines which columns
> - * in the current table point to the foreign key. If pFromCol==0 then
> - * connect the key to the last column inserted. pTo is the name of
> - * the table referred to (a.k.a the "parent" table). pToCol is a list
> - * of tables in the parent pTo table. flags contains all
> - * information about the conflict resolution algorithms specified
> - * in the ON DELETE, ON UPDATE and ON INSERT clauses.
> +/**
> + * Return ordinal number of column by name. In case of error,
> + * set error message.
> *
> - * An FKey structure is created and added to the table currently
> - * under construction in the pParse->pNewTable field.
> + * @param parse_context Parsing context.
> + * @param space Space which column belongs to.
> + * @param column_name Name of column to investigate.
> + * @param[out] colno Found name of column.
> *
> - * The foreign key is set for IMMEDIATE processing. A subsequent call
> - * to sqlite3DeferForeignKey() might change this to DEFERRED.
> + * @retval 0 on success, -1 on fault.
> */
> -void
> -sqlite3CreateForeignKey(Parse * pParse, /* Parsing context */
> - ExprList * pFromCol, /* Columns in this table that point to other table */
> - Token * pTo, /* Name of the other table */
> - ExprList * pToCol, /* Columns in the other table */
> - int flags /* Conflict resolution algorithms. */
> - )
> +static int
> +columnno_by_name(struct Parse *parse_context, const struct space *space,
> + const char *column_name, uint32_t *colno)
> {
> - sqlite3 *db = pParse->db;
> -#ifndef SQLITE_OMIT_FOREIGN_KEY
> - FKey *pFKey = 0;
> - FKey *pNextTo;
> - Table *p = pParse->pNewTable;
> - int nByte;
> - int i;
> - int nCol;
> - char *z;
> -
> - assert(pTo != 0);
> - char *normilized_name = strndup(pTo->z, pTo->n);
> - if (normilized_name == NULL) {
> - diag_set(OutOfMemory, pTo->n, "strndup", "normalized name");
> - goto fk_end;
> - }
> - sqlite3NormalizeName(normilized_name);
> - uint32_t parent_id = box_space_id_by_name(normilized_name,
> - strlen(normilized_name));
> - if (parent_id == BOX_ID_NIL &&
> - strcmp(normilized_name, p->def->name) != 0) {
> - sqlite3ErrorMsg(pParse, "foreign key constraint references "\
> - "nonexistent table: %s", normilized_name);
> - goto fk_end;
> - }
> - struct space *parent_space = space_by_id(parent_id);
> - if (parent_space != NULL && parent_space->def->opts.is_view) {
> - sqlite3ErrorMsg(pParse, "can't create foreign key constraint "\
> - "referencing view: %s", normilized_name);
> - goto fk_end;
> + assert(colno != NULL);
> + uint32_t column_len = strlen(column_name);
> + if (tuple_fieldno_by_name(space->def->dict, column_name, column_len,
> + field_name_hash(column_name, column_len),
> + colno) != 0) {
> + sqlite3ErrorMsg(parse_context,
> + "table \"%s\" doesn't feature column %s",
> + space->def->name, column_name);
> + return -1;
> }
> - if (p == 0)
> - goto fk_end;
> - if (pFromCol == 0) {
> - int iCol = p->def->field_count - 1;
> - if (NEVER(iCol < 0))
> - goto fk_end;
> - if (pToCol && pToCol->nExpr != 1) {
> - sqlite3ErrorMsg(pParse, "foreign key on %s"
> - " should reference only one column of table %T",
> - p->def->fields[iCol].name, pTo);
> - goto fk_end;
> - }
> - nCol = 1;
> - } else if (pToCol && pToCol->nExpr != pFromCol->nExpr) {
> - sqlite3ErrorMsg(pParse,
> - "number of columns in foreign key does not match the number of "
> - "columns in the referenced table");
> - goto fk_end;
> + return 0;
> +}
> +
> +void
> +sql_create_foreign_key(struct Parse *parse_context, struct SrcList *child,
> + struct Token *constraint, struct ExprList *child_cols,
> + struct Token *parent, struct ExprList *parent_cols,
> + bool is_deferred, int actions)
> +{
> + struct sqlite3 *db = parse_context->db;
> + /*
> + * When this function is called second time during
> + * <CREATE TABLE ...> statement (i.e. at VDBE runtime),
> + * don't even try to do something.
> + */
> + if (db->init.busy)
> + return;
14. How is it possible that sql_create_foreign_key is called twice? I
see that it is called from the parser only. But when I removed this check,
I got a lot of errors.
> + /*
> + * Beforehand initialization for correct clean-up
> + * while emergency exiting in case of error.
> + */
> + const char *parent_name = NULL;
> + const char *constraint_name = NULL;
> + bool is_self_referenced = false;
> + /*
> + * Table under construction during CREATE TABLE
> + * processing. NULL for ALTER TABLE statement handling.
> + */
> + struct Table *new_tab = parse_context->pNewTable;
> + /* Whether we are processing ALTER TABLE or CREATE TABLE. */
> + bool is_alter = new_tab == NULL;
> + uint32_t child_cols_count;
> + if (child_cols == NULL) {
> + if (is_alter) {
> + sqlite3ErrorMsg(parse_context,
> + "referencing columns are not specified");
> + goto exit_create_fk;
15. No test. Can not grep this message anywhere.
> + }
> + child_cols_count = 1;
> } else {
> - nCol = pFromCol->nExpr;
> - }
> - nByte = sizeof(*pFKey) + (nCol - 1) * sizeof(pFKey->aCol[0]) +
> - strlen(normilized_name) + 1;
> - if (pToCol) {
> - for (i = 0; i < pToCol->nExpr; i++) {
> - nByte += sqlite3Strlen30(pToCol->a[i].zName) + 1;
> - }
> - }
> - pFKey = sqlite3DbMallocZero(db, nByte);
> - if (pFKey == 0) {
> - goto fk_end;
> - }
> - pFKey->pFrom = p;
> - pFKey->pNextFrom = p->pFKey;
> - z = (char *)&pFKey->aCol[nCol];
> - pFKey->zTo = z;
> - memcpy(z, normilized_name, strlen(normilized_name) + 1);
> - z += strlen(normilized_name) + 1;
> - pFKey->nCol = nCol;
> - if (pFromCol == 0) {
> - pFKey->aCol[0].iFrom = p->def->field_count - 1;
> + child_cols_count = child_cols->nExpr;
> + }
> + assert(!is_alter || (child != NULL && child->nSrc == 1));
> + struct space *child_space = NULL;
> + uint32_t child_id = 0;
> + if (is_alter) {
> + const char *child_name = child->a[0].zName;
> + child_id = box_space_id_by_name(child_name,
> + strlen(child_name));
> + if (child_id == BOX_ID_NIL) {
> + diag_set(ClientError, ER_NO_SUCH_SPACE, child_name);
> + goto tnt_error;
> + }
> + child_space = space_by_id(child_id);
> + assert(child_space != NULL);
> } else {
> - for (i = 0; i < nCol; i++) {
> - int j;
> - for (j = 0; j < (int)p->def->field_count; j++) {
> - if (strcmp(p->def->fields[j].name,
> - pFromCol->a[i].zName) == 0) {
> - pFKey->aCol[i].iFrom = j;
> - break;
> - }
> - }
> - if (j >= (int)p->def->field_count) {
> - sqlite3ErrorMsg(pParse,
> - "unknown column \"%s\" in foreign key definition",
> - pFromCol->a[i].zName);
> - goto fk_end;
> - }
> + struct fkey_parse *fk = region_alloc(&parse_context->region,
> + sizeof(*fk));
> + if (fk == NULL) {
> + diag_set(OutOfMemory, sizeof(*fk), "region",
> + "struct fkey_parse");
> + parse_context->rc = SQL_TARANTOOL_ERROR;
> + parse_context->nErr++;
> + goto exit_create_fk;
> }
> - }
> - if (pToCol) {
> - for (i = 0; i < nCol; i++) {
> - int n = sqlite3Strlen30(pToCol->a[i].zName);
> - pFKey->aCol[i].zCol = z;
> - memcpy(z, pToCol->a[i].zName, n);
> - z[n] = 0;
> - z += n + 1;
> + memset(fk, 0, sizeof(*fk));
> + struct fkey_parse *last_fk = parse_context->new_fkey;
> + parse_context->new_fkey = fk;
> + fk->next = last_fk;
> + }
> + assert(parent != NULL);
> + parent_name = sqlite3NameFromToken(db, parent);
> + if (parent_name == NULL)
> + goto exit_create_fk;
> + uint32_t parent_id = box_space_id_by_name(parent_name,
> + strlen(parent_name));
> + /*
> + * Within ALTER TABLE ADD CONSTRAINT FK also can be
> + * self-referenced, but in this case parent (which is
> + * also child) table will definitely exist.
> + */
> + is_self_referenced = is_alter ? false :
> + !strcmp(parent_name, new_tab->def->name);
> + if (parent_id == BOX_ID_NIL) {
> + if (is_self_referenced) {
> + parse_context->new_fkey->selfref_cols = parent_cols;
> + parse_context->new_fkey->is_self_referenced = true;
> + } else {
> + diag_set(ClientError, ER_NO_SUCH_SPACE, parent_name);;
> + goto tnt_error;
> }
> }
> - pFKey->isDeferred = 0;
> - pFKey->aAction[0] = (u8) (flags & 0xff); /* ON DELETE action */
> - pFKey->aAction[1] = (u8) ((flags >> 8) & 0xff); /* ON UPDATE action */
> -
> - pNextTo = (FKey *) sqlite3HashInsert(&p->pSchema->fkeyHash,
> - pFKey->zTo, (void *)pFKey);
> - if (pNextTo == pFKey) {
> - sqlite3OomFault(db);
> - goto fk_end;
> - }
> - if (pNextTo) {
> - assert(pNextTo->pPrevTo == 0);
> - pFKey->pNextTo = pNextTo;
> - pNextTo->pPrevTo = pFKey;
> + struct space *parent_space = space_by_id(parent_id);
> + if (parent_space != NULL && parent_space->def->opts.is_view) {
> + sqlite3ErrorMsg(parse_context,
> + "referenced table can't be view");
> + goto exit_create_fk;
> + }
> + if (parent_cols != NULL) {
> + if (parent_cols->nExpr != (int) child_cols_count) {
> + sqlite3ErrorMsg(parse_context,
> + "number of columns in foreign key does "
> + "not match the number of columns in "
> + "the referenced table");
16. This message appears 3 times. I think it is worth creating a
separate error code. Or at least store this string somewhere in a
variable and use it with ER_CREATE_FK_CONSTRAINT.
Or add a separate label for this error and goto it when the error occurs.
> + goto exit_create_fk;
> + }
> + } else if (!is_self_referenced) {
> + /*
> + * If parent columns are not specified, then PK columns
> + * of parent table are used as referenced.
> + */
> + struct index *parent_pk = space_index(parent_space, 0);
> + assert(parent_pk != NULL);
> + if (parent_pk->def->key_def->part_count != child_cols_count) {
> + sqlite3ErrorMsg(parse_context,
> + "number of columns in foreign key does "
> + "not match the number of columns in "
> + "the referenced table");
> + goto exit_create_fk;
> + }
> }
> -
> - /* Link the foreign key to the table as the last step.
> + if (constraint == NULL && !is_alter) {
> + if (parse_context->constraintName.n == 0) {
> + uint32_t fk_count = 0;
> + for (struct fkey_parse *fk = parse_context->new_fkey;
> + fk != NULL; fk = fk->next, fk_count++);
17. How about storing the fk count in fkey_parse?
> diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
> index 3183e3dc7..ad19759e2 100644
> --- a/src/box/sql/expr.c
> +++ b/src/box/sql/expr.c
> @@ -4835,12 +4835,12 @@ sqlite3ExprIfFalse(Parse * pParse, Expr * pExpr, int dest, int jumpIfNull)
> * Assert()s verify that the computation is correct.
> */
>
> - op = ((pExpr->op + (TK_ISNULL & 1)) ^ 1) - (TK_ISNULL & 1);
> + if (pExpr->op >= TK_NE && pExpr->op <= TK_GE)
18. Why from NE to GE? In the table above I see the range [NE, LT],
which includes [NE, GE]. Why is this hunk needed? I know about the
dependency between opcode and token values, but why does your patch break it?
Can you add words in such a way that they will not break parity?
> + op = ((pExpr->op + (TK_NE & 1)) ^ 1) - (TK_NE & 1);
> + if (pExpr->op == TK_ISNULL || pExpr->op == TK_NOTNULL)
> + op = ((pExpr->op + (TK_ISNULL & 1)) ^ 1) - (TK_ISNULL & 1);
>
> - /*
> - * Verify correct alignment of TK_ and OP_ constants.
> - * Tokens TK_ISNULL and TK_NE shoud have the same parity.
> - */
> + /* Verify correct alignment of TK_ and OP_ constants. */
> assert(pExpr->op != TK_NE || op == OP_Eq);
> assert(pExpr->op != TK_EQ || op == OP_Ne);
> assert(pExpr->op != TK_LT || op == OP_Ge);
> diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
> index 016ded8d0..1eebf6b10 100644
> --- a/src/box/sql/fkey.c
> +++ b/src/box/sql/fkey.c
> @@ -39,8 +39,9 @@
> #include "box/schema.h"
> #include "box/session.h"
> #include "tarantoolInt.h"
> +#include "vdbeInt.h"
>
> -#ifndef SQLITE_OMIT_FOREIGN_KEY
> +#ifndef SQLITE_OMIT_TRIGGER
19. Why?
> @@ -366,150 +187,116 @@ sqlite3FkLocateIndex(Parse * pParse, /* Parse context to store any error in */
> *
> * DELETE deferred Decrement the "deferred constraint counter".
> *
> - * These operations are identified in the comment at the top of this file
> - * (fkey.c) as "I.1" and "D.1".
> + * These operations are identified in the comment at the top of
> + * this file (fkey.c) as "I.1" and "D.1".
> + *
> + * @param parse_context Current parsing context.
> + * @param parent Parent table of FK constraint.
> + * @param fk_def FK constraint definition.
> + * @param referenced_idx Id of referenced index.
> + * @param reg_data Address of array containing child table row.
> + * @param incr_count Increment constraint counter by this value.
> + * @param is_ignore If true, pretend parent contains all NULLs.
> */
> static void
> -fkLookupParent(Parse * pParse, /* Parse context */
> - Table * pTab, /* Parent table of FK pFKey */
> - Index * pIdx, /* Unique index on parent key columns in pTab */
> - FKey * pFKey, /* Foreign key constraint */
> - int *aiCol, /* Map from parent key columns to child table columns */
> - int regData, /* Address of array containing child table row */
> - int nIncr, /* Increment constraint counter by this */
> - int isIgnore /* If true, pretend pTab contains all NULL values */
> - )
> +fkey_lookup_parent(struct Parse *parse_context, struct space *parent,
> + struct fkey_def *fk_def, uint32_t referenced_idx,
> + int reg_data, int incr_count, bool is_ignore)
> {
> - int i; /* Iterator variable */
> - Vdbe *v = sqlite3GetVdbe(pParse); /* Vdbe to add code to */
> - int iCur = pParse->nTab - 1; /* Cursor number to use */
> - int iOk = sqlite3VdbeMakeLabel(v); /* jump here if parent key found */
> - struct session *user_session = current_session();
> -
> - /* If nIncr is less than zero, then check at runtime if there are any
> - * outstanding constraints to resolve. If there are not, there is no need
> - * to check if deleting this row resolves any outstanding violations.
> + struct Vdbe *v = sqlite3GetVdbe(parse_context);
> + int cursor = parse_context->nTab - 1;
> + int ok_label = sqlite3VdbeMakeLabel(v);
> + /*
> + * If incr_count is less than zero, then check at runtime
> + * if there are any outstanding constraints to resolve.
> + * If there are not, there is no need to check if deleting
> + * this row resolves any outstanding violations.
> *
> - * Check if any of the key columns in the child table row are NULL. If
> - * any are, then the constraint is considered satisfied. No need to
> - * search for a matching row in the parent table.
> + * Check if any of the key columns in the child table row
> + * are NULL. If any are, then the constraint is considered
> + * satisfied. No need to search for a matching row in the
> + * parent table.
> */
> - if (nIncr < 0) {
> - sqlite3VdbeAddOp2(v, OP_FkIfZero, pFKey->isDeferred, iOk);
> - VdbeCoverage(v);
> - }
> - for (i = 0; i < pFKey->nCol; i++) {
> - int iReg = aiCol[i] + regData + 1;
> - sqlite3VdbeAddOp2(v, OP_IsNull, iReg, iOk);
> - VdbeCoverage(v);
> - }
> -
> - if (isIgnore == 0) {
> - if (pIdx == 0) {
> - /* If pIdx is NULL, then the parent key is the INTEGER PRIMARY KEY
> - * column of the parent table (table pTab).
> - */
> - int regTemp = sqlite3GetTempReg(pParse);
> -
> - /* Invoke MustBeInt to coerce the child key value to an integer (i.e.
> - * apply the affinity of the parent key). If this fails, then there
> - * is no matching parent key. Before using MustBeInt, make a copy of
> - * the value. Otherwise, the value inserted into the child key column
> - * will have INTEGER affinity applied to it, which may not be correct.
> - */
> - sqlite3VdbeAddOp2(v, OP_SCopy, aiCol[0] + 1 + regData,
> - regTemp);
> - VdbeCoverage(v);
> -
> - /* If the parent table is the same as the child table, and we are about
> - * to increment the constraint-counter (i.e. this is an INSERT operation),
> - * then check if the row being inserted matches itself. If so, do not
> - * increment the constraint-counter.
> - */
> - if (pTab == pFKey->pFrom && nIncr == 1) {
> - sqlite3VdbeAddOp3(v, OP_Eq, regData, iOk,
> - regTemp);
> - VdbeCoverage(v);
> - sqlite3VdbeChangeP5(v, SQLITE_NOTNULL);
> - }
> + if (incr_count < 0)
> + sqlite3VdbeAddOp2(v, OP_FkIfZero, fk_def->is_deferred,
> + ok_label);
>
> - } else {
> - int nCol = pFKey->nCol;
> - int regTemp = sqlite3GetTempRange(pParse, nCol);
> - int regRec = sqlite3GetTempReg(pParse);
> - struct space *space =
> - space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
> - vdbe_emit_open_cursor(pParse, iCur, pIdx->tnum, space);
> - for (i = 0; i < nCol; i++) {
> - sqlite3VdbeAddOp2(v, OP_Copy,
> - aiCol[i] + 1 + regData,
> - regTemp + i);
> - }
> -
> - /* If the parent table is the same as the child table, and we are about
> - * to increment the constraint-counter (i.e. this is an INSERT operation),
> - * then check if the row being inserted matches itself. If so, do not
> - * increment the constraint-counter.
> - *
> - * If any of the parent-key values are NULL, then the row cannot match
> - * itself. So set JUMPIFNULL to make sure we do the OP_Found if any
> - * of the parent-key values are NULL (at this point it is known that
> - * none of the child key values are).
> - */
> - if (pTab == pFKey->pFrom && nIncr == 1) {
> - int iJump =
> - sqlite3VdbeCurrentAddr(v) + nCol + 1;
> - for (i = 0; i < nCol; i++) {
> - int iChild = aiCol[i] + 1 + regData;
> - int iParent =
> - pIdx->aiColumn[i] + 1 + regData;
> - assert(pIdx->aiColumn[i] >= 0);
> - assert(aiCol[i] != pTab->iPKey);
> - if (pIdx->aiColumn[i] == pTab->iPKey) {
> - /* The parent key is a composite key that includes the IPK column */
> - iParent = regData;
> - }
> - sqlite3VdbeAddOp3(v, OP_Ne, iChild,
> - iJump, iParent);
> - VdbeCoverage(v);
> - sqlite3VdbeChangeP5(v,
> - SQLITE_JUMPIFNULL);
> - }
> - sqlite3VdbeGoto(v, iOk);
> + for (uint32_t i = 0; i < fk_def->field_count; i++) {
> + int iReg = fk_def->links[i].child_field + reg_data + 1;
> + sqlite3VdbeAddOp2(v, OP_IsNull, iReg, ok_label);
> + }
> + if (is_ignore == 0) {
> + uint32_t field_count = fk_def->field_count;
> + int temp_regs = sqlite3GetTempRange(parse_context, field_count);
> + int rec_reg = sqlite3GetTempReg(parse_context);
> + uint32_t id =
> + SQLITE_PAGENO_FROM_SPACEID_AND_INDEXID(fk_def->parent_id,
> + referenced_idx);
20. On 2.0 vdbe_emit_open_cursor takes an index id, not a pageno, so please
rebase on the latest version.
> + vdbe_emit_open_cursor(parse_context, cursor, id, parent);
> + for (uint32_t i = 0; i < field_count; ++i) {
> + sqlite3VdbeAddOp2(v, OP_Copy,
> + fk_def->links[i].child_field + 1 +
> + reg_data, temp_regs + i);
> + }
> + /*
> + * If the parent table is the same as the child
> + * table, and we are about to increment the
> + * constraint-counter (i.e. this is an INSERT operation),
> + * then check if the row being inserted matches itself.
> + * If so, do not increment the constraint-counter.
> + *
> + * If any of the parent-key values are NULL, then
> + * the row cannot match itself. So set JUMPIFNULL
> + * to make sure we do the OP_Found if any of the
> + * parent-key values are NULL (at this point it
> + * is known that none of the child key values are).
> + */
> + if (parent->def->id == fk_def->child_id && incr_count == 1) {
21. What about fkey_is_self_referenced(fk_def)? Is it the same?
> + int jump = sqlite3VdbeCurrentAddr(v) + field_count + 1;
> + for (uint32_t i = 0; i < field_count; i++) {
> + int child_col = fk_def->links[i].child_field +
> + 1 + reg_data;
> + int parent_col = fk_def->links[i].parent_field +
> + 1 + reg_data;
> + sqlite3VdbeAddOp3(v, OP_Ne, child_col, jump,
> + parent_col);
> + sqlite3VdbeChangeP5(v, SQLITE_JUMPIFNULL);
> }
> -
> - sqlite3VdbeAddOp4(v, OP_MakeRecord, regTemp, nCol,
> - regRec,
> - sqlite3IndexAffinityStr(pParse->db,
> - pIdx), nCol);
> - sqlite3VdbeAddOp4Int(v, OP_Found, iCur, iOk, regRec, 0);
> - VdbeCoverage(v);
> -
> - sqlite3ReleaseTempReg(pParse, regRec);
> - sqlite3ReleaseTempRange(pParse, regTemp, nCol);
> + sqlite3VdbeGoto(v, ok_label);
> }
> + struct index *idx = space_index(parent, referenced_idx);
> + assert(idx != NULL);
> + sqlite3VdbeAddOp4(v, OP_MakeRecord, temp_regs, field_count,
> + rec_reg, sql_index_affinity_str(v->db,
> + idx->def),
> + P4_DYNAMIC);
> + sqlite3VdbeAddOp4Int(v, OP_Found, cursor, ok_label, rec_reg, 0);
> + sqlite3ReleaseTempReg(parse_context, rec_reg);
> + sqlite3ReleaseTempRange(parse_context, temp_regs, field_count);
> }
> -
> - if (!pFKey->isDeferred && !(user_session->sql_flags & SQLITE_DeferFKs)
> - && !pParse->pToplevel && !pParse->isMultiWrite) {
> - /* Special case: If this is an INSERT statement that will insert exactly
> - * one row into the table, raise a constraint immediately instead of
> - * incrementing a counter. This is necessary as the VM code is being
> + struct session *user_session = current_session();
> + if (!fk_def->is_deferred &&
> + !(user_session->sql_flags & SQLITE_DeferFKs) &&
22. Why do we check session flags here? They are runtime flags, and I
can change DeferFKs after parsing but before executing. DeferFKs is
checked both at runtime and at parse time for some unknown reason.
This is not the only place where this strange thing happens.
> @@ -844,59 +586,31 @@ sqlite3FkCheck(Parse * pParse, /* Parse context */
> if ((user_session->sql_flags & SQLITE_ForeignKeys) == 0)
> return;
>
> - /* Loop through all the foreign key constraints for which pTab is the
> - * child table (the table that the foreign key definition is part of).
> + /*
> + * Loop through all the foreign key constraints for which
> + * pTab is the child table.
> */
> - for (pFKey = pTab->pFKey; pFKey; pFKey = pFKey->pNextFrom) {
> - Table *pTo; /* Parent table of foreign key pFKey */
> - Index *pIdx = 0; /* Index on key columns in pTo */
> - int *aiFree = 0;
> - int *aiCol;
> - int iCol;
> - int i;
> + struct space *space = space_by_id(pTab->def->id);
> + assert(space != NULL);
> + for (struct fkey *fk = space->child_fkey; fk != NULL;
> + fk = fk->fkey_child_next) {
> + struct fkey_def *fk_def = fk->def;
> int bIgnore = 0;
> -
> - if (aChange
> - && sqlite3_stricmp(pTab->def->name, pFKey->zTo) != 0
> - && fkChildIsModified(pFKey, aChange) == 0) {
> + if (aChange != NULL && space->def->id != fk_def->parent_id &&
23. fkey_is_self_referenced?
> + !fkey_child_is_modified(fk_def, aChange))
> continue;
> - }
> -
> @@ -977,100 +686,74 @@ sqlite3FkCheck(Parse * pParse, /* Parse context */
> * might be set incorrectly if any OP_FkCounter related scans are
> * omitted.
> */
> - if (!pFKey->isDeferred && eAction != OE_Cascade
> - && eAction != OE_SetNull) {
> + if (!fk_def->is_deferred &&
> + action != FKEY_ACTION_CASCADE &&
> + action != FKEY_ACTION_SET_NULL) {
> sqlite3MayAbort(pParse);
> }
> }
> - pItem->zName = 0;
> sqlite3SrcListDelete(db, pSrc);
> }
> - sqlite3DbFree(db, aiCol);
> }
> }
>
> #define COLUMN_MASK(x) (((x)>31) ? 0xffffffff : ((u32)1<<(x)))
24. Let's use a 64-bit mask and the utilities from column_mask.h.
>
> -/*
> - * This function is called before generating code to update or delete a
> - * row contained in table pTab.
> - */
> -u32
> -sqlite3FkOldmask(Parse * pParse, /* Parse context */
> - Table * pTab /* Table being modified */
> - )
> +uint32_t
> +fkey_old_mask(uint32_t space_id)
25. I think we should calculate this mask once on fk creation,
like it is done for key_def.column_mask.
> {
> - u32 mask = 0;
> + uint32_t mask = 0;
> struct session *user_session = current_session();
> -
> if (user_session->sql_flags & SQLITE_ForeignKeys) {
> - FKey *p;
> - int i;
> - for (p = pTab->pFKey; p; p = p->pNextFrom) {
> - for (i = 0; i < p->nCol; i++)
> - mask |= COLUMN_MASK(p->aCol[i].iFrom);
> + struct space *space = space_by_id(space_id);
> + for (struct fkey *fk = space->child_fkey; fk != NULL;
> + fk = fk->fkey_child_next) {
> + struct fkey_def *def = fk->def;
> + for (uint32_t i = 0; i < def->field_count; ++i)
> + mask |=COLUMN_MASK(def->links[i].child_field);
> }
> - for (p = sqlite3FkReferences(pTab); p; p = p->pNextTo) {
> - Index *pIdx = 0;
> - sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
> - if (pIdx) {
> - int nIdxCol = index_column_count(pIdx);
> - for (i = 0; i < nIdxCol; i++) {
> - assert(pIdx->aiColumn[i] >= 0);
> - mask |= COLUMN_MASK(pIdx->aiColumn[i]);
> - }
> - }
> + for (struct fkey *fk = space->parent_fkey; fk != NULL;
> + fk = fk->fkey_parent_next) {
> + struct fkey_def *def = fk->def;
> + for (uint32_t i = 0; i < def->field_count; ++i)
> + mask |= COLUMN_MASK(def->links[i].parent_field);
> }
> }
> return mask;
> }
> diff --git a/src/box/sql/main.c b/src/box/sql/main.c
> index 00dc7a631..618cdc420 100644
> --- a/src/box/sql/main.c
> +++ b/src/box/sql/main.c
> @@ -730,11 +730,6 @@ sqlite3RollbackAll(Vdbe * pVdbe, int tripCode)
> {
> sqlite3 *db = pVdbe->db;
> (void)tripCode;
> - struct session *user_session = current_session();
> -
> - /* DDL is impossible inside a transaction. */
> - assert((user_session->sql_flags & SQLITE_InternChanges) == 0
> - || db->init.busy == 1);
26. Why?
>
> /* If one has been configured, invoke the rollback-hook callback */
> if (db->xRollbackCallback && (!pVdbe->auto_commit)) {
> diff --git a/src/box/sql/parse.y b/src/box/sql/parse.y
> index b2940b7c4..1b84dbcaa 100644
> --- a/src/box/sql/parse.y
> +++ b/src/box/sql/parse.y
> @@ -300,19 +301,23 @@ autoinc(X) ::= AUTOINCR. {X = 1;}
> // check fails.
> //
> %type refargs {int}
> -refargs(A) ::= . { A = ON_CONFLICT_ACTION_NONE*0x0101; /* EV: R-19803-45884 */}
> +refargs(A) ::= . { A = FKEY_NO_ACTION; }
> refargs(A) ::= refargs(A) refarg(Y). { A = (A & ~Y.mask) | Y.value; }
> %type refarg {struct {int value; int mask;}}
> -refarg(A) ::= MATCH nm. { A.value = 0; A.mask = 0x000000; }
> +refarg(A) ::= MATCH matcharg(X). { A.value = X<<16; A.mask = 0xff0000; }
27. Why exactly 16? Why can it not remain 0, or be << 2, or << 4?
> refarg(A) ::= ON INSERT refact. { A.value = 0; A.mask = 0x000000; }
> refarg(A) ::= ON DELETE refact(X). { A.value = X; A.mask = 0x0000ff; }
> refarg(A) ::= ON UPDATE refact(X). { A.value = X<<8; A.mask = 0x00ff00; }
> diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
> index 5c5369aeb..2489b31b2 100644
> --- a/src/box/sql/sqliteInt.h
> +++ b/src/box/sql/sqliteInt.h
> @@ -4280,8 +4271,57 @@ sql_trigger_colmask(Parse *parser, struct sql_trigger *trigger,
> #define sqlite3IsToplevel(p) ((p)->pToplevel==0)
>
> int sqlite3JoinType(Parse *, Token *, Token *, Token *);
> -void sqlite3CreateForeignKey(Parse *, ExprList *, Token *, ExprList *, int);
> -void sqlite3DeferForeignKey(Parse *, int);
> +
> +/**
> + * Change defer mode of last FK constraint processed during
> + * <CREATE TABLE> statement.
28. 'CREATE and ALTER', isn't it?
> + *
> + * @param parse_context Current parsing context.
> + * @param is_deferred Change defer mode to this value.
> + */
> +void
> +fkey_change_defer_mode(struct Parse *parse_context, bool is_deferred);
> +
> diff --git a/test/sql-tap/fkey1.test.lua b/test/sql-tap/fkey1.test.lua
> index 494af4b4a..3c29b097d 100755
> --- a/test/sql-tap/fkey1.test.lua
> +++ b/test/sql-tap/fkey1.test.lua
> @@ -17,10 +17,10 @@ test:do_execsql_test(
> "fkey1-1.2",
> [[
> CREATE TABLE t1(
> - a INTEGER PRIMARY KEY,
> + a PRIMARY KEY,
29. Why not INTEGER? As far as I know, we are going to forbid omitting the type.
Same for other tests (for example: fkey3.test.lua, orderby1.test.lua).
> b INTEGER
> REFERENCES t1 ON DELETE CASCADE
> - REFERENCES t2,
> + REFERENCES t2 (x),
> c TEXT,
> FOREIGN KEY (b, c) REFERENCES t2(x, y) ON UPDATE CASCADE);
> ]], {
> diff --git a/test/sql-tap/fkey4.test.lua b/test/sql-tap/fkey4.test.lua
> index 9415b62cb..9810ce22f 100755
> --- a/test/sql-tap/fkey4.test.lua
> +++ b/test/sql-tap/fkey4.test.lua
> @@ -186,7 +186,7 @@ test:do_execsql_test(
> DROP TABLE IF EXISTS c1;
> DROP TABLE IF EXISTS p1;
> CREATE TABLE p1(a PRIMARY KEY, b);
> - CREATE TABLE c1(x PRIMARY KEY REFERENCES p1 DEFERRABLE INITIALLY DEFERRED);
> + CREATE TABLE c1(x PRIMARY KEY REFERENCES p1);
30. Why?
> INSERT INTO p1 VALUES (1, 'one');
> INSERT INTO p1 VALUES (2, 'two');
> INSERT INTO c1 VALUES (1);
> diff --git a/test/sql-tap/table.test.lua b/test/sql-tap/table.test.lua
> index 6aa290742..24f494852 100755
> --- a/test/sql-tap/table.test.lua
> +++ b/test/sql-tap/table.test.lua
> @@ -791,14 +791,16 @@ test:do_catchsql_test(
> );
> ]], {
> -- <table-10.7>
> - 0
> + 1, "table \"T4\" doesn't feature column B"
> -- </table-10.7>
> })
>
> test:do_catchsql_test(
> "table-10.8",
> [[
> - DROP TABLE t6;
> + DROP TABLE IF EXISTS t6;
> + DROP TABLE IF EXISTS t4;
31. Indentation.
> + CREATE TABLE t4(x UNIQUE, y, PRIMARY KEY (x, y));
> CREATE TABLE t6(a primary key,b,c,
> FOREIGN KEY (b,c) REFERENCES t4(x,y) MATCH PARTIAL
> ON UPDATE SET NULL ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
> @@ -861,7 +863,7 @@ test:do_test(
> ]]
> end, {
> -- <table-10.12>
> - 1, [[unknown column "X" in foreign key definition]]
> + 1, [[no such column X]]
32. Can you keep the old message?
> -- </table-10.12>
> })
>
More information about the Tarantool-patches
mailing list