[patches] [constraints 2/2] sql: change constraints checks order

Bulat Niatshin niatshin at tarantool.org
Mon Feb 12 21:07:28 MSK 2018


- Change the order in which constraint checks are executed when the
  user specifies ON CONFLICT REPLACE.
- Execute constraint checks with ON CONFLICT REPLACE before the BEFORE
  triggers: if such a check fails, remove the conflicting entry and
  insert the new one, then execute the rest of the query as usual.

For #2963

Signed-off-by: Bulat Niatshin <niatshin at tarantool.org>
---
 src/box/sql/insert.c    | 219 +++++++++++++++++++++++++++++-------------------
 src/box/sql/sqliteInt.h |   5 +-
 src/box/sql/update.c    |  14 +++-
 3 files changed, 152 insertions(+), 86 deletions(-)

diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index bc1906fb6..9bea98217 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -341,6 +341,9 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 	int regData;		/* register holding first column to insert */
 	int *aRegIdx = 0;	/* One register allocated to each index */
 
+	int isReplace;	/* Set to true if constraints may cause a replace */
+	int bUseSeek;	/* True to use OPFLAG_SEEKRESULT */
+
 #ifndef SQLITE_OMIT_TRIGGER
 	int isView;		/* True if attempting to insert into a view */
 	Trigger *pTrigger;	/* List of triggers on pTab, if required */
@@ -661,67 +664,10 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 		VdbeCoverage(v);
 	}
 
-	/* Run the BEFORE and INSTEAD OF triggers, if there are any
-	 */
-	endOfLoop = sqlite3VdbeMakeLabel(v);
-	if (tmask & TRIGGER_BEFORE) {
-		int regCols = sqlite3GetTempRange(pParse, pTab->nCol + 1);
-
-		/* Create the new column data
-		 */
-		for (i = j = 0; i < pTab->nCol; i++) {
-			if (pColumn) {
-				for (j = 0; j < pColumn->nId; j++) {
-					if (pColumn->a[j].idx == i)
-						break;
-				}
-			}
-			if ((!useTempTable && !pList)
-			    || (pColumn && j >= pColumn->nId)
-			    || (pColumn == 0
-				&& IsOrdinaryHiddenColumn(&pTab->aCol[i]))) {
-				if (i == pTab->iAutoIncPKey)
-					sqlite3VdbeAddOp2(v, OP_Integer, -1,
-							  regCols + i + 1);
-				else
-					sqlite3ExprCode(pParse,
-							pTab->aCol[i].pDflt,
-							regCols + i + 1);
-			} else if (useTempTable) {
-				sqlite3VdbeAddOp3(v, OP_Column, srcTab, j,
-						  regCols + i + 1);
-			} else {
-				assert(pSelect == 0);	/* Otherwise useTempTable is true */
-				sqlite3ExprCodeAndCache(pParse,
-							pList->a[j].pExpr,
-							regCols + i + 1);
-			}
-			if (pColumn == 0
-			    && !IsOrdinaryHiddenColumn(&pTab->aCol[i]))
-				j++;
-		}
-
-		/* If this is an INSERT on a view with an INSTEAD OF INSERT trigger,
-		 * do not attempt any conversions before assembling the record.
-		 * If this is a real table, attempt conversions as required by the
-		 * table column affinities.
-		 */
-		if (!isView) {
-			sqlite3TableAffinity(v, pTab, regCols + 1);
-		}
-
-		/* Fire BEFORE or INSTEAD OF triggers */
-		sqlite3CodeRowTrigger(pParse, pTrigger, TK_INSERT, 0,
-				      TRIGGER_BEFORE, pTab,
-				      regCols - pTab->nCol - 1, onError,
-				      endOfLoop);
-
-		sqlite3ReleaseTempRange(pParse, regCols, pTab->nCol + 1);
-	}
-
 	/* Compute the content of the next row to insert into a range of
 	 * registers beginning at regIns.
 	 */
