[patches] [sql 8/9] sql: fixed code generation for VIEWs

Nikita Pettik korablev at tarantool.org
Tue Jan 23 23:19:19 MSK 2018


Since VIEW materializes into ephemeral table with ROWID in original SQLite,
it was requited to adapt code generation for UPDATE and DELETE
statements on VIEWs.

Part of #2680

Signed-off-by: Nikita Pettik <korablev at tarantool.org>
---
 src/box/sql/delete.c |  92 ++++++++++++++---------------------
 src/box/sql/update.c | 134 +++++++++++++++++++++++----------------------------
 2 files changed, 96 insertions(+), 130 deletions(-)

diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 172509b10..2f9c33b7b 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -250,13 +250,12 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 	int eOnePass;		/* ONEPASS_OFF or _SINGLE or _MULTI */
 	int aiCurOnePass[2];	/* The write cursors opened by WHERE_ONEPASS */
 	u8 *aToOpen = 0;	/* Open cursor iTabCur+j if aToOpen[j] is true */
-	Index *pPk;		/* The PRIMARY KEY index on the table */
+	Index *pPk = 0;		/* The PRIMARY KEY index on the table */
 	int iPk = 0;		/* First of nPk registers holding PRIMARY KEY value */
 	i16 nPk;		/* Number of columns in the PRIMARY KEY */
 	int iKey;		/* Memory cell holding key of row to be deleted */
 	i16 nKey;		/* Number of memory cells in the row key */
 	int iEphCur = 0;	/* Ephemeral table holding all primary key values */
-	int iRowSet = 0;	/* Register for rowset of rows to delete */
 	int addrBypass = 0;	/* Address of jump over the delete logic */
 	int addrLoop = 0;	/* Top of the delete loop */
 	int addrEphOpen = 0;	/* Instruction to open the Ephemeral table */
@@ -382,10 +381,7 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 #endif
 	    ) {
 		assert(!isView);
-		if (HasRowid(pTab)) {
-			sqlite3VdbeAddOp4(v, OP_Clear, pTab->tnum, 0, memCnt,
-					  pTab->zName, P4_STATIC);
-		}
+
 		for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
 			assert(pIdx->pSchema == pTab->pSchema);
 			sqlite3VdbeAddOp2(v, OP_Clear, pIdx->tnum, 0);
@@ -404,16 +400,20 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 		if (sNC.ncFlags & NC_VarSelect)
 			bComplex = 1;
 		wcf |= (bComplex ? 0 : WHERE_ONEPASS_MULTIROW);
-		if (HasRowid(pTab)) {
-			/* For a rowid table, initialize the RowSet to an empty set */
-			pPk = 0;
-			nPk = 1;
-			iRowSet = ++pParse->nMem;
-			sqlite3VdbeAddOp2(v, OP_Null, 0, iRowSet);
+		/* Create an ephemeral table used to hold all primary keys for
+		 * rows to be deleted. Since VIEW is held in ephemeral table,
+		 * there is no PK for it, so columns should be loaded manually.
+		 */
+		if (isView) {
+			nPk = pTab->nCol;
+			iPk = pParse->nMem + 1;
+			pParse->nMem += nPk;
+			iEphCur = pParse->nTab++;
+			KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nPk, 0);
+			addrEphOpen =
+				sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iEphCur,
+						  nPk, 0, (char*) pKeyInfo, P4_KEYINFO);
 		} else {
-			/* For a WITHOUT ROWID table, create an ephemeral table used to
-			 * hold all primary keys for rows to be deleted.
-			 */
 			pPk = sqlite3PrimaryKeyIndex(pTab);
 			assert(pPk != 0);
 			nPk = pPk->nKeyCol;
@@ -449,8 +449,8 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 			sqlite3VdbeAddOp2(v, OP_AddImm, memCnt, 1);
 		}
 
