[tarantool-patches] [PATCH] sql: implement point where for DELETE stmts

Kirill Yukhin kyukhin at tarantool.org
Fri Jun 1 14:46:53 MSK 2018


Issue: https://github.com/tarantool/tarantool/issues/3235
Branch: https://github.com/tarantool/tarantool/tree/kyukhin/gh-3235-delete-with-where

Implement support of SQL's DELETE statemets
which a accompanied by point WHERE constraints.
This patch doesn't support any kinds of nested selects
or JOINs.

Part of #3235
---
 src/box/field_def.c           |   1 +
 src/box/sql.c                 |   4 +-
 src/box/sql/build.c           |  41 +++++---
 src/box/sql/delete.c          |  48 ++++++---
 src/box/sql/insert.c          |  33 +++++++
 src/box/sql/sqliteInt.h       |  57 ++++++++++-
 src/box/sql/where.c           | 224 +++++++++++++++++++++++++++++++++++-------
 src/box/sql/whereInt.h        |   2 +
 src/box/sql/wherecode.c       | 115 +++++++++++++++++-----
 test/sql-tap/delete1.test.lua |  24 +++--
 10 files changed, 448 insertions(+), 101 deletions(-)

diff --git a/src/box/field_def.c b/src/box/field_def.c
index 4d39d03..8dbead6 100644
--- a/src/box/field_def.c
+++ b/src/box/field_def.c
@@ -95,6 +95,7 @@ const struct opt_def field_def_reg[] = {
 		     nullable_action, NULL),
 	OPT_DEF("collation", OPT_UINT32, struct field_def, coll_id),
 	OPT_DEF("default", OPT_STRPTR, struct field_def, default_value),
+	OPT_DEF("affinity", OPT_UINT32, struct field_def, affinity),
 	OPT_END,
 };
 
diff --git a/src/box/sql.c b/src/box/sql.c
index 7379cb4..b1d346e 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1453,7 +1453,7 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
 		uint32_t cid = def->fields[i].coll_id;
 		struct field_def *field = &def->fields[i];
 		const char *default_str = field->default_value;
-		int base_len = 4;
+		int base_len = 5;
 		if (cid != COLL_NONE)
 			base_len += 1;
 		if (default_str != NULL)
@@ -1474,6 +1474,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
 		assert(def->fields[i].is_nullable ==
 			       action_is_nullable(def->fields[i].nullable_action));
 		p = enc->encode_str(p, t, strlen(t));
+		p = enc->encode_str(p, "affinity", 8);
+		p = enc->encode_uint(p, def->fields[i].affinity);
 		p = enc->encode_str(p, "is_nullable", 11);
 		p = enc->encode_bool(p, def->fields[i].is_nullable);
 		p = enc->encode_str(p, "nullable_action", 15);
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 65bba1f..c88ad30 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -1182,9 +1182,14 @@ bool
 space_is_view(Table *table) {
 	assert(table != NULL);
 	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(table->tnum);
-	struct space *space = space_by_id(space_id);
-	assert(space != NULL);
-	return space->def->opts.is_view;
+	if (space_id > 0) {
+		struct space *space = space_by_id(space_id);
+		assert(space != NULL);
+		return space->def->opts.is_view;
+	} else {
+		assert(table->def != NULL);
+		return table->def->opts.is_view;
+	}
 }
 
 struct ExprList *
@@ -1197,23 +1202,13 @@ space_checks_expr_list(uint32_t space_id)
 	return space->def->opts.checks;
 }
 
-/**
- * Create cursor which will be positioned to the space/index.
- * It makes space lookup and loads pointer to it into register,
- * which is passes to OP_OpenWrite as an argument.
- *
- * @param parse_context Parse context.
- * @param cursor Number of cursor to be created.
- * @param entity_id Encoded space and index ids.
- * @retval address of last opcode.
- */
 int
-emit_open_cursor(Parse *parse_context, int cursor, int entity_id)
+emit_open_cursor(struct Parse *parse_context, int cursor, int entity_id)
 {
 	assert(entity_id > 0);
 	struct space *space = space_by_id(SQLITE_PAGENO_TO_SPACEID(entity_id));
 	assert(space != NULL);
-	Vdbe *vdbe = parse_context->pVdbe;
+	struct Vdbe *vdbe = parse_context->pVdbe;
 	int space_ptr_reg = ++parse_context->nMem;
 	sqlite3VdbeAddOp4(vdbe, OP_LoadPtr, 0, space_ptr_reg, 0, (void*)space,
 			  P4_SPACEPTR);
@@ -1221,6 +1216,22 @@ emit_open_cursor(Parse *parse_context, int cursor, int entity_id)
 				 space_ptr_reg);
 }
 