+	endOfLoop = sqlite3VdbeMakeLabel(v);
 	if (!isView) {
 		if (ipkColumn >= 0) {
 			if (useTempTable) {
@@ -836,18 +782,77 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 						iRegStore);
 			}
 		}
-
-		/* Generate code to check constraints and generate index keys
-		   and do the insertion.
-		 */
-		int isReplace;	/* Set to true if constraints may cause a replace */
-		int bUseSeek;	/* True to use OPFLAG_SEEKRESULT */
 		sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur,
 						iIdxCur, regIns, 0,
 						ipkColumn >= 0, onError,
-						endOfLoop, &isReplace, 0);
-		sqlite3FkCheck(pParse, pTab, 0, regIns, 0);
+						endOfLoop, &isReplace, 0,
+						CONSTRAINT_CHECK_REPLACE);
+	}
 
+	/* Run the BEFORE and INSTEAD OF triggers, if there are any
+	 */
+	if (tmask & TRIGGER_BEFORE) {
+		int regCols = sqlite3GetTempRange(pParse, pTab->nCol + 1);
+
+		/* Create the new column data
+		 */
+		for (i = j = 0; i < pTab->nCol; i++) {
+			if (pColumn) {
+				for (j = 0; j < pColumn->nId; j++) {
+					if (pColumn->a[j].idx == i)
+						break;
+				}
+			}
+			if ((!useTempTable && !pList)
+			    || (pColumn && j >= pColumn->nId)
+			    || (pColumn == 0
+				&& IsOrdinaryHiddenColumn(&pTab->aCol[i]))) {
+				if (i == pTab->iAutoIncPKey)
+					sqlite3VdbeAddOp2(v, OP_Integer, -1,
+							  regCols + i + 1);
+				else
+					sqlite3ExprCode(pParse,
+							pTab->aCol[i].pDflt,
+							regCols + i + 1);
+			} else if (useTempTable) {
+				sqlite3VdbeAddOp3(v, OP_Column, srcTab, j,
+						  regCols + i + 1);
+			} else {
+				assert(pSelect == 0);	/* Otherwise useTempTable is true */
+				sqlite3ExprCodeAndCache(pParse,
+							pList->a[j].pExpr,
+							regCols + i + 1);
+			}
+			if (pColumn == 0
+			    && !IsOrdinaryHiddenColumn(&pTab->aCol[i]))
+				j++;
+		}
+
+		/* If this is an INSERT on a view with an INSTEAD OF INSERT trigger,
+		 * do not attempt any conversions before assembling the record.
+		 * If this is a real table, attempt conversions as required by the
+		 * table column affinities.
+		 */
+		if (!isView) {
+			sqlite3TableAffinity(v, pTab, regCols + 1);
+		}
+
+		/* Fire BEFORE or INSTEAD OF triggers */
+		sqlite3CodeRowTrigger(pParse, pTrigger, TK_INSERT, 0,
+				      TRIGGER_BEFORE, pTab,
+				      regCols - pTab->nCol - 1, onError,
+				      endOfLoop);
+
+		sqlite3ReleaseTempRange(pParse, regCols, pTab->nCol + 1);
+	}
+
+	/* Update the count of rows that are inserted
+	 */
+	if ((user_session->sql_flags & SQLITE_CountRows) != 0) {
+		sqlite3VdbeAddOp2(v, OP_AddImm, regRowCount, 1);
+	}
+
+	if (!isView) {
 		/* Set the OPFLAG_USESEEKRESULT flag if either (a) there are no REPLACE
 		 * constraints or (b) there are no triggers and this table is not a
 		 * parent table in a foreign key constraint. It is safe to set the
@@ -857,6 +862,17 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 		 * VdbeCursor.seekResult variable, disabling the OPFLAG_USESEEKRESULT
 		 * functionality.
 		 */
+
+		/* Generate code to check constraints and generate index keys
+		   and do the insertion.
+		 */
+		sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur,
+						iIdxCur, regIns, 0,
+						ipkColumn >= 0, onError,
+						endOfLoop, &isReplace, 0,
+						CONSTRAINT_MAIN_MODE);
+		sqlite3FkCheck(pParse, pTab, 0, regIns, 0);
+
 		bUseSeek = isReplace == 0 || (pTrigger == 0 &&
 					      ((user_session->sql_flags &
 						SQLITE_ForeignKeys) == 0 ||
@@ -865,12 +881,6 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 					 bUseSeek, onError);
 	}
 