-		/* Extract the rowid or primary key for the current row */
-		if (pPk) {
+		/* Extract the primary key for the current row */
+		if (!isView) {
 			for (i = 0; i < nPk; i++) {
 				assert(pPk->aiColumn[i] >= 0);
 				sqlite3ExprCodeGetColumnOfTable(v, pTab,
@@ -459,15 +459,13 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 								aiColumn[i],
 								iPk + i);
 			}
-			iKey = iPk;
 		} else {
-			iKey = pParse->nMem + 1;
-			iKey =
-			    sqlite3ExprCodeGetColumn(pParse, pTab, -1, iTabCur,
-						     iKey, 0);
-			if (iKey > pParse->nMem)
-				pParse->nMem = iKey;
+			for (i = 0; i < nPk; i++) {
+				sqlite3VdbeAddOp3(v, OP_Column, iDataCur,
+						  i, iPk + i);
+			}
 		}
+		iKey = iPk;
 
 		if (eOnePass != ONEPASS_OFF) {
 			/* For ONEPASS, no need to store the rowid/primary-key. There is only
@@ -489,22 +487,13 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 			if (addrEphOpen)
 				sqlite3VdbeChangeToNoop(v, addrEphOpen);
 		} else {
-			if (pPk) {
-				/* Add the PK key for this row to the temporary table */
-				iKey = ++pParse->nMem;
-				nKey = 0;	/* Zero tells OP_Found to use a composite key */
-				sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk,
-						  iKey,
-						  sqlite3IndexAffinityStr
-						  (pParse->db, pPk), nPk);
-				sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEphCur,
-						     iKey, iPk, nPk);
-			} else {
-				/* Add the rowid of the row to be deleted to the RowSet */
-				nKey = 1;	/* OP_Seek always uses a single rowid */
-				sqlite3VdbeAddOp2(v, OP_RowSetAdd, iRowSet,
-						  iKey);
-			}
+			/* Add the PK key for this row to the temporary table */
+			iKey = ++pParse->nMem;
+			nKey = 0;	/* Zero tells OP_Found to use a composite key */
+			const char *zAff = isView ? 0 :
+					  sqlite3IndexAffinityStr(pParse->db, pPk);
+			sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, iKey, zAff, nPk);
+			sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEphCur, iKey, iPk, nPk);
 		}
 
 		/* If this DELETE cannot use the ONEPASS strategy, this is the
@@ -537,7 +526,7 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 				sqlite3VdbeJumpHere(v, iAddrOnce);
 		}
 
-		/* Set up a loop over the rowids/primary-keys that were found in the
+		/* Set up a loop over the primary-keys that were found in the
 		 * where-clause loop above.
 		 */
 		if (eOnePass != ONEPASS_OFF) {
@@ -546,19 +535,13 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 				assert(pPk != 0 || pTab->pSelect != 0);
 				sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur,
 						     addrBypass, iKey, nKey);
+
 				VdbeCoverage(v);
 			}
-		} else if (pPk) {
+		} else {
 			addrLoop = sqlite3VdbeAddOp1(v, OP_Rewind, iEphCur);
 			VdbeCoverage(v);
 			sqlite3VdbeAddOp2(v, OP_RowData, iEphCur, iKey);
-			assert(nKey == 0);	/* OP_Found will use a composite key */
-		} else {
-			addrLoop =
-			    sqlite3VdbeAddOp3(v, OP_RowSetRead, iRowSet, 0,
-					      iKey);
-			VdbeCoverage(v);
-			assert(nKey == 1);
 		}
 
 		/* Delete the row */
@@ -581,13 +564,10 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 		if (eOnePass != ONEPASS_OFF) {
 			sqlite3VdbeResolveLabel(v, addrBypass);
 			sqlite3WhereEnd(pWInfo);
-		} else if (pPk) {
+		} else {
 			sqlite3VdbeAddOp2(v, OP_Next, iEphCur, addrLoop + 1);
 			VdbeCoverage(v);
 			sqlite3VdbeJumpHere(v, addrLoop);
-		} else {
-			sqlite3VdbeGoto(v, addrLoop);
-			sqlite3VdbeJumpHere(v, addrLoop);
 		}
 	}			/* End non-truncate path */
 
