[tarantool-patches] [PATCH 1/4] sql: pass space pointer to OP_OpenRead/OpenWrite

Nikita Pettik korablev at tarantool.org
Wed Mar 21 02:48:39 MSK 2018


Originally in SQLite, to open table (i.e. btree) it was required to pass
number of page root to OP_OpenRead or OP_OpenWrite opcodes as an
argument. However, now there are only Tarantool spaces and nothing
prevents from operating directly on pointers to them. On the other hand,
pointers are able to expire after schema changes (i.e. after DML
routine). For this reason, schema version is saved to VDBE at compile
time and checked each time during cursor opening.

Part of #3252
---
 src/box/sql/analyze.c | 17 +++++++++++++++--
 src/box/sql/build.c   |  7 ++++++-
 src/box/sql/expr.c    | 14 ++++++++++++--
 src/box/sql/fkey.c    | 11 ++++++++++-
 src/box/sql/insert.c  | 37 ++++++++++++++++++++++++++++++++-----
 src/box/sql/select.c  | 11 ++++++++++-
 src/box/sql/vdbe.c    | 12 ++++++++++--
 src/box/sql/vdbe.h    |  1 +
 src/box/sql/vdbeInt.h |  1 +
 src/box/sql/vdbeaux.c | 13 +++++++++++++
 src/box/sql/where.c   | 12 +++++++++++-
 11 files changed, 121 insertions(+), 15 deletions(-)

diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index db06d0182..57fc33c70 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -174,10 +174,16 @@ openStatTable(Parse * pParse,	/* Parsing context */
 
 	/* Open the sql_stat[134] tables for writing. */
 	for (i = 0; aTable[i]; i++) {
+		struct space *space =
+			space_by_id(SQLITE_PAGENO_TO_SPACEID(aRoot[i]));
+		assert(space != NULL);
+		int space_ptr_reg = ++pParse->nMem;
+		sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_ptr_reg, 0,
+				       ((int64_t) space));
 		int addr;
 		addr =
 		    sqlite3VdbeAddOp3(v, OP_OpenWrite, iStatCur + i, aRoot[i],
-				      0);
+				      space_ptr_reg);
 		v->aOp[addr].p4.pKeyInfo = 0;
 		v->aOp[addr].p4type = P4_KEYINFO;
 		sqlite3VdbeChangeP5(v, aCreateTbl[i]);
@@ -814,6 +820,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 	int iTabCur;		/* Table cursor */
 	Vdbe *v;		/* The virtual machine being built up */
 	int i;			/* Loop counter */
+	int space_ptr_reg = iMem++;
 	int regStat4 = iMem++;	/* Register to hold Stat4Accum object */
 	int regChng = iMem++;	/* Index of changed index field */
 	int regKey = iMem++;	/* Key argument passed to stat_push() */
@@ -910,7 +917,13 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 
 		/* Open a read-only cursor on the index being analyzed. */
 		assert(sqlite3SchemaToIndex(db, pIdx->pSchema) == 0);
-		sqlite3VdbeAddOp2(v, OP_OpenRead, iIdxCur, pIdx->tnum);
+		struct space *space =
+			space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
+		assert(space != NULL);
+		sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_ptr_reg, 0,
+				       ((int64_t) space));
+		sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
+				  space_ptr_reg);
 		sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
 		VdbeComment((v, "%s", pIdx->zName));
 
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 9ad0c0605..229c8b4d5 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -2603,7 +2603,12 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
 	sqlite3VdbeJumpHere(v, addr1);
 	if (memRootPage < 0)
 		sqlite3VdbeAddOp2(v, OP_Clear, tnum, 0);
