[patches] [sql 8/9] sql: fixed code generation for VIEWs
Nikita Pettik
korablev at tarantool.org
Tue Jan 23 23:19:19 MSK 2018
Since a VIEW is materialized into an ephemeral table with ROWID in original SQLite,
it was required to adapt code generation for UPDATE and DELETE
statements on VIEWs.
Part of #2680
Signed-off-by: Nikita Pettik <korablev at tarantool.org>
---
src/box/sql/delete.c | 92 ++++++++++++++---------------------
src/box/sql/update.c | 134 +++++++++++++++++++++++----------------------------
2 files changed, 96 insertions(+), 130 deletions(-)
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 172509b10..2f9c33b7b 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -250,13 +250,12 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
int eOnePass; /* ONEPASS_OFF or _SINGLE or _MULTI */
int aiCurOnePass[2]; /* The write cursors opened by WHERE_ONEPASS */
u8 *aToOpen = 0; /* Open cursor iTabCur+j if aToOpen[j] is true */
- Index *pPk; /* The PRIMARY KEY index on the table */
+ Index *pPk = 0; /* The PRIMARY KEY index on the table */
int iPk = 0; /* First of nPk registers holding PRIMARY KEY value */
i16 nPk; /* Number of columns in the PRIMARY KEY */
int iKey; /* Memory cell holding key of row to be deleted */
i16 nKey; /* Number of memory cells in the row key */
int iEphCur = 0; /* Ephemeral table holding all primary key values */
- int iRowSet = 0; /* Register for rowset of rows to delete */
int addrBypass = 0; /* Address of jump over the delete logic */
int addrLoop = 0; /* Top of the delete loop */
int addrEphOpen = 0; /* Instruction to open the Ephemeral table */
@@ -382,10 +381,7 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
#endif
) {
assert(!isView);
- if (HasRowid(pTab)) {
- sqlite3VdbeAddOp4(v, OP_Clear, pTab->tnum, 0, memCnt,
- pTab->zName, P4_STATIC);
- }
+
for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
assert(pIdx->pSchema == pTab->pSchema);
sqlite3VdbeAddOp2(v, OP_Clear, pIdx->tnum, 0);
@@ -404,16 +400,20 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
if (sNC.ncFlags & NC_VarSelect)
bComplex = 1;
wcf |= (bComplex ? 0 : WHERE_ONEPASS_MULTIROW);
- if (HasRowid(pTab)) {
- /* For a rowid table, initialize the RowSet to an empty set */
- pPk = 0;
- nPk = 1;
- iRowSet = ++pParse->nMem;
- sqlite3VdbeAddOp2(v, OP_Null, 0, iRowSet);
+ /* Create an ephemeral table used to hold all primary keys for
+ * rows to be deleted. Since VIEW is held in ephemeral table,
+ * there is no PK for it, so columns should be loaded manually.
+ */
+ if (isView) {
+ nPk = pTab->nCol;
+ iPk = pParse->nMem + 1;
+ pParse->nMem += nPk;
+ iEphCur = pParse->nTab++;
+ KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nPk, 0);
+ addrEphOpen =
+ sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iEphCur,
+ nPk, 0, (char*) pKeyInfo, P4_KEYINFO);
} else {
- /* For a WITHOUT ROWID table, create an ephemeral table used to
- * hold all primary keys for rows to be deleted.
- */
pPk = sqlite3PrimaryKeyIndex(pTab);
assert(pPk != 0);
nPk = pPk->nKeyCol;
@@ -449,8 +449,8 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
sqlite3VdbeAddOp2(v, OP_AddImm, memCnt, 1);
}
- /* Extract the rowid or primary key for the current row */
- if (pPk) {
+ /* Extract the primary key for the current row */
+ if (!isView) {
for (i = 0; i < nPk; i++) {
assert(pPk->aiColumn[i] >= 0);
sqlite3ExprCodeGetColumnOfTable(v, pTab,
@@ -459,15 +459,13 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
aiColumn[i],
iPk + i);
}
- iKey = iPk;
} else {
- iKey = pParse->nMem + 1;
- iKey =
- sqlite3ExprCodeGetColumn(pParse, pTab, -1, iTabCur,
- iKey, 0);
- if (iKey > pParse->nMem)
- pParse->nMem = iKey;
+ for (i = 0; i < nPk; i++) {
+ sqlite3VdbeAddOp3(v, OP_Column, iDataCur,
+ i, iPk + i);
+ }
}
+ iKey = iPk;
if (eOnePass != ONEPASS_OFF) {
/* For ONEPASS, no need to store the rowid/primary-key. There is only
@@ -489,22 +487,13 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
if (addrEphOpen)
sqlite3VdbeChangeToNoop(v, addrEphOpen);
} else {
- if (pPk) {
- /* Add the PK key for this row to the temporary table */
- iKey = ++pParse->nMem;
- nKey = 0; /* Zero tells OP_Found to use a composite key */
- sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk,
- iKey,
- sqlite3IndexAffinityStr
- (pParse->db, pPk), nPk);
- sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEphCur,
- iKey, iPk, nPk);
- } else {
- /* Add the rowid of the row to be deleted to the RowSet */
- nKey = 1; /* OP_Seek always uses a single rowid */
- sqlite3VdbeAddOp2(v, OP_RowSetAdd, iRowSet,
- iKey);
- }
+ /* Add the PK key for this row to the temporary table */
+ iKey = ++pParse->nMem;
+ nKey = 0; /* Zero tells OP_Found to use a composite key */
+ const char *zAff = isView ? 0 :
+ sqlite3IndexAffinityStr(pParse->db, pPk);
+ sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, iKey, zAff, nPk);
+ sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEphCur, iKey, iPk, nPk);
}
/* If this DELETE cannot use the ONEPASS strategy, this is the
@@ -537,7 +526,7 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
sqlite3VdbeJumpHere(v, iAddrOnce);
}
- /* Set up a loop over the rowids/primary-keys that were found in the
+ /* Set up a loop over the primary-keys that were found in the
* where-clause loop above.
*/
if (eOnePass != ONEPASS_OFF) {
@@ -546,19 +535,13 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
assert(pPk != 0 || pTab->pSelect != 0);
sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur,
addrBypass, iKey, nKey);
+
VdbeCoverage(v);
}
- } else if (pPk) {
+ } else {
addrLoop = sqlite3VdbeAddOp1(v, OP_Rewind, iEphCur);
VdbeCoverage(v);
sqlite3VdbeAddOp2(v, OP_RowData, iEphCur, iKey);
- assert(nKey == 0); /* OP_Found will use a composite key */
- } else {
- addrLoop =
- sqlite3VdbeAddOp3(v, OP_RowSetRead, iRowSet, 0,
- iKey);
- VdbeCoverage(v);
- assert(nKey == 1);
}
/* Delete the row */
@@ -581,13 +564,10 @@ sqlite3DeleteFrom(Parse * pParse, /* The parser context */
if (eOnePass != ONEPASS_OFF) {
sqlite3VdbeResolveLabel(v, addrBypass);
sqlite3WhereEnd(pWInfo);
- } else if (pPk) {
+ } else {
sqlite3VdbeAddOp2(v, OP_Next, iEphCur, addrLoop + 1);
VdbeCoverage(v);
sqlite3VdbeJumpHere(v, addrLoop);
- } else {
- sqlite3VdbeGoto(v, addrLoop);
- sqlite3VdbeJumpHere(v, addrLoop);
}
} /* End non-truncate path */
@@ -740,7 +720,6 @@ sqlite3GenerateRowDelete(Parse * pParse, /* Parsing context */
Vdbe *v = pParse->pVdbe; /* Vdbe */
int iOld = 0; /* First register in OLD.* array */
int iLabel; /* Label resolved to end of generated code */
- u8 opSeek; /* Seek opcode */
/* Vdbe is guaranteed to have been allocated by this stage. */
assert(v);
@@ -753,9 +732,8 @@ sqlite3GenerateRowDelete(Parse * pParse, /* Parsing context */
* not attempt to delete it or fire any DELETE triggers.
*/
iLabel = sqlite3VdbeMakeLabel(v);
- opSeek = HasRowid(pTab) ? OP_NotExists : OP_NotFound;
if (eMode == ONEPASS_OFF) {
- sqlite3VdbeAddOp4Int(v, opSeek, iDataCur, iLabel, iPk, nPk);
+ sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, iLabel, iPk, nPk);
VdbeCoverageIf(v, opSeek == OP_NotExists);
VdbeCoverageIf(v, opSeek == OP_NotFound);
}
@@ -807,7 +785,7 @@ sqlite3GenerateRowDelete(Parse * pParse, /* Parsing context */
* pointing to.
*/
if (addrStart < sqlite3VdbeCurrentAddr(v)) {
- sqlite3VdbeAddOp4Int(v, opSeek, iDataCur, iLabel, iPk,
+ sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, iLabel, iPk,
nPk);
VdbeCoverageIf(v, opSeek == OP_NotExists);
VdbeCoverageIf(v, opSeek == OP_NotFound);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 067f2f18f..decf398aa 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -148,7 +148,6 @@ sqlite3Update(Parse * pParse, /* The parser context */
int regNewRowid = 0; /* The new rowid */
int regNew = 0; /* Content of the NEW.* table in triggers */
int regOld = 0; /* Content of OLD.* table in triggers */
- int regRowSet = 0; /* Rowset of rows to be updated */
int regKey = 0; /* composite PRIMARY KEY value */
memset(&sContext, 0, sizeof(sContext));
@@ -328,7 +327,6 @@ sqlite3Update(Parse * pParse, /* The parser context */
sqlite3BeginWriteOperation(pParse, 1);
/* Allocate required registers. */
- regRowSet = ++pParse->nMem;
regOldRowid = regNewRowid = ++pParse->nMem;
if (chngPk || pTrigger || hasFK) {
regOld = pParse->nMem + 1;
@@ -351,6 +349,8 @@ sqlite3Update(Parse * pParse, /* The parser context */
#if !defined(SQLITE_OMIT_VIEW) && !defined(SQLITE_OMIT_TRIGGER)
if (isView) {
sqlite3MaterializeView(pParse, pTab, pWhere, iDataCur);
+ /* Number of columns from SELECT plus ID.*/
+ nKey = pTab->nCol + 1;
}
#endif
@@ -361,66 +361,70 @@ sqlite3Update(Parse * pParse, /* The parser context */
goto update_cleanup;
}
/* Begin the database scan
+ * The only difference between VIEW and ordinary table is the fact that
+ * VIEW is held in ephemeral table and doesn't have explicit PK.
+ * In this case we have to manually load columns in order to make tuple.
*/
- if (HasRowid(pTab)) {
- sqlite3VdbeAddOp3(v, OP_Null, 0, regRowSet, regOldRowid);
- pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
- WHERE_ONEPASS_DESIRED |
- WHERE_SEEK_TABLE, iIdxCur);
- if (pWInfo == 0)
- goto update_cleanup;
- okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
+ int iPk; /* First of nPk memory cells holding PRIMARY KEY value */
+ i16 nPk; /* Number of components of the PRIMARY KEY */
+ int addrOpen; /* Address of the OpenEphemeral instruction */
- /* Remember the rowid of every item to be updated.
- */
- sqlite3VdbeAddOp2(v, OP_Rowid, iDataCur, regOldRowid);
- if (!okOnePass) {
- sqlite3VdbeAddOp2(v, OP_RowSetAdd, regRowSet,
- regOldRowid);
- }
-
- /* End the database scan loop.
- */
- sqlite3WhereEnd(pWInfo);
+ if (isView) {
+ nPk = nKey;
} else {
- int iPk; /* First of nPk memory cells holding PRIMARY KEY value */
- i16 nPk; /* Number of components of the PRIMARY KEY */
- int addrOpen; /* Address of the OpenEphemeral instruction */
-
assert(pPk != 0);
nPk = pPk->nKeyCol;
- iPk = pParse->nMem + 1;
- pParse->nMem += nPk;
- regKey = ++pParse->nMem;
- iEph = pParse->nTab++;
- sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
+ }
+ iPk = pParse->nMem + 1;
+ pParse->nMem += nPk;
+ regKey = ++pParse->nMem;
+ iEph = pParse->nTab++;
+ sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
+
+ if (isView) {
+ KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nKey, 0);
+ addrOpen = sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iEph,
+ nKey, 0, (char*)pKeyInfo, P4_KEYINFO);
+ } else {
addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
sqlite3VdbeSetP4KeyInfo(pParse, pPk);
- pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
- WHERE_ONEPASS_DESIRED, iIdxCur);
- if (pWInfo == 0)
- goto update_cleanup;
- okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
+ }
+
+ pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
+ WHERE_ONEPASS_DESIRED, iIdxCur);
+ if (pWInfo == 0)
+ goto update_cleanup;
+ okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
+ if (isView){
+ for (i = 0; i < nPk; i++) {
+ sqlite3VdbeAddOp3(v, OP_Column, iDataCur, i, iPk + i);
+ }
+ } else {
for (i = 0; i < nPk; i++) {
assert(pPk->aiColumn[i] >= 0);
sqlite3ExprCodeGetColumnOfTable(v, pTab, iDataCur,
pPk->aiColumn[i],
iPk + i);
}
- if (okOnePass) {
- sqlite3VdbeChangeToNoop(v, addrOpen);
- nKey = nPk;
- regKey = iPk;
- } else {
- sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
- sqlite3IndexAffinityStr(db, pPk),
- nPk);
- sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEph, regKey, iPk,
- nPk);
- }
- sqlite3WhereEnd(pWInfo);
}
+ if (okOnePass) {
+ sqlite3VdbeChangeToNoop(v, addrOpen);
+ nKey = nPk;
+ regKey = iPk;
+ } else {
+ const char *zAff = isView ? 0 :
+ sqlite3IndexAffinityStr(pParse->db, pPk);
+ sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
+ zAff, nPk);
+ sqlite3VdbeAddOp4Int(v, OP_IdxInsert, iEph, regKey, iPk,
+ nPk);
+ }
+ /* End the database scan loop.
+ */
+ sqlite3WhereEnd(pWInfo);
+
+
/* Initialize the count of updated rows
*/
if ((user_session->sql_flags & SQLITE_CountRows)
@@ -460,7 +464,7 @@ sqlite3Update(Parse * pParse, /* The parser context */
/* Top of the update loop */
if (okOnePass) {
labelContinue = labelBreak;
- sqlite3VdbeAddOp2(v, OP_IsNull, pPk ? regKey : regOldRowid,
+ sqlite3VdbeAddOp2(v, OP_IsNull, regKey,
labelBreak);
if (aToOpen[iDataCur - iBaseCur] && !isView) {
assert(pPk);
@@ -470,7 +474,7 @@ sqlite3Update(Parse * pParse, /* The parser context */
}
VdbeCoverageIf(v, pPk == 0);
VdbeCoverageIf(v, pPk != 0);
- } else if (pPk) {
+ } else {
labelContinue = sqlite3VdbeMakeLabel(v);
sqlite3VdbeAddOp2(v, OP_Rewind, iEph, labelBreak);
VdbeCoverage(v);
@@ -478,14 +482,6 @@ sqlite3Update(Parse * pParse, /* The parser context */
sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur, labelContinue,
regKey, 0);
VdbeCoverage(v);
- } else {
- labelContinue =
- sqlite3VdbeAddOp3(v, OP_RowSetRead, regRowSet, labelBreak,
- regOldRowid);
- VdbeCoverage(v);
- sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur, labelContinue,
- regOldRowid);
- VdbeCoverage(v);
}
/* If the record number will change, set register regNewRowid to
@@ -583,13 +579,13 @@ sqlite3Update(Parse * pParse, /* The parser context */
* is deleted or renamed by a BEFORE trigger - is left undefined in the
* documentation.
*/
- if (pPk) {
+ if (!isView) {
sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur,
labelContinue, regKey, nKey);
VdbeCoverage(v);
} else {
- sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur,
- labelContinue, regOldRowid);
+ sqlite3VdbeAddOp4Int(v, OP_NotFound, iDataCur,
+ labelContinue, regKey - nKey, nKey);
VdbeCoverage(v);
}
@@ -627,16 +623,10 @@ sqlite3Update(Parse * pParse, /* The parser context */
/* Delete the index entries associated with the current record. */
if (bReplace || chngKey) {
- if (pPk) {
- addr1 =
- sqlite3VdbeAddOp4Int(v, OP_NotFound,
- iDataCur, 0, regKey,
- nKey);
- } else {
- addr1 =
- sqlite3VdbeAddOp3(v, OP_NotExists, iDataCur,
- 0, regOldRowid);
- }
+ addr1 =
+ sqlite3VdbeAddOp4Int(v, OP_NotFound,
+ iDataCur, 0, regKey,
+ nKey);
VdbeCoverageNeverTaken(v);
}
sqlite3GenerateRowIndexDelete(pParse, pTab, iDataCur, iIdxCur);
@@ -703,12 +693,10 @@ sqlite3Update(Parse * pParse, /* The parser context */
*/
if (okOnePass) {
/* Nothing to do at end-of-loop for a single-pass */
- } else if (pPk) {
+ } else {
sqlite3VdbeResolveLabel(v, labelContinue);
sqlite3VdbeAddOp2(v, OP_Next, iEph, addrTop);
VdbeCoverage(v);
- } else {
- sqlite3VdbeGoto(v, labelContinue);
}
sqlite3VdbeResolveLabel(v, labelBreak);
--
2.15.1
More information about the Tarantool-patches
mailing list