[tarantool-patches] [PATCH 2/4] sql: introduce opcodes to operate on system spaces

Nikita Pettik korablev at tarantool.org
Wed Mar 21 02:48:40 MSK 2018


Since it is impossible to use space pointers during execution of DDL
(after any DDL schema may change and pointers fetched at compile time
can become expired), special opcodes to operate on system spaces have
been introduced: OP_SInsert and OP_SDelete. They take space id as
an argument and make space lookup each time before insertion or deletion.
However, sometimes it is still required to iterate through space (e.g.
to satisfy WHERE clause) during DDL. As far as now cursors rely on
pointers to space, it is required to convert space id to space pointer
during VDBE execution. Hence, another opcode is added: OP_SIDtoPtr.
Finally, existing opcodes, which are invoked only during DDL, have been
also refactored.

Part of #3252
---
 src/box/sql.c              |  51 ++++++++++++-------
 src/box/sql/opcodes.c      |  45 +++++++++--------
 src/box/sql/opcodes.h      |  53 ++++++++++----------
 src/box/sql/tarantoolInt.h |  11 ++---
 src/box/sql/vdbe.c         | 121 ++++++++++++++++++++++++++++++++++-----------
 5 files changed, 182 insertions(+), 99 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index e943131ae..db4c7e7ea 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -578,13 +578,30 @@ int tarantoolSqlite3Delete(BtCursor *pCur, u8 flags)
 	if (key == NULL)
 		return SQL_TARANTOOL_DELETE_FAIL;
 
+	rc = sql_delete_by_key(pCur->space, key, key_size);
+
+	return rc == 0 ? SQLITE_OK : SQL_TARANTOOL_DELETE_FAIL;
+}
+
+/**
+ * Delete entry from space by its key.
+ *
+ * @param space Space which contains record to be deleted.
+ * @param key Key of record to be deleted.
+ * @param key_size Size of key.
+ *
+ * @retval SQLITE_OK on success, SQL_TARANTOOL_DELETE_FAIL otherwise.
+ */
+int
+sql_delete_by_key(struct space *space, char *key, uint32_t key_size)
+{
 	struct request request;
 	memset(&request, 0, sizeof(request));
 	request.type = IPROTO_DELETE;
 	request.key = key;
 	request.key_end = key + key_size;
-	request.space_id = pCur->space->def->id;
-	rc = sql_execute_dml(&request, pCur->space);
+	request.space_id = space->def->id;
+	int rc = sql_execute_dml(&request, space);
 
 	return rc == 0 ? SQLITE_OK : SQL_TARANTOOL_DELETE_FAIL;
 }
@@ -1053,9 +1070,8 @@ out:
  * Increment max_id and store updated tuple in the cursor
  * object.
  */