+int
+sql_emit_open_cursor(struct Parse *parse, int cursor, int index_id, struct space *space)
+{
+	assert(space != NULL);
+	Vdbe *vdbe = parse->pVdbe;
+	int space_ptr_reg = ++parse->nMem;
+	sqlite3VdbeAddOp4(vdbe, OP_LoadPtr, 0, space_ptr_reg, 0, (void*)space,
+			  P4_SPACEPTR);
+	struct key_def *def = key_def_dup(space->index[index_id]->def->key_def);
+	if (def == NULL)
+		return 0;
+	return sqlite3VdbeAddOp4(vdbe, OP_OpenWrite, cursor, index_id,
+				 space_ptr_reg,
+				 (char*)def,
+				 P4_KEYDEF);
+}
 /*
  * Generate code that will increment the schema cookie.
  *
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 2b59130..de4e0c1 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -184,7 +184,7 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
 		memset(&nc, 0, sizeof(nc));
 		nc.pParse = parse;
 		if (tab_list->a[0].pTab == NULL) {
-			struct Table *t = malloc(sizeof(struct Table));
+			struct Table *t = calloc(sizeof(struct Table), 1);
 			if (t == NULL) {
 				sqlite3OomFault(parse->db);
 				goto delete_from_cleanup;
@@ -266,7 +266,12 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
 		/* Extract the primary key for the current row */
 		if (!is_view) {
 			for (int i = 0; i < pk_len; i++) {
-				sqlite3ExprCodeGetColumnOfTable(v, table->def,
+				struct space_def *def;
+				if (table != NULL)
+					def = table->def;
+				else
+					def = space->def;
+				sqlite3ExprCodeGetColumnOfTable(v, def,
 								tab_cursor,
 								pk_def->parts[i].fieldno,
 								reg_pk + i);
@@ -339,8 +344,18 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
 			int space_ptr_reg = ++parse->nMem;
 			sqlite3VdbeAddOp4(v, OP_LoadPtr, 0, space_ptr_reg, 0,
 					  (void *)space, P4_SPACEPTR);
+
+			int tnum;
+			if (table != NULL) {
+				tnum = table->tnum;
+			}
+			else {
+				/* index id is 0 for PK.  */
+				tnum = SQLITE_PAGENO_FROM_SPACEID_AND_INDEXID(space->def->id,
+									      0);
+			}
 			sqlite3VdbeAddOp3(v, OP_OpenWrite, tab_cursor,
-					  table->tnum, space_ptr_reg);
+					  tnum, space_ptr_reg);
 			struct key_def *def = key_def_dup(pk_def);
 			if (def == NULL) {
 				sqlite3OomFault(parse->db);
@@ -446,7 +461,8 @@ sql_generate_row_delete(struct Parse *parse, struct Table *table,
 	/* If there are any triggers to fire, allocate a range of registers to
 	 * use for the old.* references in the triggers.
 	 */
-	if (sqlite3FkRequired(table, NULL) || trigger_list != NULL) {
+	if (table != NULL &&
+	    (sqlite3FkRequired(table, NULL) || trigger_list != NULL)) {
 		/* Mask of OLD.* columns in use */
 		/* TODO: Could use temporary registers here. */
 		uint32_t mask =
@@ -505,7 +521,7 @@ sql_generate_row_delete(struct Parse *parse, struct Table *table,
 	 * of the DELETE statement is to fire the INSTEAD OF
 	 * triggers).
 	 */
-	if (table->pSelect == NULL) {
+	if (table == NULL || table->pSelect == NULL) {
 		uint8_t p5 = 0;
 		sqlite3VdbeAddOp2(v, OP_Delete, cursor,
 				  (need_update_count ? OPFLAG_NCHANGE : 0));
@@ -520,16 +536,20 @@ sql_generate_row_delete(struct Parse *parse, struct Table *table,
 		sqlite3VdbeChangeP5(v, p5);
 	}
 
-	/* Do any ON CASCADE, SET NULL or SET DEFAULT operations
-	 * required to handle rows (possibly in other tables) that
-	 * refer via a foreign key to the row just deleted.
-	 */
-	sqlite3FkActions(parse, table, 0, first_old_reg, 0);
+	if (table != NULL) {
+		/* Do any ON CASCADE, SET NULL or SET DEFAULT
+		 * operations required to handle rows (possibly
+		 * in other tables) that refer via a foreign
+		 * key to the row just deleted.
+		 */
 
-	/* Invoke AFTER DELETE trigger programs. */
-	sqlite3CodeRowTrigger(parse, trigger_list,
-			      TK_DELETE, 0, TRIGGER_AFTER, table, first_old_reg,
-			      onconf, label);
+		sqlite3FkActions(parse, table, 0, first_old_reg, 0);
+
+		/* Invoke AFTER DELETE trigger programs. */
+		sqlite3CodeRowTrigger(parse, trigger_list,
+				      TK_DELETE, 0, TRIGGER_AFTER, table,
+				      first_old_reg, onconf, label);
+	}
 
 	/* Jump here if the row had already been deleted before
 	 * any BEFORE trigger programs were invoked. Or if a trigger program
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index 3dbf855..8d0ab3b 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -108,6 +108,39 @@ sqlite3IndexAffinityStr(sqlite3 * db, Index * pIdx)
 	return pIdx->zColAff;
 }
 
+char *
+sql_index_affinity_str(struct sqlite3 * db, struct index_def *def)
+{
+	char *aff;
+	/* The first time a column affinity string for a particular index is
+	 * required, it is allocated and populated here. It is then stored as
+	 * a member of the Index structure for subsequent use.
+	 *
+	 * The column affinity string will eventually be deleted by
+	 * sqliteDeleteIndex() when the Index structure itself is cleaned
+	 * up.
+	 */
+	int nColumn = def->key_def->part_count;
+	aff = (char *)sqlite3DbMallocRaw(0, nColumn + 1);
+	if (aff == NULL) {
+		sqlite3OomFault(db);
+		return 0;
+	}
+	int i;
+	struct space *space = space_cache_find(def->space_id);
+	assert(space != NULL);
+
+	for (i = 0; i < nColumn; i++) {
+		uint32_t x = def->key_def->parts[i].fieldno;
+		aff[i] = space->def->fields[x].affinity;
+		if (aff[i] == AFFINITY_UNDEFINED)
+			aff[i] = 'A';
+	}
+	aff[i] = 0;
+
+	return aff;
+}
+
 /*
  * Compute the affinity string for table pTab, if it has not already been
  * computed.  As an optimization, omit trailing AFFINITY_BLOB affinities.
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 943fda9..6ea956d 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2536,6 +2536,8 @@ struct SrcList {
 		char *zName;	/* Name of the table */
 		char *zAlias;	/* The "B" part of a "A AS B" phrase.  zName is the "A" */
 		Table *pTab;	/* An SQL table corresponding to zName */
+		/* A temporary hack: need to store eph. space.  */
+		struct space *space;
 		Select *pSelect;	/* A SELECT statement used in place of a table name */
 		int addrFillSub;	/* Address of subroutine to manifest a subquery */
 		int regReturn;	/* Register holding return address of addrFillSub */
@@ -3552,8 +3554,36 @@ enum sort_order
 sql_index_column_sort_order(Index *idx, uint32_t column);
 
 void sqlite3EndTable(Parse *, Token *, Token *, Select *);
+
+/**
+ * DEPRECATED. All calls to be replaced w/ sql_emit_open_cursor.
+ * Create cursor which will be positioned to the space/index.
+ * It makes space lookup and loads pointer to it into register,
+ * which is passes to OP_OpenWrite as an argument.
+ *
+ * @param parse Parse context.
+ * @param cursor Number of cursor to be created.
+ * @param entity_id Encoded space and index ids.
+ * @retval address of last opcode.
+ */
+int
+emit_open_cursor(struct Parse *parse, int cursor, int entity_id);
+
+/**
+ * Create cursor which will be positioned to the space/index.
+ * It makes space lookup and loads pointer to it into register,
+ * which is passes to OP_OpenWrite as an argument.
+ *
+ * @param parse_context Parse context.
+ * @param cursor Number of cursor to be created.
+ * @param index_id Encoded index id (encoding is void actually, so
+ *        pas it as is). In future will be replaced with pointer
+ *        to struct index.
+ * @retval address of last opcode.
+ */
 int
-emit_open_cursor(Parse *, int, int);
+sql_emit_open_cursor(struct Parse *parse, int cursor, int index_id,
+		     struct space *space);
 
 int sqlite3ParseUri(const char *, const char *, unsigned int *,
 		    sqlite3_vfs **, char **, char **);
@@ -4085,6 +4115,31 @@ int sqlite3VarintLen(u64 v);
 #define putVarint    sqlite3PutVarint
 
 const char *sqlite3IndexAffinityStr(sqlite3 *, Index *);
+
+/**
+ * Return a pointer to the column affinity string associated with index
+ * pIdx. A column affinity string has one character for each column in
+ * the table, according to the affinity of the column:
+ *
+ *  Character      Column affinity
+ *  ------------------------------
+ *  'A'            BLOB
+ *  'B'            TEXT
+ *  'C'            NUMERIC
+ *  'D'            INTEGER
+ *  'F'            REAL
+ *
+ * Memory for the buffer containing the column index affinity string
+ * is managed along with the rest of the Index structure. It will be
+ * released when sqlite3DeleteIndex() is called.
+ *
+ * @param db Database handle.
+ * @param def index_def where from affinity to be extracted.
+ * @retval Affinity string.
+ */
+char *
+sql_index_affinity_str(struct sqlite3 *db, struct index_def *def);
+
 void sqlite3TableAffinity(Vdbe *, Table *, int);
 char sqlite3CompareAffinity(Expr * pExpr, char aff2);
 int sqlite3IndexAffinityOk(Expr * pExpr, char idx_affinity);
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index d312587..14cb23d 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -371,7 +371,10 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
 	pScan->is_column_seen = false;
 	if (pIdx) {
 		int j = iColumn;
-		iColumn = pIdx->aiColumn[j];
+		if (j >= pIdx->nColumn)
+			iColumn = -1;
+		else
+			iColumn = pIdx->aiColumn[j];
 		if (iColumn >= 0) {
 			char affinity =
 				pIdx->pTable->def->fields[iColumn].affinity;
@@ -390,6 +393,34 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
 	return whereScanNext(pScan);
 }
 
+static WhereTerm *
+sql_where_scan_init(struct WhereScan *scan, struct WhereClause *clause,
+		    int cursor, int column, uint32_t op_mask,
+		    struct space_def *space_def, struct key_def *key_def)
+{
+	scan->pOrigWC = scan->pWC = clause;
+	scan->pIdxExpr = NULL;
+	scan->idxaff = 0;
+	scan->coll = NULL;
+	scan->is_column_seen = false;
+	if (key_def != NULL) {
+		int j = column;
+		column = key_def->parts[j].fieldno;
+		if (column >= 0) {
+			scan->idxaff = space_def->fields[column].affinity;
+			scan->coll = key_def->parts[column].coll;
+			scan->is_column_seen = true;
+		}
+	}
+	scan->opMask = op_mask;
+	scan->k = 0;
+	scan->aiCur[0] = cursor;
+	scan->aiColumn[0] = column;
+	scan->nEquiv = 1;
+	scan->iEquiv = 1;
+	return whereScanNext(scan);
+}
+
 /*
  * Search for a term in the WHERE clause that is of the form "X <op> <expr>"
  * where X is a reference to the iColumn of table iCur or of index pIdx
@@ -441,6 +472,36 @@ sqlite3WhereFindTerm(WhereClause * pWC,	/* The WHERE clause to be searched */
 	return pResult;
 }
 
+WhereTerm *
+sql_where_find_term(WhereClause * pWC,	/* The WHERE clause to be searched */
+		    int iCur,		/* Cursor number of LHS */
+		    int iColumn,	/* Column number of LHS */
+		    Bitmask notReady,	/* RHS must not overlap with this mask */
+		    u32 op,		/* Mask of WO_xx values describing operator */
+		    struct space_def *space_def,
+		    struct key_def *key_def)	/* Must be compatible with this index, if not NULL */
+{
+	WhereTerm *pResult = 0;
+	WhereTerm *p;
+	WhereScan scan;
+
+	p = sql_where_scan_init(&scan, pWC, iCur, iColumn, op, space_def,
+				key_def);
+	op &= WO_EQ;
+	while (p) {
+		if ((p->prereqRight & notReady) == 0) {
+			if (p->prereqRight == 0 && (p->eOperator & op) != 0) {
+				testcase(p->eOperator & WO_IS);
+				return p;
+			}
+			if (pResult == 0)
+				pResult = p;
+		}
+		p = whereScanNext(&scan);
+	}
+	return pResult;
+}
+
 /*
  * This function searches pList for an entry that matches the iCol-th column
  * of index pIdx.
@@ -1703,6 +1764,7 @@ whereLoopInit(WhereLoop * p)
 	p->nLTerm = 0;
 	p->nLSlot = ArraySize(p->aLTermSpace);
 	p->wsFlags = 0;
+	p->index_def = NULL;
 }
 
 /*
@@ -3366,7 +3428,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 				/* Get the column number in the table (iColumn) and sort order
 				 * (revIdx) for the j-th column of the index.
 				 */
-				if (pIndex) {
+				if (pIndex && j < pIndex->nColumn) {
 					iColumn = pIndex->aiColumn[j];
 					revIdx = sql_index_column_sort_order(pIndex,
 									     j);
@@ -4046,7 +4108,7 @@ wherePathSolver(WhereInfo * pWInfo, LogEst nRowEst)
  * general-purpose query planner.
  */
 static int
-whereShortCut(WhereLoopBuilder * pBuilder)
+sql_where_shortcut(struct WhereLoopBuilder *builder)
 {
 	WhereInfo *pWInfo;
 	struct SrcList_item *pItem;
@@ -4055,20 +4117,20 @@ whereShortCut(WhereLoopBuilder * pBuilder)
 	WhereLoop *pLoop;
 	int iCur;
 	int j;
-	Table *pTab;
-	Index *pIdx;
 
-	pWInfo = pBuilder->pWInfo;
+	pWInfo = builder->pWInfo;
 	if (pWInfo->wctrlFlags & WHERE_OR_SUBCLAUSE)
 		return 0;
 	assert(pWInfo->pTabList->nSrc >= 1);
 	pItem = pWInfo->pTabList->a;
-	pTab = pItem->pTab;
+	struct space_def *space_def = pItem->pTab->def;
+	assert(space_def != NULL);
+
 	if (pItem->fg.isIndexedBy)
 		return 0;
 	iCur = pItem->iCursor;
 	pWC = &pWInfo->sWC;
-	pLoop = pBuilder->pNew;
+	pLoop = builder->pNew;
 	pLoop->wsFlags = 0;
 	pLoop->nSkip = 0;
 	pTerm = sqlite3WhereFindTerm(pWC, iCur, -1, 0, WO_EQ, 0);
@@ -4080,34 +4142,74 @@ whereShortCut(WhereLoopBuilder * pBuilder)
 		/* TUNING: Cost of a PK lookup is 10 */
 		pLoop->rRun = 33;	/* 33==sqlite3LogEst(10) */
 	} else {
-		for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
+		struct space *space = space_cache_find(space_def->id);
+		if (space != NULL) {
+			for (uint32_t i = 0; i < space->index_count; ++i) {
+				struct index_def *idx_def;
+				idx_def = space->index[i]->def;
+				int opMask;
+				int nIdxCol = idx_def->key_def->part_count;
+				assert(pLoop->aLTermSpace == pLoop->aLTerm);
+				if (!idx_def->opts.is_unique
+				    /* || pIdx->pPartIdxWhere != 0 */
+				    || nIdxCol > ArraySize(pLoop->aLTermSpace)
+					)
+					continue;
+				opMask = WO_EQ;
+				for (j = 0; j < nIdxCol; j++) {
+					pTerm = sql_where_find_term(pWC, iCur,
+								    j, 0,
+								    opMask,
+								    space_def,
+								    idx_def->
+								    key_def);
+					if (pTerm == 0)
+						break;
+					testcase(pTerm->eOperator & WO_IS);
+					pLoop->aLTerm[j] = pTerm;
+				}
+				if (j != nIdxCol)
+					continue;
+				pLoop->wsFlags = WHERE_COLUMN_EQ |
+					WHERE_ONEROW | WHERE_INDEXED |
+					WHERE_IDX_ONLY;
+				pLoop->nLTerm = j;
+				pLoop->nEq = j;
+				pLoop->pIndex = NULL;
+				pLoop->index_def = idx_def;
+				/* TUNING: Cost of a unique index lookup is 15 */
+				pLoop->rRun = 39;	/* 39==sqlite3LogEst(15) */
+				break;
+			}
+		} else {
+			/* Space is ephemeral.  */
+			assert(space_def->id == 0);
 			int opMask;
-			int nIdxCol = index_column_count(pIdx);
+			int nIdxCol = space_def->field_count;
 			assert(pLoop->aLTermSpace == pLoop->aLTerm);
-			if (!index_is_unique(pIdx)
-			    || pIdx->pPartIdxWhere != 0
-			    || nIdxCol > ArraySize(pLoop->aLTermSpace)
-			    )
-				continue;
+			if ( nIdxCol > ArraySize(pLoop->aLTermSpace))
+				return 0;
 			opMask = WO_EQ;
 			for (j = 0; j < nIdxCol; j++) {
-				pTerm =
-				    sqlite3WhereFindTerm(pWC, iCur, j, 0,
-							 opMask, pIdx);
-				if (pTerm == 0)
+				pTerm = sql_where_find_term(pWC, iCur,
+							    j, 0,
+							    opMask,
+							    space_def,
+							    NULL);
+				if (pTerm == NULL)
 					break;
 				pLoop->aLTerm[j] = pTerm;
 			}
 			if (j != nIdxCol)
-				continue;
+				return 0;
 			pLoop->wsFlags = WHERE_COLUMN_EQ | WHERE_ONEROW |
 					 WHERE_INDEXED | WHERE_IDX_ONLY;
 			pLoop->nLTerm = j;
 			pLoop->nEq = j;
-			pLoop->pIndex = pIdx;
+			pLoop->pIndex = NULL;
+			pLoop->index_def = NULL;
 			/* TUNING: Cost of a unique index lookup is 15 */
 			pLoop->rRun = 39;	/* 39==sqlite3LogEst(15) */
-			break;
 		}
 	}
 	if (pLoop->wsFlags) {
@@ -4420,7 +4522,7 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 	}
 #endif
 
-	if (nTabList != 1 || whereShortCut(&sWLB) == 0) {
+	if (nTabList != 1 || sql_where_shortcut(&sWLB) == 0) {
 		rc = whereLoopAddAll(&sWLB);
 		if (rc)
 			goto whereBeginError;
@@ -4583,13 +4685,31 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 		}
 		if (pLoop->wsFlags & WHERE_INDEXED) {
 			Index *pIx = pLoop->pIndex;
+			struct index_def *idx_def = pLoop->index_def;
+			struct space *space = space_cache_find(pTabItem->pTab->def->id);
 			int iIndexCur;
 			int op = OP_OpenRead;
 			/* iAuxArg is always set if to a positive value if ONEPASS is possible */
 			assert(iAuxArg != 0
 			       || (pWInfo->
 				   wctrlFlags & WHERE_ONEPASS_DESIRED) == 0);
-			if (IsPrimaryKeyIndex(pIx)
+			/* Check if index is primary. Either of
+			 * points should be true:
+			 * 1. struct Index is non-NULL and is
+			 *    primary
+			 * 2. idx_def is non-NULL and it is
+			 *    primary
+			 * 3. (goal of this comment) both pIx and
+			 *    idx_def are NULL in which case it is
+			 *    ephemeral table, but not in Tnt sense.
+			 *    It is something w/ defined space_def
+			 *    and nothing else. Skip such loops.
+			 */
+			if (idx_def == NULL && pIx == NULL) continue;
+			bool is_primary = (pIx != NULL && IsPrimaryKeyIndex(pIx)) ||
+					   (idx_def != NULL && (idx_def->iid == 0)) |
+				(idx_def == NULL && pIx == NULL);
+			if (is_primary
 			    && (wctrlFlags & WHERE_OR_SUBCLAUSE) != 0) {
 				/* This is one term of an OR-optimization using
 				 * the PRIMARY KEY.  No need for a separate index
@@ -4597,13 +4717,31 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 				iIndexCur = pLevel->iTabCur;
 				op = 0;
 			} else if (pWInfo->eOnePass != ONEPASS_OFF) {
-				Index *pJ = pTabItem->pTab->pIndex;
-				iIndexCur = iAuxArg;
-				assert(wctrlFlags & WHERE_ONEPASS_DESIRED);
-				while (ALWAYS(pJ) && pJ != pIx) {
-					iIndexCur++;
-					pJ = pJ->pNext;
+				if (pIx != NULL) {
+					Index *pJ = pTabItem->pTab->pIndex;
+					iIndexCur = iAuxArg;
+					assert(wctrlFlags &
+					       WHERE_ONEPASS_DESIRED);
+					while (ALWAYS(pJ) && pJ != pIx) {
+						iIndexCur++;
+						pJ = pJ->pNext;
+					}
+				} else {
+					if (space != NULL) {
+						for(uint32_t i = 0;
+						    i < space->index_count;
+						    ++i) {
+							if (space->index[i]->def ==
+							    idx_def) {
+								iIndexCur = iAuxArg + i;
+								break;
+							}
+						}
+					} else {
+						iIndexCur = iAuxArg;
+					}
 				}
+				assert(wctrlFlags & WHERE_ONEPASS_DESIRED);
 				op = OP_OpenWrite;
 				pWInfo->aiCurOnePass[1] = iIndexCur;
 			} else if (iAuxArg
@@ -4614,11 +4752,17 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 				iIndexCur = pParse->nTab++;
 			}
 			pLevel->iIdxCur = iIndexCur;
-			assert(pIx->pSchema == pTab->pSchema);
 			assert(iIndexCur >= 0);
 			if (op) {
-				emit_open_cursor(pParse, iIndexCur, pIx->tnum);
-				sql_vdbe_set_p4_key_def(pParse, pIx);
+				if (pIx != NULL) {
+					emit_open_cursor(pParse, iIndexCur,
+							 pIx->tnum);
+					sql_vdbe_set_p4_key_def(pParse, pIx);
+				} else {
+					sql_emit_open_cursor(pParse, iIndexCur,
+							     idx_def->iid,
+							     space);
+				}
 				if ((pLoop->wsFlags & WHERE_CONSTRAINT) != 0
 				    && (pLoop->
 					wsFlags & (WHERE_COLUMN_RANGE |
@@ -4627,7 +4771,10 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 					wctrlFlags & WHERE_ORDERBY_MIN) == 0) {
 					sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);	/* Hint to COMDB2 */
 				}
-				VdbeComment((v, "%s", pIx->zName));
+				if (pIx != NULL)
+					VdbeComment((v, "%s", pIx->zName));
+				else
+					VdbeComment((v, "%s", idx_def->name));
 #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
 				{
 					u64 colUsed = 0;
@@ -4809,7 +4956,6 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
 	for (i = 0, pLevel = pWInfo->a; i < pWInfo->nLevel; i++, pLevel++) {
 		int k, last;
 		VdbeOp *pOp;
-		Index *pIdx = 0;
 		struct SrcList_item *pTabItem = &pTabList->a[pLevel->iFrom];
 		Table *pTab MAYBE_UNUSED = pTabItem->pTab;
 		assert(pTab != 0);
@@ -4836,12 +4982,15 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
 		 * that reference the table and converts them into opcodes that
 		 * reference the index.
 		 */
+		Index *pIdx = NULL;
+		struct index_def *def = NULL;
 		if (pLoop->wsFlags & (WHERE_INDEXED | WHERE_IDX_ONLY)) {
 			pIdx = pLoop->pIndex;
+			def = pLoop->index_def;
 		} else if (pLoop->wsFlags & WHERE_MULTI_OR) {
 			pIdx = pLevel->u.pCovidx;
 		}
-		if (pIdx && !db->mallocFailed) {
+		if ((pIdx != NULL || def != NULL) && !db->mallocFailed) {
 			last = sqlite3VdbeCurrentAddr(v);
 			k = pLevel->addrBody;
 			pOp = sqlite3VdbeGetOp(v, k);
@@ -4850,7 +4999,8 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
 					continue;
 				if (pOp->opcode == OP_Column) {
 					int x = pOp->p2;
-					assert(pIdx->pTable == pTab);
+					assert(pIdx == NULL ||
+					       pIdx->pTable == pTab);
 					if (x >= 0) {
 						pOp->p2 = x;
 						pOp->p1 = pLevel->iIdxCur;
diff --git a/src/box/sql/whereInt.h b/src/box/sql/whereInt.h
index 1303365..548cbcb 100644
--- a/src/box/sql/whereInt.h
+++ b/src/box/sql/whereInt.h
@@ -141,6 +141,8 @@ struct WhereLoop {
 	u16 nBtm;	/* Size of BTM vector */
 	u16 nTop;	/* Size of TOP vector */
 	Index *pIndex;	/* Index used, or NULL */
+	/** Index definition, if there's no pIndex. */
+	struct index_def *index_def;
 	u32 wsFlags;		/* WHERE_* flags describing the plan */
 	u16 nLTerm;		/* Number of entries in aLTerm[] */
 	u16 nSkip;		/* Number of NULL aLTerm[] entries */
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index bf2a2a2..1ce1db0 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -38,6 +38,7 @@
  * that actually generate the bulk of the WHERE loop code.  The original where.c
  * file retains the code that does query planning and analysis.
  */
+#include "box/schema.h"
 #include "sqliteInt.h"
 #include "whereInt.h"
 
@@ -62,6 +63,7 @@ explainIndexColumnName(Index * pIdx, int i)
 static void
 explainAppendTerm(StrAccum * pStr,	/* The text expression being built */
 		  Index * pIdx,		/* Index to read column names from */
+		  struct index_def *def,
 		  int nTerm,		/* Number of terms */
 		  int iTerm,		/* Zero-based index of first term. */
 		  int bAnd,		/* Non-zero to append " AND " */
@@ -78,9 +80,16 @@ explainAppendTerm(StrAccum * pStr,	/* The text expression being built */
 	for (i = 0; i < nTerm; i++) {
 		if (i)
 			sqlite3StrAccumAppend(pStr, ",", 1);
-		sqlite3StrAccumAppendAll(pStr,
-					 explainIndexColumnName(pIdx,
-								iTerm + i));
+		const char *name;
+		if (pIdx != NULL) {
+			name = explainIndexColumnName(pIdx, iTerm + i);
+		} else {
+			assert(def != NULL);
+                        struct space *space = space_cache_find(def->space_id);
+                        assert(space != NULL);
+                        name = space->def->fields[i].name;
+		}
+		sqlite3StrAccumAppendAll(pStr, name);
 	}
 	if (nTerm > 1)
 		sqlite3StrAccumAppend(pStr, ")", 1);
@@ -116,16 +125,26 @@ static void
 explainIndexRange(StrAccum * pStr, WhereLoop * pLoop)
 {
 	Index *pIndex = pLoop->pIndex;
+	struct index_def *def = pLoop->index_def;
 	u16 nEq = pLoop->nEq;
 	u16 nSkip = pLoop->nSkip;
 	int i, j;
 
+	assert(pIndex != NULL || def != NULL);
+
 	if (nEq == 0
 	    && (pLoop->wsFlags & (WHERE_BTM_LIMIT | WHERE_TOP_LIMIT)) == 0)
 		return;
 	sqlite3StrAccumAppend(pStr, " (", 2);
 	for (i = 0; i < nEq; i++) {
-		const char *z = explainIndexColumnName(pIndex, i);
+		const char *z;
+		if (pIndex != NULL) {
+			z = explainIndexColumnName(pIndex, i);
+		} else {
+			struct space *space = space_cache_find(def->space_id);
+			assert(space != NULL);
+			z = space->def->fields[i].name;
+		}
 		if (i)
 			sqlite3StrAccumAppend(pStr, " AND ", 5);
 		sqlite3XPrintf(pStr, i >= nSkip ? "%s=?" : "ANY(%s)", z);
@@ -133,11 +152,11 @@ explainIndexRange(StrAccum * pStr, WhereLoop * pLoop)
 
 	j = i;
 	if (pLoop->wsFlags & WHERE_BTM_LIMIT) {
-		explainAppendTerm(pStr, pIndex, pLoop->nBtm, j, i, ">");
+		explainAppendTerm(pStr, pIndex, def, pLoop->nBtm, j, i, ">");
 		i = 1;
 	}
 	if (pLoop->wsFlags & WHERE_TOP_LIMIT) {
-		explainAppendTerm(pStr, pIndex, pLoop->nTop, j, i, "<");
+		explainAppendTerm(pStr, pIndex, def, pLoop->nTop, j, i, "<");
 	}
 	sqlite3StrAccumAppend(pStr, ")", 1);
 }
@@ -199,13 +218,15 @@ sqlite3WhereExplainOneScan(Parse * pParse,	/* Parse context */
 		}
 		if ((flags & WHERE_IPK) == 0) {
 			const char *zFmt = 0;
-			Index *pIdx;
+			Index *pIdx = pLoop->pIndex;
+			struct index_def *idx_def = pLoop->index_def;
+			if (pIdx == NULL && idx_def == NULL) return 0;
 
-			assert(pLoop->pIndex != 0);
-			pIdx = pLoop->pIndex;
+			assert(pIdx != NULL || idx_def != NULL);
 			assert(!(flags & WHERE_AUTO_INDEX)
 			       || (flags & WHERE_IDX_ONLY));
-			if (IsPrimaryKeyIndex(pIdx)) {
+			if ((pIdx != NULL && IsPrimaryKeyIndex(pIdx)) ||
+			    (idx_def != NULL && idx_def->iid == 0)) {
 				if (isSearch) {
 					zFmt = "PRIMARY KEY";
 				}
@@ -220,7 +241,12 @@ sqlite3WhereExplainOneScan(Parse * pParse,	/* Parse context */
 			}
 			if (zFmt) {
 				sqlite3StrAccumAppend(&str, " USING ", 7);
-				sqlite3XPrintf(&str, zFmt, pIdx->zName);
+				if (pIdx != NULL)
+					sqlite3XPrintf(&str, zFmt, pIdx->zName);
+				else if (idx_def != NULL)
+					sqlite3XPrintf(&str, zFmt, idx_def->name);
+				else
+					sqlite3XPrintf(&str, zFmt, "EPHEMERAL INDEX");
 				explainIndexRange(&str, pLoop);
 			}
 		} else if ((flags & WHERE_IPK) != 0
@@ -675,20 +701,19 @@ codeAllEqualityTerms(Parse * pParse,	/* Parsing context */
 	u16 nEq;		/* The number of == or IN constraints to code */
 	u16 nSkip;		/* Number of left-most columns to skip */
 	Vdbe *v = pParse->pVdbe;	/* The vm under construction */
-	Index *pIdx;		/* The index being used for this loop */
 	WhereTerm *pTerm;	/* A single constraint term */
 	WhereLoop *pLoop;	/* The WhereLoop object */
 	int j;			/* Loop counter */
 	int regBase;		/* Base register */
 	int nReg;		/* Number of registers to allocate */
-	char *zAff;		/* Affinity string to return */
 
 	/* This module is only called on query plans that use an index. */
 	pLoop = pLevel->pWLoop;
 	nEq = pLoop->nEq;
 	nSkip = pLoop->nSkip;
-	pIdx = pLoop->pIndex;
-	assert(pIdx != 0);
+	struct Index *pIdx = pLoop->pIndex;
+	struct index_def *idx_def = pLoop->index_def;
+	assert(pIdx != NULL || idx_def != NULL);
 
 	/* Figure out how many memory cells we will need then allocate them.
 	 */
@@ -696,9 +721,13 @@ codeAllEqualityTerms(Parse * pParse,	/* Parsing context */
 	nReg = pLoop->nEq + nExtraReg;
 	pParse->nMem += nReg;
 
-	zAff =
-	    sqlite3DbStrDup(pParse->db,
-			    sqlite3IndexAffinityStr(pParse->db, pIdx));
+	char *zAff;
+	if (pIdx != NULL) {
+		zAff = sqlite3DbStrDup(pParse->db,
+				       sqlite3IndexAffinityStr(pParse->db, pIdx));
+	} else {
+		zAff = sql_index_affinity_str(pParse->db, idx_def);
+	}
 	assert(zAff != 0 || pParse->db->mallocFailed);
 
 	if (nSkip) {
@@ -1214,7 +1243,6 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 		int endEq;	/* True if range end uses ==, >= or <= */
 		int start_constraints;	/* Start of range is constrained */
 		int nConstraint;	/* Number of constraint terms */
-		Index *pIdx;	/* The index we will be using */
 		int iIdxCur;	/* The VDBE cursor for the index */
 		int nExtraReg = 0;	/* Number of extra registers needed */
 		int op;		/* Instruction opcode */
@@ -1227,7 +1255,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 					      * to integer type, used for IPK.
 					      */
 
-		pIdx = pLoop->pIndex;
+		struct Index *pIdx = pLoop->pIndex;
+		struct index_def *idx_def = pLoop->index_def;
+		assert(pIdx != NULL || idx_def != NULL);
 		iIdxCur = pLevel->iIdxCur;
 		assert(nEq >= pLoop->nSkip);
 
@@ -1242,7 +1272,11 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 		assert(pWInfo->pOrderBy == 0
 		       || pWInfo->pOrderBy->nExpr == 1
 		       || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
-		int nIdxCol = index_column_count(pIdx);
+		int nIdxCol;
+		if (pIdx != NULL)
+			nIdxCol = index_column_count(pIdx);
+		else
+			nIdxCol = idx_def->key_def->part_count;
 		if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
 		    && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
 			j = pIdx->aiColumn[nEq];
@@ -1379,11 +1413,34 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 			startEq = 0;
 			start_constraints = 1;
 		}
-		struct Index *pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-		assert(pk);
-		int nPkCol = index_column_count(pk);
-		char affinity =
-			pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
+		struct Index *pk = NULL;
+		struct index_def *idx_pk = NULL;
+		char affinity;
+		if (pIdx == NULL) {
+			struct space *space = space_cache_find(idx_def->space_id);
+			assert(space != NULL);
+			idx_pk = space->index[0]->def;
+			int fieldno = idx_pk->key_def->parts[0].fieldno;
+			affinity = space->def->fields[fieldno].affinity;
+			if (affinity == AFFINITY_UNDEFINED) {
+				if (idx_pk->key_def->part_count == 1 &&
+				    space->def->fields[fieldno].type ==
+				    FIELD_TYPE_INTEGER)
+					affinity = AFFINITY_INTEGER;
+				else
+					affinity = AFFINITY_BLOB;
+			}
+		} else {
+			pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
+			affinity =
+				pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
+		}
+
+		int nPkCol;
+		if (pk != NULL)
+			nPkCol = index_column_count(pk);
+		else
+			nPkCol = idx_pk->key_def->part_count;
 		if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
 			/* Right now INTEGER PRIMARY KEY is the only option to
 			 * get Tarantool's INTEGER column type. Need special handling
@@ -1392,7 +1449,11 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 			 */
 			int limit = pRangeStart == NULL ? nEq : nEq + 1;
 			for (int i = 0; i < limit; i++) {
-				if (pIdx->aiColumn[i] == pk->aiColumn[0]) {
+				if ((pIdx != NULL && pIdx->aiColumn[i] ==
+				     pk->aiColumn[0]) ||
+				    (idx_pk != NULL &&
+				     idx_def->key_def->parts[i].fieldno ==
+				     idx_pk->key_def->parts[0].fieldno)) {
 					/* Here: we know for sure that table has INTEGER
 					   PRIMARY KEY, single column, and Index we're
 					   trying to use for scan contains this column. */
diff --git a/test/sql-tap/delete1.test.lua b/test/sql-tap/delete1.test.lua
index 810ca8a..d6d4762 100755
--- a/test/sql-tap/delete1.test.lua
+++ b/test/sql-tap/delete1.test.lua
@@ -1,6 +1,6 @@
 #!/usr/bin/env tarantool
 test = require("sqltester")
-test:plan(9)
+test:plan(10)
 
 --!./tcltestrunner.lua
 -- ["set","testdir",[["file","dirname",["argv0"]]]]
@@ -132,18 +132,30 @@ test:do_test(
     })
 
 -- Tests for data dictionary integration.
-s = box.schema.create_space('t')
-i = s:create_index('i', {parts={1, 'unsigned'}})
+format = {}
+format[1] = {name = 'id', type = 'scalar'}
+format[2] = {name = 'f', type = 'scalar'}
+s = box.schema.create_space('t', {format = format})
+i = s:create_index('i', {parts = {1, 'scalar'}})
+
 test:do_test(
     "delete1-6.0",
     function()
-	s:replace({1})
-	s:replace({2})
-	s:replace({3})
+	s:replace({1, 4})
+	s:replace({2, 5})
+	s:replace({3, 6})
 	return s:count()
     end,
     3)
 
+test:do_test(
+    "delete1-6.1.1",
+    function()
+        box.sql.execute([[delete from "t" where "id"=2]])
+        return s:count()
+    end,
+    2)
+
 test:do_test(
     "delete1-6.1",
     function()
-- 
2.16.2





More information about the Tarantool-patches mailing list