-	/* Update the count of rows that are inserted
-	 */
-	if ((user_session->sql_flags & SQLITE_CountRows) != 0) {
-		sqlite3VdbeAddOp2(v, OP_AddImm, regRowCount, 1);
-	}
-
 	if (pTrigger) {
 		/* Code AFTER triggers */
 		sqlite3CodeRowTrigger(pParse, pTrigger, TK_INSERT, 0,
@@ -1066,7 +1076,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 				u8 overrideError,	/* Override onError to this if not OE_Default */
 				int ignoreDest,		/* Jump to this label on an OE_Ignore resolution */
 				int *pbMayReplace,	/* OUT: Set to true if constraint may cause a replace */
-				int *aiChng)		/* column i is unchanged if aiChng[i]<0 */
+				int *aiChng,		/* column i is unchanged if aiChng[i]<0 */
+				int mode)
 {
 	Vdbe *v;		/* VDBE under constrution */
 	Index *pIdx;		/* Pointer to one of the indices */
@@ -1112,15 +1123,24 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 			continue;	/* This column is allowed to be NULL */
 		if (overrideError != OE_Default) {
 			onError = overrideError;
-		} else if (onError == OE_Default) {
+		} else if (onError == OE_Default && mode == CONSTRAINT_MAIN_MODE) {
 			onError = OE_Abort;
 		}
+		if (mode == CONSTRAINT_MAIN_MODE && onError == OE_Replace) {
+			onError = OE_Abort;
+		}
+		if (mode == CONSTRAINT_CHECK_REPLACE && onError != OE_Replace) {
+			continue;
+		}
 		if (onError == OE_Replace && pTab->aCol[i].pDflt == 0) {
 			onError = OE_Abort;
 		}
-		assert(onError == OE_Rollback || onError == OE_Abort
-		       || onError == OE_Fail || onError == OE_Ignore
-		       || onError == OE_Replace);
+
+		assert((mode == CONSTRAINT_MAIN_MODE && (onError == OE_Rollback
+		       || onError == OE_Abort || onError == OE_Fail
+		       || onError == OE_Ignore)) ||
+		       (mode == CONSTRAINT_CHECK_REPLACE && (onError == OE_Abort ||
+		       onError == OE_Replace)));
 		switch (onError) {
 		case OE_Abort:
 			sqlite3MayAbort(pParse);
@@ -1162,8 +1182,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 	/* Test all CHECK constraints
 	 */
 #ifndef SQLITE_OMIT_CHECK
-	if (pTab->pCheck && (user_session->sql_flags &
-			     SQLITE_IgnoreChecks) == 0) {
+	if (pTab->pCheck && mode == CONSTRAINT_MAIN_MODE &&
+	    (user_session->sql_flags & SQLITE_IgnoreChecks) == 0) {
 		ExprList *pCheck = pTab->pCheck;
 		pParse->ckBase = regNewData + 1;
 		onError =
@@ -1216,12 +1236,41 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 
 		if (aRegIdx[ix] == 0)
 			continue;	/* Skip indices that do not change */
+
+		iThisCur = iIdxCur + ix;
+		addrUniqueOk = sqlite3VdbeMakeLabel(v);
+
+		/* Find out what action to take in case there is a uniqueness conflict */
+		onError = pIdx->onError;
+		if (!IsUniqueIndex(pIdx)) {
+			sqlite3VdbeResolveLabel(v, addrUniqueOk);
+			continue;	/* pIdx is not a UNIQUE index */
+		}
+		if (overrideError != OE_Default) {
+			onError = overrideError;
+		} else if (onError == OE_Default) {
+			onError = OE_Abort;
+		}
+		/* If we are checking ON CONFLICT REPLACE before UPDATE/INSERT,
+		 * then skip iteration if onError is not OE_Replace
+		 */
+		if (mode == CONSTRAINT_CHECK_REPLACE && onError != OE_Replace) {
+			sqlite3VdbeResolveLabel(v, addrUniqueOk);
+			continue;
+		}
+		/* When we check all UNIQUE constraints during UPDATE/INSERT,
+		 * onError action for constraints with ON CONFLICT REPLACE
+		 * should be ABORT, because REPLACE actions have been already
+		 * done in the beginning of insertion/update.
+		 */
+		if (mode == CONSTRAINT_MAIN_MODE && onError == OE_Replace) {
+			onError = OE_Abort;
+		}
+
 		if (bAffinityDone == 0) {
 			sqlite3TableAffinity(v, pTab, regNewData+1);
 			bAffinityDone = 1;
 		}
-		iThisCur = iIdxCur + ix;
-		addrUniqueOk = sqlite3VdbeMakeLabel(v);
 
 		/* Skip partial indices for which the WHERE clause is not true */
 		if (pIdx->pPartIdxWhere) {
@@ -1409,9 +1458,11 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 		}
 
 		/* Generate code that executes if the new index entry is not unique */
-		assert(onError == OE_Rollback || onError == OE_Abort
-		       || onError == OE_Fail || onError == OE_Ignore
-		       || onError == OE_Replace);
+		assert((mode == CONSTRAINT_MAIN_MODE && (onError == OE_Rollback
+		       || onError == OE_Abort || onError == OE_Fail
+		       || onError == OE_Ignore || onError == OE_Replace)) ||
+		       (mode == CONSTRAINT_CHECK_REPLACE && (onError == OE_Abort ||
+		       onError == OE_Replace)));
 		switch (onError) {
 		case OE_Fail:
 		case OE_Rollback:{
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 7caf7cb19..a6f5472e7 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -1559,6 +1559,9 @@ struct FKey {
 
 #define OE_Default  10		/* Do whatever the default action is */
 
+#define CONSTRAINT_CHECK_REPLACE 1
+#define CONSTRAINT_MAIN_MODE 2
+
 /*
  * An instance of the following structure is passed as the first
  * argument to sqlite3VdbeKeyCompare and is used to control the
@@ -3283,7 +3286,7 @@ int sqlite3GenerateIndexKey(Parse *, Index *, int, int, int, int *, Index *,
 			    int);
 void sqlite3ResolvePartIdxLabel(Parse *, int);
 void sqlite3GenerateConstraintChecks(Parse *, Table *, int *, int, int, int,
-				     int, u8, u8, int, int *, int *);
+				     int, u8, u8, int, int *, int *, int);
 void sqlite3CompleteInsertion(Parse *, Table *, int, int *, int, u8);
 int sqlite3OpenTableAndIndices(Parse *, Table *, int, u8, int, u8 *, int *,
 			       int *, u8, u8);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 8d0fe73ff..83272136e 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -544,6 +544,18 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		}
 	}
 
+	if (!isView) {
+		int bReplace = 0;	/* True if REPLACE conflict resolution might happen */
+
+		/* Do constraint checks. */
+		assert(regOldPk > 0);
+		sqlite3GenerateConstraintChecks(pParse, pTab, aRegIdx, iDataCur,
+						iIdxCur, regNewPk,
+						regOldPk, chngPk, onError,
+						labelContinue, &bReplace,
+						aXRef, CONSTRAINT_CHECK_REPLACE);
+	}
+
 	/* Fire any BEFORE UPDATE triggers. This happens before constraints are
 	 * verified. One could argue that this is wrong.
 	 */
@@ -593,7 +605,7 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 						iIdxCur, regNewPk,
 						regOldPk, chngPk, onError,
 						labelContinue, &bReplace,
-						aXRef);
+						aXRef, CONSTRAINT_MAIN_MODE);
 
 		/* Do FK constraint checks. */
 		if (hasFK) {
-- 
2.14.1




More information about the Tarantool-patches mailing list