@@ -740,7 +720,6 @@ sqlite3GenerateRowDelete(Parse * pParse,	/* Parsing context */
 	Vdbe *v = pParse->pVdbe;	/* Vdbe */
 	int iOld = 0;		/* First register in OLD.* array */
 	int iLabel;		/* Label resolved to end of generated code */
-	u8 opSeek;		/* Seek opcode */
 
 	/* Vdbe is guaranteed to have been allocated by this stage. */
 	assert(v);
@@ -753,9 +732,8 @@ sqlite3GenerateRowDelete(Parse * pParse,	/* Parsing context */
 	 * not attempt to delete it or fire any DELETE triggers.
 	 */
 	iLabel = sqlite3VdbeMakeLabel(v);
-	opSeek = HasRowid(pTab) ? OP_NotExists : OP_NotFound;
 	if (eMode == ONEPASS_OFF) {
-		sqlite3VdbeAddOp4Int(v, opSeek, iDataCur, iLabel, iPk, nPk);
+		sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, iLabel, iPk, nPk);
 		VdbeCoverageIf(v, opSeek == OP_NotExists);
 		VdbeCoverageIf(v, opSeek == OP_NotFound);
 	}
@@ -807,7 +785,7 @@ sqlite3GenerateRowDelete(Parse * pParse,	/* Parsing context */
 		 * pointing to.
 		 */
 		if (addrStart < sqlite3VdbeCurrentAddr(v)) {
-			sqlite3VdbeAddOp4Int(v, opSeek, iDataCur, iLabel, iPk,
+			sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, iLabel, iPk,
 					     nPk);
 			VdbeCoverageIf(v, opSeek == OP_NotExists);
 			VdbeCoverageIf(v, opSeek == OP_NotFound);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 067f2f18f..decf398aa 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -148,7 +148,6 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	int regNewRowid = 0;	/* The new rowid */
 	int regNew = 0;		/* Content of the NEW.* table in triggers */
 	int regOld = 0;		/* Content of OLD.* table in triggers */
-	int regRowSet = 0;	/* Rowset of rows to be updated */
 	int regKey = 0;		/* composite PRIMARY KEY value */
 
 	memset(&sContext, 0, sizeof(sContext));
@@ -328,7 +327,6 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	sqlite3BeginWriteOperation(pParse, 1);
 
 	/* Allocate required registers. */
-	regRowSet = ++pParse->nMem;
 	regOldRowid = regNewRowid = ++pParse->nMem;
 	if (chngPk || pTrigger || hasFK) {
 		regOld = pParse->nMem + 1;
@@ -351,6 +349,8 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 #if !defined(SQLITE_OMIT_VIEW) && !defined(SQLITE_OMIT_TRIGGER)
 	if (isView) {
 		sqlite3MaterializeView(pParse, pTab, pWhere, iDataCur);
+		/* Number of columns from SELECT plus ID.*/
+		nKey = pTab->nCol + 1;
 	}
 #endif
 
@@ -361,66 +361,70 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		goto update_cleanup;
 	}
 	/* Begin the database scan
+	 * The only difference between VIEW and ordinary table is the fact that
+	 * VIEW is held in ephemeral table and doesn't have explicit PK.
+	 * In this case we have to manually load columns in order to make tuple.
 	 */
-	if (HasRowid(pTab)) {
-		sqlite3VdbeAddOp3(v, OP_Null, 0, regRowSet, regOldRowid);
-		pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
-					   WHERE_ONEPASS_DESIRED |
-					   WHERE_SEEK_TABLE, iIdxCur);
-		if (pWInfo == 0)
-			goto update_cleanup;
-		okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
+	int iPk;	/* First of nPk memory cells holding PRIMARY KEY value */
+	i16 nPk;	/* Number of components of the PRIMARY KEY */
+	int addrOpen;	/* Address of the OpenEphemeral instruction */
 
-		/* Remember the rowid of every item to be updated.
-		 */
-		sqlite3VdbeAddOp2(v, OP_Rowid, iDataCur, regOldRowid);
-		if (!okOnePass) {
-			sqlite3VdbeAddOp2(v, OP_RowSetAdd, regRowSet,
-					  regOldRowid);
-		}
-
-		/* End the database scan loop.
-		 */
-		sqlite3WhereEnd(pWInfo);
+	if (isView) {
+		nPk = nKey;
 	} else {
-		int iPk;	/* First of nPk memory cells holding PRIMARY KEY value */
-		i16 nPk;	/* Number of components of the PRIMARY KEY */
-		int addrOpen;	/* Address of the OpenEphemeral instruction */
-
 		assert(pPk != 0);
 		nPk = pPk->nKeyCol;
-		iPk = pParse->nMem + 1;
-		pParse->nMem += nPk;
-		regKey = ++pParse->nMem;
-		iEph = pParse->nTab++;
-		sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
+	}
+	iPk = pParse->nMem + 1;
+	pParse->nMem += nPk;
+	regKey = ++pParse->nMem;
+	iEph = pParse->nTab++;
+	sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
+
+	if (isView) {
+		KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nKey, 0);
+		addrOpen = sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iEph,
+					     nKey, 0, (char*)pKeyInfo, P4_KEYINFO);
+	} else {
 		addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
 		sqlite3VdbeSetP4KeyInfo(pParse, pPk);
-		pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
-					   WHERE_ONEPASS_DESIRED, iIdxCur);
-		if (pWInfo == 0)
-			goto update_cleanup;
-		okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
+	}
+
+	pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
+				   WHERE_ONEPASS_DESIRED, iIdxCur);
+	if (pWInfo == 0)
+		goto update_cleanup;
+	okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
+	if (isView){
+		for (i = 0; i < nPk; i++) {
+			sqlite3VdbeAddOp3(v, OP_Column, iDataCur, i, iPk + i);
+		}
+	} else {
 		for (i = 0; i < nPk; i++) {
 			assert(pPk->aiColumn[i] >= 0);
 			sqlite3ExprCodeGetColumnOfTable(v, pTab, iDataCur,
 							pPk->aiColumn[i],
 							iPk + i);
 		}
-		if (okOnePass) {
-			sqlite3VdbeChangeToNoop(v, addrOpen);
-			nKey = nPk;
-			regKey = iPk;
-		} else {
-			sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
-					  sqlite3IndexAffinityStr(db, pPk),
-					  nPk);
-			sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEph, regKey, iPk,
-					     nPk);
-		}
-		sqlite3WhereEnd(pWInfo);
 	}
 
