[patches] [constraints 2/2] sql: change constraints checks order
Bulat Niatshin
niatshin at tarantool.org
Mon Feb 12 21:07:28 MSK 2018
- Change the order in which constraint checks are executed when
ON CONFLICT REPLACE is specified.
- Execute the constraint checks that use ON CONFLICT REPLACE before the
BEFORE triggers: if such a check fails, remove the conflicting entry and
insert the new one; after that, execute the query as usual.
For #2963
Signed-off-by: Bulat Niatshin <niatshin at tarantool.org>
---
src/box/sql/insert.c | 219 +++++++++++++++++++++++++++++-------------------
src/box/sql/sqliteInt.h | 5 +-
src/box/sql/update.c | 14 +++-
3 files changed, 152 insertions(+), 86 deletions(-)
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index bc1906fb6..9bea98217 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -341,6 +341,9 @@ sqlite3Insert(Parse * pParse, /* Parser context */
int regData; /* register holding first column to insert */
int *aRegIdx = 0; /* One register allocated to each index */
+ int isReplace; /* Set to true if constraints may cause a replace */
+ int bUseSeek; /* True to use OPFLAG_SEEKRESULT */
+
#ifndef SQLITE_OMIT_TRIGGER
int isView; /* True if attempting to insert into a view */
Trigger *pTrigger; /* List of triggers on pTab, if required */
@@ -661,67 +664,10 @@ sqlite3Insert(Parse * pParse, /* Parser context */
VdbeCoverage(v);
}
- /* Run the BEFORE and INSTEAD OF triggers, if there are any
- */
- endOfLoop = sqlite3VdbeMakeLabel(v);
- if (tmask & TRIGGER_BEFORE) {
- int regCols = sqlite3GetTempRange(pParse, pTab->nCol + 1);
-
- /* Create the new column data
- */
- for (i = j = 0; i < pTab->nCol; i++) {
- if (pColumn) {
- for (j = 0; j < pColumn->nId; j++) {
- if (pColumn->a[j].idx == i)
- break;
- }
- }
- if ((!useTempTable && !pList)
- || (pColumn && j >= pColumn->nId)
- || (pColumn == 0
- && IsOrdinaryHiddenColumn(&pTab->aCol[i]))) {
- if (i == pTab->iAutoIncPKey)
- sqlite3VdbeAddOp2(v, OP_Integer, -1,
- regCols + i + 1);
- else
- sqlite3ExprCode(pParse,
- pTab->aCol[i].pDflt,
- regCols + i + 1);
- } else if (useTempTable) {
- sqlite3VdbeAddOp3(v, OP_Column, srcTab, j,
- regCols + i + 1);
- } else {
- assert(pSelect == 0); /* Otherwise useTempTable is true */
- sqlite3ExprCodeAndCache(pParse,
- pList->a[j].pExpr,
- regCols + i + 1);
- }
- if (pColumn == 0
- && !IsOrdinaryHiddenColumn(&pTab->aCol[i]))
- j++;
- }
-
- /* If this is an INSERT on a view with an INSTEAD OF INSERT trigger,
- * do not attempt any conversions before assembling the record.
- * If this is a real table, attempt conversions as required by the
- * table column affinities.
- */
- if (!isView) {
- sqlite3TableAffinity(v, pTab, regCols + 1);
- }
-
- /* Fire BEFORE or INSTEAD OF triggers */
- sqlite3CodeRowTrigger(pParse, pTrigger, TK_INSERT, 0,
- TRIGGER_BEFORE, pTab,
- regCols - pTab->nCol - 1, onError,
- endOfLoop);
-
- sqlite3ReleaseTempRange(pParse, regCols, pTab->nCol + 1);
- }
-
/* Compute the content of the next row to insert into a range of
* registers beginning at regIns.
*/
+ endOfLoop = sqlite3VdbeMakeLabel(v);
if (!isView) {
if (ipkColumn >= 0) {
if (useTempTable) {
@@ -836,18 +782,77 @@ sqlite3Insert(Parse * pParse, /* Parser context */
iRegStore);
}
}
-
- /* Generate code to check constraints and generate index keys
- and do the insertion.
- */
- int isReplace; /* Set to true if constraints may cause a replace */
- int bUseSeek; /* True to use OPFLAG_SEEKRESULT */
sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur,
iIdxCur, regIns, 0,
ipkColumn >= 0, onError,
- endOfLoop, &isReplace, 0);
- sqlite3FkCheck(pParse, pTab, 0, regIns, 0);
+ endOfLoop, &isReplace, 0,
+ CONSTRAINT_CHECK_REPLACE);
+ }
+ /* Run the BEFORE and INSTEAD OF triggers, if there are any
+ */
+ if (tmask & TRIGGER_BEFORE) {
+ int regCols = sqlite3GetTempRange(pParse, pTab->nCol + 1);
+
+ /* Create the new column data
+ */
+ for (i = j = 0; i < pTab->nCol; i++) {
+ if (pColumn) {
+ for (j = 0; j < pColumn->nId; j++) {
+ if (pColumn->a[j].idx == i)
+ break;
+ }
+ }
+ if ((!useTempTable && !pList)
+ || (pColumn && j >= pColumn->nId)
+ || (pColumn == 0
+ && IsOrdinaryHiddenColumn(&pTab->aCol[i]))) {
+ if (i == pTab->iAutoIncPKey)
+ sqlite3VdbeAddOp2(v, OP_Integer, -1,
+ regCols + i + 1);
+ else
+ sqlite3ExprCode(pParse,
+ pTab->aCol[i].pDflt,
+ regCols + i + 1);
+ } else if (useTempTable) {
+ sqlite3VdbeAddOp3(v, OP_Column, srcTab, j,
+ regCols + i + 1);
+ } else {
+ assert(pSelect == 0); /* Otherwise useTempTable is true */
+ sqlite3ExprCodeAndCache(pParse,
+ pList->a[j].pExpr,
+ regCols + i + 1);
+ }
+ if (pColumn == 0
+ && !IsOrdinaryHiddenColumn(&pTab->aCol[i]))
+ j++;
+ }
+
+ /* If this is an INSERT on a view with an INSTEAD OF INSERT trigger,
+ * do not attempt any conversions before assembling the record.
+ * If this is a real table, attempt conversions as required by the
+ * table column affinities.
+ */
+ if (!isView) {
+ sqlite3TableAffinity(v, pTab, regCols + 1);
+ }
+
+ /* Fire BEFORE or INSTEAD OF triggers */
+ sqlite3CodeRowTrigger(pParse, pTrigger, TK_INSERT, 0,
+ TRIGGER_BEFORE, pTab,
+ regCols - pTab->nCol - 1, onError,
+ endOfLoop);
+
+ sqlite3ReleaseTempRange(pParse, regCols, pTab->nCol + 1);
+ }
+
+ /* Update the count of rows that are inserted
+ */
+ if ((user_session->sql_flags & SQLITE_CountRows) != 0) {
+ sqlite3VdbeAddOp2(v, OP_AddImm, regRowCount, 1);
+ }
+
+ if (!isView) {
/* Set the OPFLAG_USESEEKRESULT flag if either (a) there are no REPLACE
* constraints or (b) there are no triggers and this table is not a
* parent table in a foreign key constraint. It is safe to set the
@@ -857,6 +862,17 @@ sqlite3Insert(Parse * pParse, /* Parser context */
* VdbeCursor.seekResult variable, disabling the OPFLAG_USESEEKRESULT
* functionality.
*/
+
+ /* Generate code to check constraints and generate index keys
+ and do the insertion.
+ */
+ sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur,
+ iIdxCur, regIns, 0,
+ ipkColumn >= 0, onError,
+ endOfLoop, &isReplace, 0,
+ CONSTRAINT_MAIN_MODE);
+ sqlite3FkCheck(pParse, pTab, 0, regIns, 0);
+
bUseSeek = isReplace == 0 || (pTrigger == 0 &&
((user_session->sql_flags &
SQLITE_ForeignKeys) == 0 ||
@@ -865,12 +881,6 @@ sqlite3Insert(Parse * pParse, /* Parser context */
bUseSeek, onError);
}
- /* Update the count of rows that are inserted
- */
- if ((user_session->sql_flags & SQLITE_CountRows) != 0) {
- sqlite3VdbeAddOp2(v, OP_AddImm, regRowCount, 1);
- }
-
if (pTrigger) {
/* Code AFTER triggers */
sqlite3CodeRowTrigger(pParse, pTrigger, TK_INSERT, 0,
@@ -1066,7 +1076,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse, /* The parser context */
u8 overrideError, /* Override onError to this if not OE_Default */
int ignoreDest, /* Jump to this label on an OE_Ignore resolution */
int *pbMayReplace, /* OUT: Set to true if constraint may cause a replace */
- int *aiChng) /* column i is unchanged if aiChng[i]<0 */
+ int *aiChng, /* column i is unchanged if aiChng[i]<0 */
+ int mode)
{
Vdbe *v; /* VDBE under constrution */
Index *pIdx; /* Pointer to one of the indices */
@@ -1112,15 +1123,24 @@ sqlite3GenerateConstraintChecks(Parse * pParse, /* The parser context */
continue; /* This column is allowed to be NULL */
if (overrideError != OE_Default) {
onError = overrideError;
- } else if (onError == OE_Default) {
+ } else if (onError == OE_Default && mode == CONSTRAINT_MAIN_MODE) {
onError = OE_Abort;
}
+ if (mode == CONSTRAINT_MAIN_MODE && onError == OE_Replace) {
+ onError = OE_Abort;
+ }
+ if (mode == CONSTRAINT_CHECK_REPLACE && onError != OE_Replace) {
+ continue;
+ }
if (onError == OE_Replace && pTab->aCol[i].pDflt == 0) {
onError = OE_Abort;
}
- assert(onError == OE_Rollback || onError == OE_Abort
- || onError == OE_Fail || onError == OE_Ignore
- || onError == OE_Replace);
+
+ assert((mode == CONSTRAINT_MAIN_MODE && (onError == OE_Rollback
+ || onError == OE_Abort || onError == OE_Fail
+ || onError == OE_Ignore)) ||
+ (mode == CONSTRAINT_CHECK_REPLACE && (onError == OE_Abort ||
+ onError == OE_Replace)));
switch (onError) {
case OE_Abort:
sqlite3MayAbort(pParse);
@@ -1162,8 +1182,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse, /* The parser context */
/* Test all CHECK constraints
*/
#ifndef SQLITE_OMIT_CHECK
- if (pTab->pCheck && (user_session->sql_flags &
- SQLITE_IgnoreChecks) == 0) {
+ if (pTab->pCheck && mode == CONSTRAINT_MAIN_MODE &&
+ (user_session->sql_flags & SQLITE_IgnoreChecks) == 0) {
ExprList *pCheck = pTab->pCheck;
pParse->ckBase = regNewData + 1;
onError =
@@ -1216,12 +1236,41 @@ sqlite3GenerateConstraintChecks(Parse * pParse, /* The parser context */
if (aRegIdx[ix] == 0)
continue; /* Skip indices that do not change */
+
+ iThisCur = iIdxCur + ix;
+ addrUniqueOk = sqlite3VdbeMakeLabel(v);
+
+ /* Find out what action to take in case there is a uniqueness conflict */
+ onError = pIdx->onError;
+ if (!IsUniqueIndex(pIdx)) {
+ sqlite3VdbeResolveLabel(v, addrUniqueOk);
+ continue; /* pIdx is not a UNIQUE index */
+ }
+ if (overrideError != OE_Default) {
+ onError = overrideError;
+ } else if (onError == OE_Default) {
+ onError = OE_Abort;
+ }
+ /* If we are checking ON CONFLICT REPLACE before UPDATE/INSERT,
+ * then skip iteration if onError is not OE_Replace
+ */
+ if (mode == CONSTRAINT_CHECK_REPLACE && onError != OE_Replace) {
+ sqlite3VdbeResolveLabel(v, addrUniqueOk);
+ continue;
+ }
+ /* When we check all UNIQUE constraints during UPDATE/INSERT,
+ * onError action for constraints with ON CONFLICT REPLACE
+ * should be ABORT, because REPLACE actions have been already
+ * done in the beginning of insertion/update.
+ */
+ if (mode == CONSTRAINT_MAIN_MODE && onError == OE_Replace) {
+ onError = OE_Abort;
+ }
+
if (bAffinityDone == 0) {
sqlite3TableAffinity(v, pTab, regNewData+1);
bAffinityDone = 1;
}
- iThisCur = iIdxCur + ix;
- addrUniqueOk = sqlite3VdbeMakeLabel(v);
/* Skip partial indices for which the WHERE clause is not true */
if (pIdx->pPartIdxWhere) {
@@ -1409,9 +1458,11 @@ sqlite3GenerateConstraintChecks(Parse * pParse, /* The parser context */
}
/* Generate code that executes if the new index entry is not unique */
- assert(onError == OE_Rollback || onError == OE_Abort
- || onError == OE_Fail || onError == OE_Ignore
- || onError == OE_Replace);
+ assert((mode == CONSTRAINT_MAIN_MODE && (onError == OE_Rollback
+ || onError == OE_Abort || onError == OE_Fail
+ || onError == OE_Ignore || onError == OE_Replace)) ||
+ (mode == CONSTRAINT_CHECK_REPLACE && (onError == OE_Abort ||
+ onError == OE_Replace)));
switch (onError) {
case OE_Fail:
case OE_Rollback:{
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 7caf7cb19..a6f5472e7 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -1559,6 +1559,9 @@ struct FKey {
#define OE_Default 10 /* Do whatever the default action is */
+#define CONSTRAINT_CHECK_REPLACE 1
+#define CONSTRAINT_MAIN_MODE 2
+
/*
* An instance of the following structure is passed as the first
* argument to sqlite3VdbeKeyCompare and is used to control the
@@ -3283,7 +3286,7 @@ int sqlite3GenerateIndexKey(Parse *, Index *, int, int, int, int *, Index *,
int);
void sqlite3ResolvePartIdxLabel(Parse *, int);
void sqlite3GenerateConstraintChecks(Parse *, Table *, int *, int, int, int,
- int, u8, u8, int, int *, int *);
+ int, u8, u8, int, int *, int *, int);
void sqlite3CompleteInsertion(Parse *, Table *, int, int *, int, u8);
int sqlite3OpenTableAndIndices(Parse *, Table *, int, u8, int, u8 *, int *,
int *, u8, u8);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 8d0fe73ff..83272136e 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -544,6 +544,18 @@ sqlite3Update(Parse * pParse, /* The parser context */
}
}
+ if (!isView) {
+ int bReplace = 0; /* True if REPLACE conflict resolution might happen */
+
+ /* Do constraint checks. */
+ assert(regOldPk > 0);
+ sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur,
+ iIdxCur, regNewPk,
+ regOldPk, chngPk, onError,
+ labelContinue, &bReplace,
+ aXRef, CONSTRAINT_CHECK_REPLACE);
+ }
+
/* Fire any BEFORE UPDATE triggers. This happens before constraints are
* verified. One could argue that this is wrong.
*/
@@ -593,7 +605,7 @@ sqlite3Update(Parse * pParse, /* The parser context */
iIdxCur, regNewPk,
regOldPk, chngPk, onError,
labelContinue, &bReplace,
- aXRef);
+ aXRef, CONSTRAINT_MAIN_MODE);
/* Do FK constraint checks. */
if (hasFK) {
--
2.14.1
More information about the Tarantool-patches
mailing list