-	sqlite3VdbeAddOp4(v, OP_OpenWrite, iIdx, tnum, 0,
+	struct space *space = space_by_id(SQLITE_PAGENO_TO_SPACEID(tnum));
+	assert(space != NULL);
+	int space_ptr_reg = ++pParse->nMem;
+	sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_ptr_reg, 0,
+			       ((int64_t) space));
+	sqlite3VdbeAddOp4(v, OP_OpenWrite, iIdx, tnum, space_ptr_reg,
 			  (char *)pKey, P4_KEYINFO);
 	sqlite3VdbeChangeP5(v,
 			    OPFLAG_BULKCSR | ((memRootPage >= 0) ?
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index b69a176cb..2a553925c 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -35,7 +35,9 @@
  */
 #include <box/coll.h>
 #include "sqliteInt.h"
+#include "tarantoolInt.h"
 #include "box/session.h"
+#include "box/schema.h"
 
 /* Forward declarations */
 static void exprCodeBetween(Parse *, Expr *, int,
@@ -2586,8 +2588,16 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
 							  pIdx->zName),
 							  P4_DYNAMIC);
 #endif
-					sqlite3VdbeAddOp2(v, OP_OpenRead, iTab,
-							  pIdx->tnum);
+					struct space *space =
+						space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
+					assert(space != NULL);
+					int space_ptr_reg = ++pParse->nMem;
+					sqlite3VdbeAddOp4Int64(v, OP_Int64, 0,
+							       space_ptr_reg, 0,
+							       ((int64_t) space));
+					sqlite3VdbeAddOp3(v, OP_OpenRead, iTab,
+							  pIdx->tnum,
+							  space_ptr_reg);
 					sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
 					VdbeComment((v, "%s", pIdx->zName));
 					assert(IN_INDEX_INDEX_DESC ==
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 439f38352..f4d73b289 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -35,7 +35,9 @@
  */
 #include <box/coll.h>
 #include "sqliteInt.h"
+#include "tarantoolInt.h"
 #include "box/session.h"
+#include "box/schema.h"
 
 #ifndef SQLITE_OMIT_FOREIGN_KEY
 #ifndef SQLITE_OMIT_TRIGGER
@@ -434,7 +436,14 @@ fkLookupParent(Parse * pParse,	/* Parse context */
 			int regTemp = sqlite3GetTempRange(pParse, nCol);
 			int regRec = sqlite3GetTempReg(pParse);
 
-			sqlite3VdbeAddOp2(v, OP_OpenRead, iCur, pIdx->tnum);
+			struct space *space =
+				space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
+			assert(space != NULL);
+			int space_ptr_reg = ++pParse->nMem;
+			sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_ptr_reg, 0,
+					       ((int64_t) space));
+			sqlite3VdbeAddOp3(v, OP_OpenRead, iCur, pIdx->tnum,
+					  space_ptr_reg);
 			sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
 			for (i = 0; i < nCol; i++) {
 				sqlite3VdbeAddOp2(v, OP_Copy,
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index 54fcca5c9..c57466868 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -34,7 +34,9 @@
  * to handle INSERT statements in SQLite.
  */
 #include "sqliteInt.h"
+#include "tarantoolInt.h"
 #include "box/session.h"
+#include "box/schema.h"
 
 /*
  * Generate code that will open pTab as cursor iCur.
@@ -51,7 +53,12 @@ sqlite3OpenTable(Parse * pParse,	/* Generate code into this VDBE */
 	Index *pPk = sqlite3PrimaryKeyIndex(pTab);
 	assert(pPk != 0);
 	assert(pPk->tnum == pTab->tnum);
-	sqlite3VdbeAddOp3(v, opcode, iCur, pPk->tnum, 0);
+	struct space *space = space_by_id(SQLITE_PAGENO_TO_SPACEID(pPk->tnum));
+	assert(space != NULL);
+	int space_ptr_reg = ++pParse->nMem;
+	sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_ptr_reg, 0,
+			       ((int64_t) space));
+	sqlite3VdbeAddOp3(v, opcode, iCur, pPk->tnum, space_ptr_reg);
 	sqlite3VdbeSetP4KeyInfo(pParse, pPk);
 	VdbeComment((v, "%s", pTab->zName));
 }
@@ -183,7 +190,7 @@ readsTable(Parse * p, Table * pTab)
 	for (i = 1; i < iEnd; i++) {
 		VdbeOp *pOp = sqlite3VdbeGetOp(v, i);
 		assert(pOp != 0);
-		if (pOp->opcode == OP_OpenRead && pOp->p3 == 0) {
+		if (pOp->opcode == OP_OpenRead) {
 			Index *pIndex;
 			int tnum = pOp->p2;
 			if (tnum == pTab->tnum) {
@@ -1560,6 +1567,11 @@ sqlite3OpenTableAndIndices(Parse * pParse,	/* Parsing context */
 		*piDataCur = iDataCur;
 	if (piIdxCur)
 		*piIdxCur = iBase;
+	struct space *space = space_by_id(SQLITE_PAGENO_TO_SPACEID(pTab->tnum));
+	assert(space != NULL);
+	int space_ptr_reg = ++pParse->nMem;
+	sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_ptr_reg, 0,
+			       ((int64_t) space));
 
 	/* One iteration of this cycle adds OpenRead/OpenWrite which
 	 * opens cursor for current index.
@@ -1607,7 +1619,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,	/* Parsing context */
 				p5 = 0;
 			}
 			if (aToOpen == 0 || aToOpen[i + 1]) {
-				sqlite3VdbeAddOp2(v, op, iIdxCur, pIdx->tnum);
+				sqlite3VdbeAddOp3(v, op, iIdxCur, pIdx->tnum,
+						  space_ptr_reg);
 				sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
 				sqlite3VdbeChangeP5(v, p5);
 				VdbeComment((v, "%s", pIdx->zName));
@@ -1911,10 +1924,24 @@ xferOptimization(Parse * pParse,	/* Parser context */
 				break;
 		}
 		assert(pSrcIdx);
-		sqlite3VdbeAddOp2(v, OP_OpenRead, iSrc, pSrcIdx->tnum);
+		struct space *space_src =
+			space_by_id(SQLITE_PAGENO_TO_SPACEID(pSrcIdx->tnum));
+		assert(space_src != NULL);
+		int space_src_ptr_reg = ++pParse->nMem;
+		sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_src_ptr_reg, 0,
+				       ((int64_t) space_src));
+		sqlite3VdbeAddOp3(v, OP_OpenRead, iSrc, pSrcIdx->tnum,
+				  space_src_ptr_reg);
 		sqlite3VdbeSetP4KeyInfo(pParse, pSrcIdx);
 		VdbeComment((v, "%s", pSrcIdx->zName));
-		sqlite3VdbeAddOp2(v, OP_OpenWrite, iDest, pDestIdx->tnum);
+		struct space *space_dest =
+			space_by_id(SQLITE_PAGENO_TO_SPACEID(pDestIdx->tnum));
+		assert(space_dest != NULL);
+		int space_dest_ptr_reg = ++pParse->nMem;
+		sqlite3VdbeAddOp4Int64(v, OP_Int64, 0, space_dest_ptr_reg, 0,
+				       ((int64_t) space_dest));
+		sqlite3VdbeAddOp3(v, OP_OpenWrite, iDest, pDestIdx->tnum,
+				  space_dest_ptr_reg);
 		sqlite3VdbeSetP4KeyInfo(pParse, pDestIdx);
 		sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR);
 		VdbeComment((v, "%s", pDestIdx->zName));
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 2a8c83d06..7da3f0e05 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -35,7 +35,9 @@
  */
 #include <box/coll.h>
 #include "sqliteInt.h"
+#include "tarantoolInt.h"
 #include "box/session.h"
+#include "box/schema.h"
 
 /*
  * Trace output macros
@@ -6187,8 +6189,15 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 				}
 
 				/* Open a read-only cursor, execute the OP_Count, close the cursor. */
+				struct space *space =
+					space_by_id(SQLITE_PAGENO_TO_SPACEID(iRoot));
+				assert(space != NULL);
+				int space_ptr_reg = ++pParse->nMem;
+				sqlite3VdbeAddOp4Int64(v, OP_Int64, 0,
+						       space_ptr_reg, 0,
+						       ((int64_t) space));
 				sqlite3VdbeAddOp4Int(v, OP_OpenRead, iCsr,
-						     iRoot, 0, 1);
+						     iRoot, space_ptr_reg, 1);
 				if (pKeyInfo) {
 					sqlite3VdbeChangeP4(v, -1,
 							    (char *)pKeyInfo,
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 9929dfb96..5d1227afa 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -3217,9 +3217,17 @@ case OP_OpenWrite:
 
 	assert(p2 >= 1);
 	pBtCur = pCur->uc.pCursor;
-	pBtCur->space = space_by_id(SQLITE_PAGENO_TO_SPACEID(p2));
+	if (box_schema_version() == p->schema_ver) {
+		pIn3 = &aMem[pOp->p3];
+		/* Make sure that 64-bit pointer can fit into int64_t. */
+		assert(sizeof(pBtCur->space) >= sizeof(pIn3->u.i));
+		pBtCur->space = ((struct space *) pIn3->u.i);
+	} else {
+		pBtCur->space = space_by_id(SQLITE_PAGENO_TO_SPACEID(p2));
+	}
+	assert(pBtCur->space != NULL);
 	pBtCur->index = space_index(pBtCur->space, SQLITE_PAGENO_TO_INDEXID(p2));
-	assert(pBtCur->space != NULL && pBtCur->index != NULL);
+	assert(pBtCur->index != NULL);
 	pBtCur->eState = CURSOR_INVALID;
 	pBtCur->curFlags |= BTCF_TaCursor;
 	pCur->pKeyInfo = pKeyInfo;
diff --git a/src/box/sql/vdbe.h b/src/box/sql/vdbe.h
index 7241963e4..a1ecf729d 100644
--- a/src/box/sql/vdbe.h
+++ b/src/box/sql/vdbe.h
@@ -200,6 +200,7 @@ int sqlite3VdbeAddOp3(Vdbe *, int, int, int, int);
 int sqlite3VdbeAddOp4(Vdbe *, int, int, int, int, const char *zP4, int);
 int sqlite3VdbeAddOp4Dup8(Vdbe *, int, int, int, int, const u8 *, int);
 int sqlite3VdbeAddOp4Int(Vdbe *, int, int, int, int, int);
+int sqlite3VdbeAddOp4Int64(Vdbe *, int, int, int, int, int64_t);
 void sqlite3VdbeEndCoroutine(Vdbe *, int);
 #if defined(SQLITE_DEBUG) && !defined(SQLITE_TEST_REALLOC_STRESS)
 void sqlite3VdbeVerifyNoMallocRequired(Vdbe * p, int N);
diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h
index 8b622de5b..99262ab7b 100644
--- a/src/box/sql/vdbeInt.h
+++ b/src/box/sql/vdbeInt.h
@@ -378,6 +378,7 @@ struct Vdbe {
 	i64 nFkConstraint;	/* Number of imm. FK constraints this VM */
 	i64 nStmtDefCons;	/* Number of def. constraints when stmt started */
 	i64 nStmtDefImmCons;	/* Number of def. imm constraints when stmt started */
+	uint32_t schema_ver;	/* Schema version at the moment of VDBE creation. */
 
 	/*
 	 * In recursive triggers we can execute INSERT/UPDATE OR IGNORE
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index 92bf9943b..b35d0712e 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -66,6 +66,7 @@ sqlite3VdbeCreate(Parse * pParse)
 	p->magic = VDBE_MAGIC_INIT;
 	p->pParse = pParse;
 	p->autoCommit = (char)box_txn() == 0 ? 1 : 0;
+	p->schema_ver = box_schema_version();
 	if (!p->autoCommit) {
 		p->psql_txn = in_txn()->psql_txn;
 		p->nDeferredCons = p->psql_txn->nDeferredConsSave;
@@ -413,6 +414,18 @@ sqlite3VdbeAddOp4Int(Vdbe * p,	/* Add the opcode to this VM */
 	return addr;
 }
 
+int
+sqlite3VdbeAddOp4Int64(Vdbe *p, int op, int p1, int p2, int p3, int64_t p4)
+{
+	int addr = sqlite3VdbeAddOp3(p, op, p1, p2, p3);
+	VdbeOp *pOp = &p->aOp[addr];
+	pOp->p4type = P4_INT64;
+	pOp->p4.pI64 = sqlite3DbMallocRawNN(sqlite3VdbeDb(p), sizeof(int64_t));
+	if (p->db->mallocFailed == 0)
+		*pOp->p4.pI64 = p4;
+	return addr;
+}
+
 /* Insert the end of a co-routine
  */
 void
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index 2f1c627e5..9d0397eef 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -42,6 +42,8 @@
 #include "vdbeInt.h"
 #include "whereInt.h"
 #include "box/session.h"
+#include "box/schema.h"
+#include "tarantoolInt.h"
 
 /* Forward declaration of methods */
 static int whereLoopResize(sqlite3 *, WhereLoop *, int);
@@ -4606,7 +4608,15 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 			assert(pIx->pSchema == pTab->pSchema);
 			assert(iIndexCur >= 0);
 			if (op) {
-				sqlite3VdbeAddOp2(v, op, iIndexCur, pIx->tnum);
+				struct space *space =
+					space_by_id(SQLITE_PAGENO_TO_SPACEID(pIx->tnum));
+				assert(space != NULL);
+				int space_ptr_reg = ++pParse->nMem;
+				sqlite3VdbeAddOp4Int64(v, OP_Int64, 0,
+						       space_ptr_reg, 0,
+						       ((int64_t) space));
+				sqlite3VdbeAddOp3(v, op, iIndexCur, pIx->tnum,
+						  space_ptr_reg);
 				sqlite3VdbeSetP4KeyInfo(pParse, pIx);
 				if ((pLoop->wsFlags & WHERE_CONSTRAINT) != 0
 				    && (pLoop->
-- 
2.15.1





More information about the Tarantool-patches mailing list