+	if (okOnePass) {
+		sqlite3VdbeChangeToNoop(v, addrOpen);
+		nKey = nPk;
+		regKey = iPk;
+	} else {
+		const char *zAff = isView ? 0 :
+				   sqlite3IndexAffinityStr(pParse->db, pPk);
+		sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
+					  zAff, nPk);
+		sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEph, regKey, iPk,
+				     nPk);
+	}
+	/* End the database scan loop.
+	 */
+	sqlite3WhereEnd(pWInfo);
+
+
 	/* Initialize the count of updated rows
 	 */
 	if ((user_session->sql_flags & SQLITE_CountRows)
@@ -460,7 +464,7 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	/* Top of the update loop */
 	if (okOnePass) {
 		labelContinue = labelBreak;
-		sqlite3VdbeAddOp2(v, OP_IsNull, pPk ? regKey : regOldRowid,
+		sqlite3VdbeAddOp2(v, OP_IsNull, regKey,
 				  labelBreak);
 		if (aToOpen[iDataCur - iBaseCur] && !isView) {
 			assert(pPk);
@@ -470,7 +474,7 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		}
 		VdbeCoverageIf(v, pPk == 0);
 		VdbeCoverageIf(v, pPk != 0);
-	} else if (pPk) {
+	} else {
 		labelContinue = sqlite3VdbeMakeLabel(v);
 		sqlite3VdbeAddOp2(v, OP_Rewind, iEph, labelBreak);
 		VdbeCoverage(v);
@@ -478,14 +482,6 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, labelContinue,
 				     regKey, 0);
 		VdbeCoverage(v);
-	} else {
-		labelContinue =
-		    sqlite3VdbeAddOp3(v, OP_RowSetRead, regRowSet, labelBreak,
-				      regOldRowid);
-		VdbeCoverage(v);
-		sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, labelContinue,
-				  regOldRowid);
-		VdbeCoverage(v);
 	}
 
 	/* If the record number will change, set register regNewRowid to
@@ -583,13 +579,13 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		 * is deleted or renamed by a BEFORE trigger - is left undefined in the
 		 * documentation.
 		 */
-		if (pPk) {
+		if (!isView) {
 			sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur,
 					     labelContinue, regKey, nKey);
 			VdbeCoverage(v);
 		} else {
-			sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur,
-					  labelContinue, regOldRowid);
+			sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur,
+					     labelContinue, regKey - nKey, nKey);
 			VdbeCoverage(v);
 		}
 
@@ -627,16 +623,10 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 
 		/* Delete the index entries associated with the current record.  */
 		if (bReplace || chngKey) {
-			if (pPk) {
-				addr1 =
-				    sqlite3VdbeAddOp4Int(v, OP_NotFound,
-							 iDataCur, 0, regKey,
-							 nKey);
-			} else {
-				addr1 =
-				    sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur,
-						      0, regOldRowid);
-			}
+			addr1 =
+				sqlite3VdbeAddOp4Int(v, OP_NotFound,
+						     iDataCur, 0, regKey,
+						     nKey);
 			VdbeCoverageNeverTaken(v);
 		}
 		sqlite3GenerateRowIndexDelete(pParse, pTab, iDataCur, iIdxCur);
@@ -703,12 +693,10 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	 */
 	if (okOnePass) {
 		/* Nothing to do at end-of-loop for a single-pass */
-	} else if (pPk) {
+	} else {
 		sqlite3VdbeResolveLabel(v, labelContinue);
 		sqlite3VdbeAddOp2(v, OP_Next, iEph, addrTop);
 		VdbeCoverage(v);
-	} else {
-		sqlite3VdbeGoto(v, labelContinue);
 	}
 	sqlite3VdbeResolveLabel(v, labelBreak);
 
-- 
2.15.1




More information about the Tarantool-patches mailing list