-int tarantoolSqlite3IncrementMaxid(BtCursor *pCur)
+int tarantoolSqlite3IncrementMaxid(uint64_t *space_max_id)
 {
-	assert(pCur->curFlags & BTCF_TaCursor);
 	/* ["max_id"] */
 	static const char key[] = {
 		(char)0x91, /* MsgPack array(1) */
@@ -1075,7 +1091,10 @@ int tarantoolSqlite3IncrementMaxid(BtCursor *pCur)
 	struct tuple *res;
 	int rc;
 
-	struct iterator *it = index_create_iterator(pCur->index, ITER_ALL,
+	struct space *space_schema = space_by_id(BOX_SCHEMA_ID);
+	assert(space_schema != NULL);
+	struct index *pk = space_index(space_schema, 0);
+	struct iterator *it = index_create_iterator(pk, ITER_ALL,
 						    &key[1], sizeof(key) - 1);
 	if (iterator_next(it, &res) != 0 || res == NULL) {
 		iterator_delete(it);
@@ -1088,20 +1107,16 @@ int tarantoolSqlite3IncrementMaxid(BtCursor *pCur)
 	request.ops = ops;
 	request.ops_end = ops + sizeof(ops);
 	request.type = IPROTO_UPSERT;
-	request.space_id = pCur->space->def->id;
-	rc = sql_execute_dml(&request, pCur->space);
+	request.space_id = BOX_SCHEMA_ID;
+	rc = sql_execute_dml(&request, space_schema);
 	if (rc != 0) {
 		iterator_delete(it);
 		return SQL_TARANTOOL_ERROR;
 	}
-	if (pCur->last_tuple != NULL) {
-		box_tuple_unref(pCur->last_tuple);
-	}
-	box_tuple_ref(res);
-	pCur->last_tuple = res;
-	pCur->eState = CURSOR_VALID;
+	rc = tuple_field_u64(res, 1, space_max_id);
+	(*space_max_id)++;
 	iterator_delete(it);
-	return SQLITE_OK;
+	return rc  == 0 ? SQLITE_OK : SQL_TARANTOOL_ERROR;
 }
 
 /*
@@ -1687,14 +1702,12 @@ int tarantoolSqlite3EphemeralGetMaxId(BtCursor *pCur, uint32_t fieldno,
  * If index is empty - return 0 in max_id and success status
  */
 int
-tarantoolSqlGetMaxId(BtCursor *cur, uint32_t fieldno,
-		     uint64_t *max_id)
+tarantoolSqlGetMaxId(uint32_t space_id, uint64_t *max_id)
 {
 	char key[16];
 	struct tuple *tuple;
 	char *key_end = mp_encode_array(key, 0);
-	if (box_index_max(cur->space->def->id, cur->index->def->iid,
-			  key, key_end, &tuple) != 0)
+	if (box_index_max(space_id, 0 /* PK */, key, key_end, &tuple) != 0)
 		return -1;
 
 	/* Index is empty  */
@@ -1703,5 +1716,5 @@ tarantoolSqlGetMaxId(BtCursor *cur, uint32_t fieldno,
 		return 0;
 	}
 
-	return tuple_field_u64(tuple, fieldno, max_id);
+	return tuple_field_u64(tuple, 0, max_id);
 }
diff --git a/src/box/sql/opcodes.c b/src/box/sql/opcodes.c
index 7a40b28a8..5d892094a 100644
--- a/src/box/sql/opcodes.c
+++ b/src/box/sql/opcodes.c
@@ -115,7 +115,7 @@ const char *sqlite3OpcodeName(int i){
     /* 101 */ "Close"            OpHelp(""),
     /* 102 */ "ColumnsUsed"      OpHelp(""),
     /* 103 */ "Sequence"         OpHelp("r[P2]=cursor[P1].ctr++"),
-    /* 104 */ "NextId"           OpHelp("r[P3]=get_max(space_index[P1]{Column[P2]})"),
+    /* 104 */ "NextId"           OpHelp("r[P2]=get_max(space_id[P1])"),
     /* 105 */ "NextIdEphemeral"  OpHelp("r[P3]=get_max(space_index[P1]{Column[P2]})"),
     /* 106 */ "FCopy"            OpHelp("reg[P2 at cur_frame]= reg[P1 at root_frame(OPFLAG_SAME_FRAME)]"),
     /* 107 */ "Delete"           OpHelp(""),
@@ -128,26 +128,29 @@ const char *sqlite3OpcodeName(int i){
     /* 114 */ "IdxReplace"       OpHelp("key=r[P2]"),
     /* 115 */ "Real"             OpHelp("r[P2]=P4"),
     /* 116 */ "IdxInsert"        OpHelp("key=r[P2]"),
-    /* 117 */ "IdxDelete"        OpHelp("key=r[P2 at P3]"),
-    /* 118 */ "Clear"            OpHelp("space id = P1"),
-    /* 119 */ "ResetSorter"      OpHelp(""),
-    /* 120 */ "ParseSchema2"     OpHelp("rows=r[P1 at P2]"),
-    /* 121 */ "ParseSchema3"     OpHelp("name=r[P1] sql=r[P1+1]"),
-    /* 122 */ "RenameTable"      OpHelp("P1 = root, P4 = name"),
-    /* 123 */ "LoadAnalysis"     OpHelp(""),
-    /* 124 */ "DropTable"        OpHelp(""),
-    /* 125 */ "DropIndex"        OpHelp(""),
-    /* 126 */ "DropTrigger"      OpHelp(""),
-    /* 127 */ "Param"            OpHelp(""),
-    /* 128 */ "FkCounter"        OpHelp("fkctr[P1]+=P2"),
-    /* 129 */ "OffsetLimit"      OpHelp("if r[P1]>0 then r[P2]=r[P1]+max(0,r[P3]) else r[P2]=(-1)"),
-    /* 130 */ "AggStep0"         OpHelp("accum=r[P3] step(r[P2 at P5])"),
-    /* 131 */ "AggStep"          OpHelp("accum=r[P3] step(r[P2 at P5])"),
-    /* 132 */ "AggFinal"         OpHelp("accum=r[P1] N=P2"),
-    /* 133 */ "Expire"           OpHelp(""),
-    /* 134 */ "IncMaxid"         OpHelp(""),
-    /* 135 */ "Noop"             OpHelp(""),
-    /* 136 */ "Explain"          OpHelp(""),
+    /* 117 */ "SInsert"          OpHelp("space id = P1, key = r[P2]"),
+    /* 118 */ "SDelete"          OpHelp("space id = P1, key = r[P2]"),
+    /* 119 */ "SIDtoPtr"         OpHelp("space id = P1, space[out] = r[P2]"),
+    /* 120 */ "IdxDelete"        OpHelp("key=r[P2 at P3]"),
+    /* 121 */ "Clear"            OpHelp("space id = P1"),
+    /* 122 */ "ResetSorter"      OpHelp(""),
+    /* 123 */ "ParseSchema2"     OpHelp("rows=r[P1 at P2]"),
+    /* 124 */ "ParseSchema3"     OpHelp("name=r[P1] sql=r[P1+1]"),
+    /* 125 */ "RenameTable"      OpHelp("P1 = root, P4 = name"),
+    /* 126 */ "LoadAnalysis"     OpHelp(""),
+    /* 127 */ "DropTable"        OpHelp(""),
+    /* 128 */ "DropIndex"        OpHelp(""),
+    /* 129 */ "DropTrigger"      OpHelp(""),
+    /* 130 */ "Param"            OpHelp(""),
+    /* 131 */ "FkCounter"        OpHelp("fkctr[P1]+=P2"),
+    /* 132 */ "OffsetLimit"      OpHelp("if r[P1]>0 then r[P2]=r[P1]+max(0,r[P3]) else r[P2]=(-1)"),
+    /* 133 */ "AggStep0"         OpHelp("accum=r[P3] step(r[P2 at P5])"),
+    /* 134 */ "AggStep"          OpHelp("accum=r[P3] step(r[P2 at P5])"),
+    /* 135 */ "AggFinal"         OpHelp("accum=r[P1] N=P2"),
+    /* 136 */ "Expire"           OpHelp(""),
+    /* 137 */ "IncMaxid"         OpHelp(""),
+    /* 138 */ "Noop"             OpHelp(""),
+    /* 139 */ "Explain"          OpHelp(""),
   };
   return azName[i];
 }
diff --git a/src/box/sql/opcodes.h b/src/box/sql/opcodes.h
index af2ba1963..762a23205 100644
--- a/src/box/sql/opcodes.h
+++ b/src/box/sql/opcodes.h
@@ -104,7 +104,7 @@
 #define OP_Close         101
 #define OP_ColumnsUsed   102
 #define OP_Sequence      103 /* synopsis: r[P2]=cursor[P1].ctr++           */
-#define OP_NextId        104 /* synopsis: r[P3]=get_max(space_index[P1]{Column[P2]}) */
+#define OP_NextId        104 /* synopsis: r[P2]=get_max(space_id[P1])      */
 #define OP_NextIdEphemeral 105 /* synopsis: r[P3]=get_max(space_index[P1]{Column[P2]}) */
 #define OP_FCopy         106 /* synopsis: reg[P2 at cur_frame]= reg[P1 at root_frame(OPFLAG_SAME_FRAME)] */
 #define OP_Delete        107
@@ -117,26 +117,29 @@
 #define OP_IdxReplace    114 /* synopsis: key=r[P2]                        */
 #define OP_Real          115 /* same as TK_FLOAT, synopsis: r[P2]=P4       */
 #define OP_IdxInsert     116 /* synopsis: key=r[P2]                        */
-#define OP_IdxDelete     117 /* synopsis: key=r[P2 at P3]                     */
-#define OP_Clear         118 /* synopsis: space id = P1                    */
-#define OP_ResetSorter   119
-#define OP_ParseSchema2  120 /* synopsis: rows=r[P1 at P2]                    */
-#define OP_ParseSchema3  121 /* synopsis: name=r[P1] sql=r[P1+1]           */
-#define OP_RenameTable   122 /* synopsis: P1 = root, P4 = name             */
-#define OP_LoadAnalysis  123
-#define OP_DropTable     124
-#define OP_DropIndex     125
-#define OP_DropTrigger   126
-#define OP_Param         127
-#define OP_FkCounter     128 /* synopsis: fkctr[P1]+=P2                    */
-#define OP_OffsetLimit   129 /* synopsis: if r[P1]>0 then r[P2]=r[P1]+max(0,r[P3]) else r[P2]=(-1) */
-#define OP_AggStep0      130 /* synopsis: accum=r[P3] step(r[P2 at P5])       */
-#define OP_AggStep       131 /* synopsis: accum=r[P3] step(r[P2 at P5])       */
-#define OP_AggFinal      132 /* synopsis: accum=r[P1] N=P2                 */
-#define OP_Expire        133
-#define OP_IncMaxid      134
-#define OP_Noop          135
-#define OP_Explain       136
+#define OP_SInsert       117 /* synopsis: space id = P1, key = r[P2]       */
+#define OP_SDelete       118 /* synopsis: space id = P1, key = r[P2]       */
+#define OP_SIDtoPtr      119 /* synopsis: space id = P1, space[out] = r[P2] */
+#define OP_IdxDelete     120 /* synopsis: key=r[P2 at P3]                     */
+#define OP_Clear         121 /* synopsis: space id = P1                    */
+#define OP_ResetSorter   122
+#define OP_ParseSchema2  123 /* synopsis: rows=r[P1 at P2]                    */
+#define OP_ParseSchema3  124 /* synopsis: name=r[P1] sql=r[P1+1]           */
+#define OP_RenameTable   125 /* synopsis: P1 = root, P4 = name             */
+#define OP_LoadAnalysis  126
+#define OP_DropTable     127
+#define OP_DropIndex     128
+#define OP_DropTrigger   129
+#define OP_Param         130
+#define OP_FkCounter     131 /* synopsis: fkctr[P1]+=P2                    */
+#define OP_OffsetLimit   132 /* synopsis: if r[P1]>0 then r[P2]=r[P1]+max(0,r[P3]) else r[P2]=(-1) */
+#define OP_AggStep0      133 /* synopsis: accum=r[P3] step(r[P2 at P5])       */
+#define OP_AggStep       134 /* synopsis: accum=r[P3] step(r[P2 at P5])       */
+#define OP_AggFinal      135 /* synopsis: accum=r[P1] N=P2                 */
+#define OP_Expire        136
+#define OP_IncMaxid      137
+#define OP_Noop          138
+#define OP_Explain       139
 
 /* Properties such as "out2" or "jump" that are specified in
 ** comments following the "case" for each opcode in the vdbe.c
@@ -162,11 +165,11 @@
 /*  80 */ 0x00, 0x02, 0x02, 0x02, 0x00, 0x00, 0x00, 0x00,\
 /*  88 */ 0x00, 0x10, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,\
 /*  96 */ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10,\
-/* 104 */ 0x20, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,\
+/* 104 */ 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00,\
 /* 112 */ 0x00, 0x04, 0x00, 0x10, 0x04, 0x00, 0x00, 0x00,\
-/* 120 */ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10,\
-/* 128 */ 0x00, 0x1a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,\
-/* 136 */ 0x00,}
+/* 120 */ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,\
+/* 128 */ 0x00, 0x00, 0x10, 0x00, 0x1a, 0x00, 0x00, 0x00,\
+/* 136 */ 0x00, 0x00, 0x00, 0x00,}
 
 /* The sqlite3P2Values() routine is able to run faster if it knows
 ** the value of the largest JUMP opcode.  The smaller the maximum
diff --git a/src/box/sql/tarantoolInt.h b/src/box/sql/tarantoolInt.h
index 6f1ba3784..0b1e22ca2 100644
--- a/src/box/sql/tarantoolInt.h
+++ b/src/box/sql/tarantoolInt.h
@@ -77,6 +77,7 @@ int tarantoolSqlite3Count(BtCursor * pCur, i64 * pnEntry);
 int tarantoolSqlite3Insert(BtCursor * pCur);
 int tarantoolSqlite3Replace(BtCursor * pCur);
 int tarantoolSqlite3Delete(BtCursor * pCur, u8 flags);
+int sql_delete_by_key(struct space *space, char *key, uint32_t key_size);
 int tarantoolSqlite3ClearTable(struct space *space);
 
 /* Rename table pTab with zNewName by inserting new tuple to _space.
@@ -112,11 +113,10 @@ int tarantoolSqlite3IdxKeyCompare(BtCursor * pCur, UnpackedRecord * pUnpacked,
 				  int *res);
 
 /*
- * The function assumes the cursor is open on _schema.
- * Increment max_id and store updated tuple in the cursor
- * object.
+ * The function assumes to be applied on _schema space.
+ * Increment max_id and store updated id in given argument.
  */
-int tarantoolSqlite3IncrementMaxid(BtCursor * pCur);
+int tarantoolSqlite3IncrementMaxid(uint64_t *space_max_id);
 
 /*
  * Render "format" array for _space entry.
@@ -150,5 +150,4 @@ int tarantoolSqlite3MakeIdxOpts(Index * index, const char *zSql, void *buf);
  * Fetch maximum value from ineger column number `fieldno` of space_id/index_id
  * Return 0 on success, -1 otherwise
  */
-int tarantoolSqlGetMaxId(BtCursor *cur, uint32_t fieldno,
-			 uint64_t * max_id);
+int tarantoolSqlGetMaxId(uint32_t space_id, uint64_t *max_id);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 5d1227afa..d333d4177 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -3791,28 +3791,18 @@ case OP_Sequence: {           /* out2 */
 	break;
 }
 
-/* Opcode: NextId P1 P2 P3 * *
- * Synopsis: r[P3]=get_max(space_index[P1]{Column[P2]})
+/* Opcode: NextId P1 P2 * * *
+ * Synopsis: r[P2]=get_max(space_id[P1])
  *
- * Get next Id of the table. P1 is a table cursor, P2 is column
- * number. Return in P3 maximum id found in provided column,
+ * Get next Id of the table. P1 is a space id.
+ * Return in P2 maximum id found in provided column,
  * incremented by one.
- *
- * This opcode is Tarantool specific and will segfault in case
- * of SQLite cursor.
  */
-case OP_NextId: {     /* out3 */
-	VdbeCursor *pC;    /* The VDBE cursor */
-	int p2;            /* Column number, which stores the id */
-	pC = p->apCsr[pOp->p1];
-	p2 = pOp->p2;
-	pOut = &aMem[pOp->p3];
-
-	/* This opcode is Tarantool specific.  */
-	assert(pC->uc.pCursor->curFlags & BTCF_TaCursor);
+case OP_NextId: {
+	assert(pOp->p1 > 0);
+	pOut = &aMem[pOp->p2];
 
-	tarantoolSqlGetMaxId(pC->uc.pCursor, p2,
-			     (uint64_t *) &pOut->u.i);
+	tarantoolSqlGetMaxId(pOp->p1, (uint64_t *) &pOut->u.i);
 
 	pOut->u.i += 1;
 	pOut->flags = MEM_Int;
@@ -4456,6 +4446,84 @@ case OP_IdxInsert: {        /* in2 */
 	break;
 }
 
+/* Opcode: SInsert P1 P2 * * P5
+ * Synopsis: space id = P1, key = r[P2]
+ *
+ * This opcode is used only during DML routine.
+ * In contrast to ordinary insertion, insertion to system spaces
+ * such as _space or _index will lead to schema changes.
+ * Thus, usage of space pointers is going to be impossible,
+ * as far as pointers can be expired since compilation time.
+ *
+ * If P5 is set to OPFLAG_NCHANGE, account overall changes
+ * made to database.
+ */
+case OP_SInsert: {
+	assert(pOp->p1 > 0);
+	assert(pOp->p2 >= 0);
+
+	pIn2 = &aMem[pOp->p2];
+	struct space *space = space_by_id(pOp->p1);
+	assert(space != NULL);
+	assert(space_is_system(space));
+	/* Create surrogate cursor to pass to SQL bindings. */
+	BtCursor surrogate_cur;
+	surrogate_cur.space = space;
+	surrogate_cur.key = pIn2->z;
+	surrogate_cur.nKey = pIn2->n;
+	surrogate_cur.curFlags = BTCF_TaCursor;
+	rc = tarantoolSqlite3Insert(&surrogate_cur);
+	if (rc)
+		goto abort_due_to_error;
+	if (pOp->p5 & OPFLAG_NCHANGE)
+		p->nChange++;
+	break;
+}
+
+/* Opcode: SDelete P1 P2 * * P5
+ * Synopsis: space id = P1, key = r[P2]
+ *
+ * This opcode is used only during DML routine.
+ * Delete entry with given key from system space.
+ *
+ * If P5 is set to OPFLAG_NCHANGE, account overall changes
+ * made to database.
+ */
+case OP_SDelete: {
+	assert(pOp->p1 > 0);
+	assert(pOp->p2 >= 0);
+
+	pIn2 = &aMem[pOp->p2];
+	struct space *space = space_by_id(pOp->p1);
+	assert(space != NULL);
+	assert(space_is_system(space));
+	rc = sql_delete_by_key(space, pIn2->z, pIn2->n);
+	if (rc)
+		goto abort_due_to_error;
+	if (pOp->p5 & OPFLAG_NCHANGE)
+		p->nChange++;
+	break;
+}
+
+/* Opcode: SIDtoPtr P1 P2 * * *
+ * Synopsis: space id = P1, space[out] = r[P2]
+ *
+ * This opcode makes look up by space id and save found space
+ * into register, specified by the content of register P2.
+ * Such trick is needed during DML routine, since schema may
+ * change and pointers become expired.
+ */
+case OP_SIDtoPtr: {
+	assert(pOp->p1 > 0);
+	assert(pOp->p2 > 0);
+
+	pIn2 = &aMem[pOp->p2];
+	struct space *space = space_by_id(pOp->p1);
+	assert(space != NULL);
+	pIn2->u.i = (int64_t) space;
+	break;
+}
+
 /* Opcode: IdxDelete P1 P2 P3 * *
  * Synopsis: key=r[P2 at P3]
  *
@@ -5385,22 +5453,19 @@ case OP_Init: {          /* jump */
 
 /* Opcode: IncMaxid P1 * * * *
  *
- * The cursor (P1) should be open on _schema.
- * Increment the max_id (max space id) and store updated tuple in the
- * cursor.
+ * Increment the max_id from _schema (max space id)
+ * and store updated id in register specified by first operand.
+ * It is system opcode and must be used only during DDL routine.
  */
 case OP_IncMaxid: {
-	VdbeCursor *pC;
-
-	assert(pOp->p1>=0 && pOp->p1<p->nCursor);
-	pC = p->apCsr[pOp->p1];
-	assert(pC != 0);
+	assert(pOp->p1 > 0);
+	pOut = &aMem[pOp->p1];
 
-	rc = tarantoolSqlite3IncrementMaxid(pC->uc.pCursor);
+	rc = tarantoolSqlite3IncrementMaxid((uint64_t*) &pOut->u.i);
 	if (rc!=SQLITE_OK) {
 		goto abort_due_to_error;
 	}
-	pC->nullRow = 0;
+	pOut->flags = MEM_Int;
 	break;
 }
 
-- 
2.15.1





More information about the Tarantool-patches mailing list