[patches] [PATCH] sql: remove unused fields from SQL index

Nikita Pettik korablev at tarantool.org
Wed Feb 28 19:38:26 MSK 2018


As part of Tarantool data dictionary integration into SQL, it is
required to delegate usages of index properties to Tarantool, and remove
those which are unused. Hence, fields 'isCovering', 'nKeyCol',
'isResized' have been deleted: all indexes in Tarantool are covering;
number of key columns matches with number of columns (since there is no
more rowid); indexes can't be resized.
Number of columns and uniqueness/nullability are fetched from Tarantool,
when it is possible, i.e. space has been created, not ephemeral and
doesn't represent view.

Closes #3179
---
Branch: https://github.com/tarantool/tarantool/tree/np/gh-3179-index-cleanup 
Issue: https://github.com/tarantool/tarantool/issues/3179

 src/box/sql.c           |  4 +--
 src/box/sql/analyze.c   | 43 +++++++++++-----------
 src/box/sql/build.c     | 84 ++++++++++++++++++++++++++++---------------
 src/box/sql/delete.c    | 13 +++----
 src/box/sql/expr.c      |  4 +--
 src/box/sql/fkey.c      | 10 +++---
 src/box/sql/insert.c    | 41 +++++++++++----------
 src/box/sql/pragma.c    |  6 ++--
 src/box/sql/sqliteInt.h | 11 +++---
 src/box/sql/update.c    |  9 ++---
 src/box/sql/vdbemem.c   |  2 +-
 src/box/sql/where.c     | 96 +++++++++++++++++--------------------------------
 src/box/sql/wherecode.c | 27 +++++++-------
 src/box/sql/whereexpr.c |  2 +-
 14 files changed, 175 insertions(+), 177 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index 9ce270da9..528064676 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1553,7 +1553,7 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
 
 	/* If table's PK is single column which is INTEGER, then
 	 * treat it as strict type, not affinity.  */
-	if (pk_idx && pk_idx->nKeyCol == 1) {
+	if (pk_idx && pk_idx->nColumn == 1) {
 		int pk = pk_idx->aiColumn[0];
 		if (pTable->aCol[pk].affinity == 'D')
 			pk_forced_int = pk;
@@ -1621,7 +1621,7 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
 
 	/* If table's PK is single column which is INTEGER, then
 	 * treat it as strict type, not affinity.  */
-	if (primary_index->nKeyCol == 1) {
+	if (primary_index->nColumn == 1) {
 		int pk = primary_index->aiColumn[0];
 		if (aCol[pk].affinity == 'D')
 			pk_forced_int = pk;
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index ca8d37c76..b4d17dcc9 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -847,7 +847,6 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 	sqlite3VdbeLoadString(v, regTabname, pTab->zName);
 
 	for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-		int nCol;	/* Number of columns in pIdx. "N" */
 		int addrRewind;	/* Address of "OP_Rewind iIdxCur" */
 		int addrNextRow;	/* Address of "next_row:" */
 		const char *zIdxName;	/* Name of the index */
@@ -855,15 +854,16 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 
 		if (pOnlyIdx && pOnlyIdx != pIdx)
 			continue;
+		/* Primary indexes feature automatically generated
+		 * names. Thus, for the sake of clarity, use
+		 * instead more familiar table name.
+		 */
 		if (IsPrimaryKeyIndex(pIdx)) {
-			nCol = pIdx->nKeyCol;
 			zIdxName = pTab->zName;
-			nColTest = nCol;
 		} else {
-			nCol = pIdx->nColumn;
 			zIdxName = pIdx->zName;
-			nColTest = pIdx->uniqNotNull ? pIdx->nKeyCol : nCol;
 		}
+		nColTest = index_column_count(pIdx);
 
 		/* Populate the register containing the index name. */
 		sqlite3VdbeLoadString(v, regIdxname, zIdxName);
@@ -925,8 +925,8 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 		 * The third argument is only used for STAT4
 		 */
 		sqlite3VdbeAddOp2(v, OP_Count, iIdxCur, regStat4 + 3);
-		sqlite3VdbeAddOp2(v, OP_Integer, pIdx->nKeyCol, regStat4 + 1);
-		sqlite3VdbeAddOp2(v, OP_Integer, pIdx->nKeyCol, regStat4 + 2);
+		sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 1);
+		sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 2);
 		sqlite3VdbeAddOp4(v, OP_Function0, 0, regStat4 + 1, regStat4,
 				  (char *)&statInitFuncdef, P4_FUNCDEF);
 		sqlite3VdbeChangeP5(v, 3);
@@ -964,8 +964,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 			 */
 			sqlite3VdbeAddOp0(v, OP_Goto);
 			addrNextRow = sqlite3VdbeCurrentAddr(v);
-			if (nColTest == 1 && pIdx->nKeyCol == 1
-			    && IsUniqueIndex(pIdx)) {
+			if (nColTest == 1 && IsUniqueIndex(pIdx)) {
 				/* For a single-column UNIQUE index, once we have found a non-NULL
 				 * row, we know that all the rest will be distinct, so skip
 				 * subsequent distinctness tests.
@@ -1021,16 +1020,17 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 		assert(regKey == (regStat4 + 2));
 		Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
 		int j, k, regKeyStat;
-		regKeyStat = sqlite3GetTempRange(pParse, pPk->nKeyCol);
-		for (j = 0; j < pPk->nKeyCol; j++) {
+		int nPkColumn = (int)index_column_count(pPk);
+		regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
+		for (j = 0; j < nPkColumn; j++) {
 			k = sqlite3ColumnOfIndex(pIdx, pPk->aiColumn[j]);
 			assert(k >= 0 && k < pTab->nCol);
 			sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
 			VdbeComment((v, "%s", pTab->aCol[pPk->aiColumn[j]].zName));
 		}
 		sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
-				  pPk->nKeyCol, regKey);
-		sqlite3ReleaseTempRange(pParse, regKeyStat, pPk->nKeyCol);
+				  nPkColumn, regKey);
+		sqlite3ReleaseTempRange(pParse, regKeyStat, nPkColumn);
 
 		assert(regChng == (regStat4 + 1));
 		sqlite3VdbeAddOp4(v, OP_Function0, 1, regStat4, regTemp,
@@ -1054,11 +1054,11 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 		int regDLt = regStat1 + 2;
 		int regSample = regStat1 + 3;
 		int regCol = regStat1 + 4;
-		int regSampleKey = regCol + nCol;
+		int regSampleKey = regCol + nColTest;
 		int addrNext;
 		int addrIsNull;
 
-		pParse->nMem = MAX(pParse->nMem, regCol + nCol);
+		pParse->nMem = MAX(pParse->nMem, regCol + nColTest);
 
 		addrNext = sqlite3VdbeCurrentAddr(v);
 		callStatGet(v, regStat4, STAT_GET_KEY, regSampleKey);
@@ -1074,12 +1074,12 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 		 * be taken
 		 */
 		VdbeCoverageNeverTaken(v);
-		for (i = 0; i < nCol; i++) {
+		for (i = 0; i < nColTest; i++) {
 			sqlite3ExprCodeLoadIndexColumn(pParse, pIdx,
 									 iTabCur, i,
 									 regCol + i);
 		}
-		sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, nCol,
+		sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, nColTest,
 					regSample);
 		sqlite3VdbeAddOp3(v, OP_MakeRecord, regTabname, 6,
 					regTemp);
@@ -1311,7 +1311,7 @@ analysisLoader(void *pData, int argc, char **argv, char **NotUsed)
 
 	if (pIndex) {
 		tRowcnt *aiRowEst = 0;
-		int nCol = pIndex->nKeyCol + 1;
+		int nCol = index_column_count(pIndex) + 1;
 		/* Index.aiRowEst may already be set here if there are duplicate
 		 * _sql_stat1 entries for this index. In that case just clobber
 		 * the old data with the new instead of allocating a new array.
@@ -1392,8 +1392,9 @@ initAvgEq(Index * pIdx)
 			tRowcnt nRow;	/* Number of rows in index */
 			i64 nSum100 = 0;	/* Number of terms contributing to sumEq */
 			i64 nDist100;	/* Number of distinct values in index */
+			int nColumn = index_column_count(pIdx);
 
-			if (!pIdx->aiRowEst || iCol >= pIdx->nKeyCol
+			if (!pIdx->aiRowEst || iCol >= nColumn
 			    || pIdx->aiRowEst[iCol + 1] == 0) {
 				nRow = pFinal->anLt[iCol];
 				nDist100 = (i64) 100 *pFinal->anDLt[iCol];
@@ -1512,9 +1513,7 @@ loadStatTbl(sqlite3 * db,	/* Database handle */
 		if (pIdx == 0 || pIdx->nSample)
 			continue;
 
-		nIdxCol = IsPrimaryKeyIndex(pIdx) ?
-			  pIdx->nKeyCol : pIdx->nColumn;
-
+		nIdxCol = index_column_count(pIdx);
 		pIdx->nSampleCol = nIdxCol;
 		nByte = sizeof(IndexSample) * nSample;
 		nByte += sizeof(tRowcnt) * nIdxCol * 3 * nSample;
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 9f45c6224..f2011163c 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -48,6 +48,7 @@
 #include "tarantoolInt.h"
 #include "box/session.h"
 #include "box/identifier.h"
+#include "box/schema.h"
 
 /*
  * This routine is called after a single SQL statement has been
@@ -333,8 +334,6 @@ freeIndex(sqlite3 * db, Index * p)
 	sqlite3ExprDelete(db, p->pPartIdxWhere);
 	sqlite3ExprListDelete(db, p->aColExpr);
 	sqlite3DbFree(db, p->zColAff);
-	if (p->isResized)
-		sqlite3DbFree(db, (void *)p->azColl);
 	sqlite3_free(p->aiRowEst);
 	sqlite3DbFree(db, p);
 }
@@ -1155,7 +1154,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
 		 * collation type was added. Correct this if it is the case.
 		 */
 		for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
-			assert(pIdx->nKeyCol == 1);
+			assert(pIdx->nColumn == 1);
 			if (pIdx->aiColumn[0] == i) {
 				pIdx->azColl[0] = p->aCol[i].zColl;
 			}
@@ -1464,18 +1463,16 @@ convertToWithoutRowidTable(Parse * pParse, Table * pTab)
 		 * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY KEY(a,b,c,d)".  Later
 		 * code assumes the PRIMARY KEY contains no repeated columns.
 		 */
-		for (i = j = 1; i < pPk->nKeyCol; i++) {
+		for (i = j = 1; i < pPk->nColumn; i++) {
 			if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
 				pPk->nColumn--;
 			} else {
 				pPk->aiColumn[j++] = pPk->aiColumn[i];
 			}
 		}
-		pPk->nKeyCol = j;
+		pPk->nColumn = j;
 	}
 	assert(pPk != 0);
-	if (!db->init.imposterTable)
-		pPk->uniqNotNull = 1;
 }
 
 /*
@@ -2685,7 +2682,7 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
 
 	/* Open the sorter cursor if we are to use one. */
 	iSorter = pParse->nTab++;
-	sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nKeyCol,
+	sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
 			  (char *)
 			  sqlite3KeyInfoRef(pKey), P4_KEYINFO);
 
@@ -2697,7 +2694,7 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
 	VdbeCoverage(v);
 	regRecord = sqlite3GetTempReg(pParse);
 
-	sqlite3GenerateIndexKey(pParse, pIndex, iTab, regRecord, 0,
+	sqlite3GenerateIndexKey(pParse, pIndex, iTab, regRecord,
 				&iPartIdxLabel, 0, 0);
 	sqlite3VdbeAddOp2(v, OP_SorterInsert, iSorter, regRecord);
 	sqlite3ResolvePartIdxLabel(pParse, iPartIdxLabel);
@@ -2719,7 +2716,7 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
 		sqlite3VdbeGoto(v, j2);
 		addr2 = sqlite3VdbeCurrentAddr(v);
 		sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
-				     regRecord, pIndex->nKeyCol);
+				     regRecord, pIndex->nColumn);
 		VdbeCoverage(v);
 		sqlite3UniqueConstraint(pParse, ON_CONFLICT_ACTION_ABORT,
 					pIndex);
@@ -2773,7 +2770,6 @@ sqlite3AllocateIndexObject(sqlite3 * db,	/* Database connection */
 		pExtra += sizeof(i16) * nCol;
 		p->aSortOrder = (u8 *) pExtra;
 		p->nColumn = nCol;
-		p->nKeyCol = nCol;
 		*ppExtra = ((char *)p) + nByte;
 	}
 	return p;
@@ -3073,12 +3069,10 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 	memcpy(pIndex->zName, zName, nName + 1);
 	pIndex->pTable = pTab;
 	pIndex->onError = (u8) onError;
-	pIndex->uniqNotNull = onError != ON_CONFLICT_ACTION_NONE;
 	pIndex->idxType = idxType;
 	pIndex->pSchema = db->mdb.pSchema;
-	pIndex->nKeyCol = pList->nExpr;
+	pIndex->nColumn = pList->nExpr;
 	/* Tarantool have access to each column by any index */
-	pIndex->isCovering = 1;
 	if (pPIWhere) {
 		sqlite3ResolveSelfReference(pParse, pTab, NC_PartIdx, pPIWhere,
 					    0);
@@ -3114,8 +3108,6 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 			assert(j <= 0x7fff);
 			if (j < 0) {
 				j = pTab->iPKey;
-			} else if (pTab->aCol[j].notNull == 0) {
-				pIndex->uniqNotNull = 0;
 			}
 			pIndex->aiColumn[i] = (i16) j;
 		}
@@ -3181,9 +3173,9 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 			assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
 			assert(IsUniqueIndex(pIndex));
 
-			if (pIdx->nKeyCol != pIndex->nKeyCol)
+			if (pIdx->nColumn != pIndex->nColumn)
 				continue;
-			for (k = 0; k < pIdx->nKeyCol; k++) {
+			for (k = 0; k < pIdx->nColumn; k++) {
 				const char *z1;
 				const char *z2;
 				assert(pIdx->aiColumn[k] >= 0);
@@ -3194,7 +3186,7 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 				if (strcmp(z1, z2))
 					break;
 			}
-			if (k == pIdx->nKeyCol) {
+			if (k == pIdx->nColumn) {
 				if (pIdx->onError != pIndex->onError) {
 					/* This constraint creates the same index as a previous
 					 * constraint specified somewhere in the CREATE TABLE statement.
@@ -3356,7 +3348,7 @@ sqlite3DefaultRowEst(Index * pIdx)
 	/*                10,  9,  8,  7,  6 */
 	LogEst aVal[] = { 33, 32, 30, 28, 26 };
 	LogEst *a = pIdx->aiRowLogEst;
-	int nCopy = MIN(ArraySize(aVal), pIdx->nKeyCol);
+	int nCopy = MIN(ArraySize(aVal), pIdx->nColumn);
 	int i;
 
 	/* Set the first entry (number of rows in the index) to the estimated
@@ -3375,14 +3367,53 @@ sqlite3DefaultRowEst(Index * pIdx)
 	 * 6 and each subsequent value (if any) is 5.
 	 */
 	memcpy(&a[1], aVal, nCopy * sizeof(LogEst));
-	for (i = nCopy + 1; i <= pIdx->nKeyCol; i++) {
+	for (i = nCopy + 1; i <= pIdx->nColumn; i++) {
 		a[i] = 23;
 		assert(23 == sqlite3LogEst(5));
 	}
 
 	assert(0 == sqlite3LogEst(1));
 	if (IsUniqueIndex(pIdx))
-		a[pIdx->nKeyCol] = 0;
+		a[pIdx->nColumn] = 0;
+}
+
+/**
+ * Return number of columns in given index.
+ * If space is ephemeral or represents view, use internal
+ * SQL structure to fetch the value.
+ */
+inline uint32_t
+index_column_count(Index *idx)
+{
+	assert(idx != NULL);
+	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
+	struct space *space = space_by_id(space_id);
+	/* It is impossible to find an ephemeral space by id. */
+	if (space == NULL)
+		return idx->nColumn;
+
+	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
+	struct index *index = space_index(space, index_id);
+	/* Views don't feature any functional parts. */
+	if (index == NULL || index->def->key_def->part_count == 0)
+		return idx->nColumn;
+	return index->def->key_def->part_count;
+}
+
+/* Return true if given index is unique and not nullable. */
+inline 	bool
+index_is_unique_not_null(Index *idx)
+{
+	assert(idx != NULL);
+	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
+	struct space *space = space_by_id(space_id);
+	assert(space != NULL);
+
+	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
+	struct index *index = space_index(space, index_id);
+	assert(index != NULL);
+	return (index->def->opts.is_unique &&
+		!index->def->key_def->is_nullable);
 }
 
 /*
@@ -4101,7 +4132,7 @@ sqlite3UniqueConstraint(Parse * pParse,	/* Parsing context */
 	if (pIdx->aColExpr) {
 		sqlite3XPrintf(&errMsg, "index '%q'", pIdx->zName);
 	} else {
-		for (j = 0; j < pIdx->nKeyCol; j++) {
+		for (j = 0; j < pIdx->nColumn; j++) {
 			char *zCol;
 			assert(pIdx->aiColumn[j] >= 0);
 			zCol = pTab->aCol[pIdx->aiColumn[j]].zName;
@@ -4278,7 +4309,6 @@ sqlite3KeyInfoOfIndex(Parse * pParse, sqlite3 * db, Index * pIdx)
 	int i;
 	int nCol = pIdx->nColumn;
 	int nTableCol = pIdx->pTable->nCol;
-	int nKey = pIdx->nKeyCol;
 	KeyInfo *pKey;
 
 	if (pParse && pParse->nErr)
@@ -4291,11 +4321,7 @@ sqlite3KeyInfoOfIndex(Parse * pParse, sqlite3 * db, Index * pIdx)
 	 * as wide as the table itself.  Otherwize, not enough slots
 	 * for row parser cache are allocated in VdbeCursor object.
 	 */
-	if (pIdx->uniqNotNull) {
-		pKey = sqlite3KeyInfoAlloc(db, nKey, nTableCol - nKey);
-	} else {
-		pKey = sqlite3KeyInfoAlloc(db, nCol, nTableCol - nCol);
-	}
+	pKey = sqlite3KeyInfoAlloc(db, nCol, nTableCol - nCol);
 	if (pKey) {
 		assert(sqlite3KeyInfoIsWriteable(pKey));
 		for (i = 0; i < nCol; i++) {
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 832af89fd..9cdc4629d 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -413,7 +413,7 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 		} else {
 			pPk = sqlite3PrimaryKeyIndex(pTab);
 			assert(pPk != 0);
-			nPk = pPk->nKeyCol;
+			nPk = index_column_count(pPk);
 			iPk = pParse->nMem + 1;
 			pParse->nMem += nPk;
 			iEphCur = pParse->nTab++;
@@ -874,10 +874,9 @@ sqlite3GenerateRowIndexDelete(Parse * pParse,	/* Parsing and code generating con
 	pPk = sqlite3PrimaryKeyIndex(pTab);
 	/* In Tarantool it is enough to delete row just from pk */
 	VdbeModuleComment((v, "GenRowIdxDel for %s", pPk->zName));
-	r1 = sqlite3GenerateIndexKey(pParse, pPk, iDataCur, 0, 1,
-				     &iPartIdxLabel, NULL, r1);
-	sqlite3VdbeAddOp3(v, OP_IdxDelete, iIdxCur, r1,
-			  pPk->uniqNotNull ? pPk->nKeyCol : pPk->nColumn);
+	r1 = sqlite3GenerateIndexKey(pParse, pPk, iDataCur, 0, &iPartIdxLabel,
+				     NULL, r1);
+	sqlite3VdbeAddOp3(v, OP_IdxDelete, iIdxCur, r1, index_column_count(pPk));
 	sqlite3ResolvePartIdxLabel(pParse, iPartIdxLabel);
 }
 
@@ -917,7 +916,6 @@ sqlite3GenerateIndexKey(Parse * pParse,	/* Parsing context */
 			Index * pIdx,	/* The index for which to generate a key */
 			int iDataCur,	/* Cursor number from which to take column data */
 			int regOut,	/* Put the new key into this register if not 0 */
-			int prefixOnly,	/* Compute only a unique prefix of the key */
 			int *piPartIdxLabel,	/* OUT: Jump to this label to skip partial index */
 			Index * pPrior,	/* Previously generated index key */
 			int regPrior)	/* Register holding previous generated key */
@@ -939,8 +937,7 @@ sqlite3GenerateIndexKey(Parse * pParse,	/* Parsing context */
 			*piPartIdxLabel = 0;
 		}
 	}
-	nCol = (prefixOnly
-		&& pIdx->uniqNotNull) ? pIdx->nKeyCol : pIdx->nColumn;
+	nCol = index_column_count(pIdx);
 	regBase = sqlite3GetTempRange(pParse, nCol);
 	if (pPrior && (regBase != regPrior || pPrior->pPartIdxWhere))
 		pPrior = 0;
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index b69a176cb..a6731ce2a 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -2532,7 +2532,7 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
 				if (pIdx->nColumn >= BMS - 1)
 					continue;
 				if (mustBeUnique) {
-					if (pIdx->nKeyCol > nExpr
+					if (pIdx->nColumn > nExpr
 					    || (pIdx->nColumn > nExpr
 					    && !IsUniqueIndex(pIdx))) {
 							continue;	/* This index is not unique over the IN RHS columns */
@@ -3250,7 +3250,7 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
 		struct Index *pk = sqlite3PrimaryKeyIndex(tab);
 		assert(pk);
 
-		if (pk->nKeyCol == 1
+		if (pk->nColumn == 1
 		    && tab->aCol[pk->aiColumn[0]].affinity == 'D'
 		    && pk->aiColumn[0] < nVector) {
 			int reg_pk = rLhs + pk->aiColumn[0];
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 088d84e27..c7917c595 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -255,7 +255,8 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
 	}
 
 	for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
-		if (pIdx->nKeyCol == nCol && IsUniqueIndex(pIdx)
+		int nIdxCol = index_column_count(pIdx);
+		if (nIdxCol == nCol && IsUniqueIndex(pIdx)
 		    && pIdx->pPartIdxWhere == 0) {
 			/* pIdx is a UNIQUE index (or a PRIMARY KEY) and has the right number
 			 * of columns. If each indexed column corresponds to a foreign key
@@ -619,7 +620,7 @@ fkScanChildren(Parse * pParse,	/* Parse context */
 	Vdbe *v = sqlite3GetVdbe(pParse);
 
 	assert(pIdx == 0 || pIdx->pTable == pTab);
-	assert(pIdx == 0 || pIdx->nKeyCol == pFKey->nCol);
+	assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
 	assert(pIdx != 0);
 
 	if (nIncr < 0) {
@@ -668,7 +669,7 @@ fkScanChildren(Parse * pParse,	/* Parse context */
 		Expr *pEq, *pAll = 0;
 		Index *pPk = sqlite3PrimaryKeyIndex(pTab);
 		assert(pIdx != 0);
-		for (i = 0; i < pPk->nKeyCol; i++) {
+		for (i = 0; i < (int)index_column_count(pPk); i++) {
 			i16 iCol = pIdx->aiColumn[i];
 			assert(iCol >= 0);
 			pLeft = exprTableRegister(pParse, pTab, regData, iCol);
@@ -1157,7 +1158,8 @@ sqlite3FkOldmask(Parse * pParse,	/* Parse context */
 			Index *pIdx = 0;
 			sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
 			if (pIdx) {
-				for (i = 0; i < pIdx->nKeyCol; i++) {
+				int nIdxCol = index_column_count(pIdx);
+				for (i = 0; i < nIdxCol; i++) {
 					assert(pIdx->aiColumn[i] >= 0);
 					mask |= COLUMN_MASK(pIdx->aiColumn[i]);
 				}
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index b20a47970..0aa0905f1 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -86,14 +86,15 @@ sqlite3IndexAffinityStr(sqlite3 * db, Index * pIdx)
 		 * up.
 		 */
 		int n;
+		int nColumn = index_column_count(pIdx);
 		Table *pTab = pIdx->pTable;
 		pIdx->zColAff =
-		    (char *)sqlite3DbMallocRaw(0, pIdx->nColumn + 1);
+		    (char *)sqlite3DbMallocRaw(0, nColumn + 1);
 		if (!pIdx->zColAff) {
 			sqlite3OomFault(db);
 			return 0;
 		}
-		for (n = 0; n < pIdx->nColumn; n++) {
+		for (n = 0; n < nColumn; n++) {
 			i16 x = pIdx->aiColumn[n];
 			if (x >= 0) {
 				pIdx->zColAff[n] = pTab->aCol[x].affinity;
@@ -631,7 +632,7 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 		     pIdx = pIdx->pNext, i++) {
 			assert(pIdx);
 			aRegIdx[i] = ++pParse->nMem;
-			pParse->nMem += pIdx->nColumn;
+			pParse->nMem += index_column_count(pIdx);
 		}
 	}
 
@@ -1093,7 +1094,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 	nCol = pTab->nCol;
 
 	pPk = sqlite3PrimaryKeyIndex(pTab);
-	nPkField = pPk->nKeyCol;
+	nPkField = index_column_count(pPk);
 
 	/* Record that this module has started */
 	VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
@@ -1247,7 +1248,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 		 * the insert or update.  Store that record in the aRegIdx[ix] register
 		 */
 		regIdx = aRegIdx[ix] + 1;
-		for (i = 0; i < pIdx->nColumn; i++) {
+		int nIdxCol = (int)index_column_count(pIdx);
+		for (i = 0; i < nIdxCol; i++) {
 			int iField = pIdx->aiColumn[i];
 			int x;
 			if (iField == XN_EXPR) {
@@ -1275,7 +1277,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 		if (IsPrimaryKeyIndex(pIdx)) {
 			/* If PK is marked as INTEGER, use it as strict type,
 			 * not as affinity. Emit code for type checking */
-			if (pIdx->nKeyCol == 1) {
+			if (nIdxCol == 1) {
 				reg_pk = regNewData + 1 + pIdx->aiColumn[0];
 				if (pTab->zColAff[pIdx->aiColumn[0]] == 'D') {
 					int skip_if_null = sqlite3VdbeMakeLabel(v);
@@ -1300,7 +1302,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 			/* kyukhin: for Tarantool, this should be evaluated to NOP.  */
 			if (IsPrimaryKeyIndex(pIdx) || uniqueByteCodeNeeded) {
 				sqlite3VdbeAddOp3(v, OP_MakeRecord, regIdx,
-						  pIdx->nColumn, aRegIdx[ix]);
+						  nIdxCol, aRegIdx[ix]);
 				VdbeComment((v, "for %s", pIdx->zName));
 			}
 		}
@@ -1361,8 +1363,9 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 					  addrUniqueOk);
 
 		if (uniqueByteCodeNeeded) {
-			sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur, addrUniqueOk,
-					     regIdx, pIdx->nKeyCol);
+			sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
+					     addrUniqueOk, regIdx,
+					     index_column_count(pIdx));
 		}
 		VdbeCoverage(v);
 
@@ -1372,11 +1375,12 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 								 nPkField);
 		if (isUpdate || onError == ON_CONFLICT_ACTION_REPLACE) {
 			int x;
+			int nPkCol = index_column_count(pPk);
 			/* Extract the PRIMARY KEY from the end of the index entry and
 			 * store it in registers regR..regR+nPk-1
 			 */
 			if (pIdx != pPk) {
-				for (i = 0; i < pPk->nKeyCol; i++) {
+				for (i = 0; i < nPkCol; i++) {
 					assert(pPk->aiColumn[i] >= 0);
 					x = sqlite3ColumnOfIndex(pIdx,
 								 pPk->aiColumn[i]);
@@ -1395,18 +1399,18 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 				 * KEY values of this row before the update.
 				 */
 				int addrJump =
-					sqlite3VdbeCurrentAddr(v) + pPk->nKeyCol;
+					sqlite3VdbeCurrentAddr(v) + nPkCol;
 				int op = OP_Ne;
 				int regCmp = (IsPrimaryKeyIndex(pIdx) ?
 					      regIdx : regR);
 
-				for (i = 0; i < pPk->nKeyCol; i++) {
+				for (i = 0; i < nPkCol; i++) {
 					char *p4 = (char *)
 						sqlite3LocateCollSeq(pParse, db,
 								     pPk->azColl[i]);
 					x = pPk->aiColumn[i];
 					assert(x >= 0);
-					if (i == (pPk->nKeyCol - 1)) {
+					if (i == (nPkCol - 1)) {
 						addrJump = addrUniqueOk;
 						op = OP_Eq;
 					}
@@ -1531,8 +1535,7 @@ sqlite3CompleteInsertion(Parse * pParse,	/* The parser context */
 	}
 
 	sqlite3VdbeAddOp4Int(v, opcode, iIdxCur, aRegIdx[0],
-			     aRegIdx[0] + 1,
-			     pIdx->uniqNotNull ? pIdx->nKeyCol : pIdx->nColumn);
+			     aRegIdx[0] + 1, index_column_count(pIdx));
 	sqlite3VdbeChangeP5(v, pik_flags);
 }
 
@@ -1663,16 +1666,18 @@ int sqlite3_xferopt_count;
 static int
 xferCompatibleIndex(Index * pDest, Index * pSrc)
 {
-	int i;
+	uint32_t i;
 	assert(pDest && pSrc);
 	assert(pDest->pTable != pSrc->pTable);
-	if (pDest->nKeyCol != pSrc->nKeyCol) {
+	uint32_t nDestCol = index_column_count(pDest);
+	uint32_t nSrcCol = index_column_count(pSrc);
+	if (nDestCol != nSrcCol) {
 		return 0;	/* Different number of columns */
 	}
 	if (pDest->onError != pSrc->onError) {
 		return 0;	/* Different conflict resolution strategies */
 	}
-	for (i = 0; i < pSrc->nKeyCol; i++) {
+	for (i = 0; i < nSrcCol; i++) {
 		if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
 			return 0;	/* Different columns indexed */
 		}
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index 03ae926e0..fae3fbceb 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -429,15 +429,14 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
 						 * version with more rows and
 						 * columns)
 						 */
-						mx = pIdx->nColumn;
 						pParse->nMem = 6;
 					} else {
 						/* PRAGMA index_info (legacy
 						 * version)
 						 */
-						mx = pIdx->nKeyCol;
 						pParse->nMem = 3;
 					}
+					mx = index_column_count(pIdx);
 					sqlite3CodeVerifySchema(pParse);
 					assert(pParse->nMem <=
 					       pPragma->nPragCName);
@@ -464,8 +463,7 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
 									     azColl
 									     [i],
 									     i <
-									     pIdx->
-									     nKeyCol);
+									     mx);
 						}
 						sqlite3VdbeAddOp2(v,
 								  OP_ResultRow,
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index aebb61029..267d1883e 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -1651,16 +1651,12 @@ struct Index {
 	ExprList *aColExpr;	/* Column expressions */
 	int tnum;		/* DB Page containing root of this index */
 	LogEst szIdxRow;	/* Estimated average row size in bytes */
-	u16 nKeyCol;		/* Number of columns forming the key */
 	u16 nColumn;		/* Number of columns stored in the index */
 	u8 onError;		/* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
 				 * or _NONE
 				 */
 	unsigned idxType:2;	/* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX */
 	unsigned bUnordered:1;	/* Use this index for == or IN queries only */
-	unsigned uniqNotNull:1;	/* True if UNIQUE and NOT NULL for all columns */
-	unsigned isResized:1;	/* True if resizeIndexObject() has been called */
-	unsigned isCovering:1;	/* True if this is a covering index */
 	unsigned noSkipScan:1;	/* Do not try to use skip-scan if true */
 	int nSample;		/* Number of elements in aSample[] */
 	int nSampleCol;		/* Size of IndexSample.anEq[] and so on */
@@ -3258,8 +3254,7 @@ int sqlite3ExprNeedsNoAffinityChange(const Expr *, char);
 void sqlite3GenerateRowDelete(Parse *, Table *, Trigger *, int, int, int, i16,
 			      u8, enum on_conflict_action, u8, int);
 void sqlite3GenerateRowIndexDelete(Parse *, Table *, int, int);
-int sqlite3GenerateIndexKey(Parse *, Index *, int, int, int, int *, Index *,
-			    int);
+int sqlite3GenerateIndexKey(Parse *, Index *, int, int, int *, Index *, int);
 void sqlite3ResolvePartIdxLabel(Parse *, int);
 void sqlite3GenerateConstraintChecks(Parse *, Table *, int *, int, int, int,
 				     int, u8, u8, int, int *, int *);
@@ -3483,6 +3478,10 @@ int sqlite3FindDbName(const char *);
 int sqlite3AnalysisLoad(sqlite3 *);
 void sqlite3DeleteIndexSamples(sqlite3 *, Index *);
 void sqlite3DefaultRowEst(Index *);
+uint32_t
+index_column_count(Index *);
+bool
+index_is_unique_not_null(Index *);
 void sqlite3RegisterLikeFunctions(sqlite3 *, int);
 int sqlite3IsLikeFunction(sqlite3 *, Expr *, int *, char *);
 void sqlite3SchemaClear(void *);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 85d18cbde..7bb845a39 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -284,16 +284,17 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	 */
 	for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
 		int reg;
+		int nIdxCol = index_column_count(pIdx);
 		if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
 			reg = ++pParse->nMem;
-			pParse->nMem += pIdx->nColumn;
+			pParse->nMem += nIdxCol;
 		} else {
 			reg = 0;
-			for (i = 0; i < pIdx->nKeyCol; i++) {
+			for (i = 0; i < nIdxCol; i++) {
 				i16 iIdxCol = pIdx->aiColumn[i];
 				if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
 					reg = ++pParse->nMem;
-					pParse->nMem += pIdx->nColumn;
+					pParse->nMem += nIdxCol;
 					break;
 				}
 			}
@@ -359,7 +360,7 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 		nPk = nKey;
 	} else {
 		assert(pPk != 0);
-		nPk = pPk->nKeyCol;
+		nPk = index_column_count(pPk);
 	}
 	iPk = pParse->nMem + 1;
 	pParse->nMem += nPk;
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 6a68b6e3a..5dd85b8a6 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1104,7 +1104,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
 			Index *pIdx = p->pIdx;	/* Index being probed */
 			int nByte;	/* Bytes of space to allocate */
 			int i;	/* Counter variable */
-			int nCol = pIdx->nColumn;	/* Number of index columns including rowid */
+			int nCol = index_column_count(pIdx);
 
 			nByte =
 			    sizeof(Mem) * nCol + ROUND8(sizeof(UnpackedRecord));
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index 2f1c627e5..8d1e3a393 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -491,7 +491,7 @@ indexColumnNotNull(Index * pIdx, int iCol)
 {
 	int j;
 	assert(pIdx != 0);
-	assert(iCol >= 0 && iCol < pIdx->nColumn);
+	assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
 	j = pIdx->aiColumn[iCol];
 	if (j >= 0) {
 		return !table_column_is_nullable(pIdx->pTable, j);
@@ -557,7 +557,7 @@ isDistinctRedundant(Parse * pParse,		/* Parsing context */
 	for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
 		if (!IsUniqueIndex(pIdx))
 			continue;
-		for (i = 0; i < pIdx->nKeyCol; i++) {
+		for (i = 0; i < (int)index_column_count(pIdx); i++) {
 			if (0 ==
 			    sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
 						 WO_EQ, pIdx)) {
@@ -568,7 +568,7 @@ isDistinctRedundant(Parse * pParse,		/* Parsing context */
 					break;
 			}
 		}
-		if (i == pIdx->nKeyCol) {
+		if (i == (int)index_column_count(pIdx)) {
 			/* This index implies that the DISTINCT qualifier is redundant. */
 			return 1;
 		}
@@ -841,7 +841,7 @@ constructAutomaticIndex(Parse * pParse,			/* The parsing context */
 	regRecord = sqlite3GetTempReg(pParse);
 	regBase =
 	    sqlite3GenerateIndexKey(pParse, pIdx, pLevel->iTabCur, regRecord, 0,
-				    0, 0, 0);
+				    0, 0);
 	sqlite3VdbeAddOp2(v, OP_IdxInsert, pLevel->iIdxCur, regRecord);
 	sqlite3VdbeChangeP5(v, OPFLAG_USESEEKRESULT);
 	if (pPartial)
@@ -1111,7 +1111,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
 char
 sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
 {
-	assert(iCol >= 0 && iCol < pIdx->nColumn);
+	assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
 	if (!pIdx->zColAff) {
 		if (sqlite3IndexAffinityStr(db, pIdx) == 0)
 			return SQLITE_AFF_BLOB;
@@ -1489,7 +1489,7 @@ whereEqualScanEst(Parse * pParse,	/* Parsing & code generating context */
 	int bOk;
 
 	assert(nEq >= 1);
-	assert(nEq <= p->nColumn);
+	assert(nEq <= (int)index_column_count(p));
 	assert(p->aSample != 0);
 	assert(p->nSample > 0);
 	assert(pBuilder->nRecValid < nEq);
@@ -2208,7 +2208,7 @@ whereRangeVectorLen(Parse * pParse,	/* Parsing context */
 	int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
 	int i;
 
-	nCmp = MIN(nCmp, (pIdx->nColumn - nEq));
+	nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
 	for (i = 1; i < nCmp; i++) {
 		/* Test if comparison i of pTerm is compatible with column (i+nEq)
 		 * of the index. If not, exit the loop.
@@ -2299,6 +2299,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
 	LogEst rSize;		/* Number of rows in the table */
 	LogEst rLogSize;	/* Logarithm of table size */
 	WhereTerm *pTop = 0, *pBtm = 0;	/* Top and bottom range constraints */
+	uint32_t nProbeCol = index_column_count(pProbe);
 
 	pNew = pBuilder->pNew;
 	if (db->mallocFailed)
@@ -2318,7 +2319,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
 	if (pProbe->bUnordered)
 		opMask &= ~(WO_GT | WO_GE | WO_LT | WO_LE);
 
-	assert(pNew->nEq < pProbe->nColumn);
+	assert(pNew->nEq < nProbeCol);
 
 	saved_nEq = pNew->nEq;
 	saved_nBtm = pNew->nBtm;
@@ -2415,9 +2416,10 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
 			pNew->wsFlags |= WHERE_COLUMN_EQ;
 			assert(saved_nEq == pNew->nEq);
 			if ((iCol > 0 && nInMul == 0
-				&& saved_nEq == pProbe->nKeyCol - 1)
+				&& saved_nEq == nProbeCol - 1)
 			    ) {
-				if (iCol >= 0 && pProbe->uniqNotNull == 0) {
+				if (iCol >= 0 &&
+				    !index_is_unique_not_null(pProbe)) {
 					pNew->wsFlags |= WHERE_UNQ_WANTED;
 				} else {
 					pNew->wsFlags |= WHERE_ONEROW;
@@ -2565,7 +2567,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
 		}
 
 		if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0
-		    && pNew->nEq < pProbe->nColumn) {
+		    && pNew->nEq < nProbeCol) {
 			whereLoopAddBtreeIndex(pBuilder, pSrc, pProbe,
 					       nInMul + nIn);
 		}
@@ -2593,8 +2595,11 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
 	 * more expensive.
 	 */
 	assert(42 == sqlite3LogEst(18));
-	if (saved_nEq == saved_nSkip && saved_nEq + 1 < pProbe->nKeyCol && pProbe->noSkipScan == 0 && pProbe->aiRowLogEst[saved_nEq + 1] >= 42	/* TUNING: Minimum for skip-scan */
-	    && (rc = whereLoopResize(db, pNew, pNew->nLTerm + 1)) == SQLITE_OK) {
+	if (saved_nEq == saved_nSkip && saved_nEq + 1 < nProbeCol &&
+	    pProbe->noSkipScan == 0 &&
+	    /* TUNING: Minimum for skip-scan */
+	    pProbe->aiRowLogEst[saved_nEq + 1] >= 42 &&
+	    (rc = whereLoopResize(db, pNew, pNew->nLTerm + 1)) == SQLITE_OK) {
 		LogEst nIter;
 		pNew->nEq++;
 		pNew->nSkip++;
@@ -2635,6 +2640,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
 	ExprList *pOB;
 	ExprList *aColExpr;
 	int ii, jj;
+	int nIdxCol = index_column_count(pIndex);
 
 	if (pIndex->bUnordered)
 		return 0;
@@ -2645,12 +2651,12 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
 		if (pExpr->op == TK_COLUMN && pExpr->iTable == iCursor) {
 			if (pExpr->iColumn < 0)
 				return 1;
-			for (jj = 0; jj < pIndex->nKeyCol; jj++) {
+			for (jj = 0; jj < nIdxCol; jj++) {
 				if (pExpr->iColumn == pIndex->aiColumn[jj])
 					return 1;
 			}
 		} else if ((aColExpr = pIndex->aColExpr) != 0) {
-			for (jj = 0; jj < pIndex->nKeyCol; jj++) {
+			for (jj = 0; jj < nIdxCol; jj++) {
 				if (pIndex->aiColumn[jj] != XN_EXPR)
 					continue;
 				if (sqlite3ExprCompare
@@ -2664,27 +2670,6 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
 	return 0;
 }
 
-/*
- * Return a bitmask where 1s indicate that the corresponding column of
- * the table is used by an index.  Only the first 63 columns are considered.
- */
-static Bitmask
-columnsInIndex(Index * pIdx)
-{
-	Bitmask m = 0;
-	int j;
-	for (j = pIdx->nColumn - 1; j >= 0; j--) {
-		int x = pIdx->aiColumn[j];
-		if (x >= 0) {
-			testcase(x == BMS - 1);
-			testcase(x == BMS - 2);
-			if (x < BMS - 1)
-				m |= MASKBIT(x);
-		}
-	}
-	return m;
-}
-
 /* Check to see if a partial index with pPartIndexWhere can be used
  * in the current query.  Return true if it can be and false if not.
  */
@@ -2787,7 +2772,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
 		 */
 		Index *pFirst;	/* First of real indices on the table */
 		memset(&sPk, 0, sizeof(Index));
-		sPk.nKeyCol = 1;
 		sPk.nColumn = 1;
 		sPk.aiColumn = &aiColumnPk;
 		sPk.aiRowLogEst = aiRowEstPk;
@@ -2904,18 +2888,7 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
 			if (rc)
 				break;
 		} else {
-			Bitmask m;
-			if (pProbe->isCovering) {
-				pNew->wsFlags = WHERE_IDX_ONLY | WHERE_INDEXED;
-				m = 0;
-			} else {
-				m = pSrc->colUsed & ~columnsInIndex(pProbe);
-				pNew->wsFlags =
-				    (m ==
-				     0) ? (WHERE_IDX_ONLY | WHERE_INDEXED) :
-				    WHERE_INDEXED;
-			}
-
+			pNew->wsFlags = WHERE_IDX_ONLY | WHERE_INDEXED;
 			/* Full scan via index */
 			pNew->iSortIdx = b ? iSortIdx : 0;
 
@@ -3164,7 +3137,6 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 	u8 distinctColumns;	/* True if the loop has UNIQUE NOT NULL columns */
 	u8 isMatch;		/* iColumn matches a term of the ORDER BY clause */
 	u16 eqOpMask;		/* Allowed equality operators */
-	u16 nKeyCol;		/* Number of key columns in pIndex */
 	u16 nColumn;		/* Total number of ordered columns in the index */
 	u16 nOrderBy;		/* Number terms in the ORDER BY clause */
 	int iLoop;		/* Index of WhereLoop in pPath being processed */
@@ -3285,14 +3257,12 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 		if ((pLoop->wsFlags & WHERE_ONEROW) == 0) {
 			if (pLoop->wsFlags & WHERE_IPK) {
 				pIndex = 0;
-				nKeyCol = 0;
 				nColumn = 1;
 			} else if ((pIndex = pLoop->pIndex) == 0
 				   || pIndex->bUnordered) {
 				return 0;
 			} else {
-				nKeyCol = pIndex->nKeyCol;
-				nColumn = pIndex->nColumn;
+				nColumn = index_column_count(pIndex);
 				isOrderDistinct = IsUniqueIndex(pIndex);
 			}
 
@@ -3436,7 +3406,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 					obSat |= MASKBIT(i);
 				} else {
 					/* No match found */
-					if (j == 0 || j < nKeyCol) {
+					if (j == 0 || j < nColumn) {
 						testcase(isOrderDistinct != 0);
 						isOrderDistinct = 0;
 					}
@@ -4068,14 +4038,16 @@ whereShortCut(WhereLoopBuilder * pBuilder)
 	} else {
 		for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
 			int opMask;
+			int nIdxCol = index_column_count(pIdx);
 			assert(pLoop->aLTermSpace == pLoop->aLTerm);
 			if (!IsUniqueIndex(pIdx)
 			    || pIdx->pPartIdxWhere != 0
-			    || pIdx->nKeyCol > ArraySize(pLoop->aLTermSpace)
+			    || nIdxCol > ArraySize(pLoop->aLTermSpace)
 			    )
 				continue;
-			opMask = pIdx->uniqNotNull ? (WO_EQ | WO_IS) : WO_EQ;
-			for (j = 0; j < pIdx->nKeyCol; j++) {
+			opMask = index_is_unique_not_null(pIdx) ?
+				 (WO_EQ | WO_IS) : WO_EQ;
+			for (j = 0; j < nIdxCol; j++) {
 				pTerm =
 				    sqlite3WhereFindTerm(pWC, iCur, j, 0,
 							 opMask, pIdx);
@@ -4084,14 +4056,10 @@ whereShortCut(WhereLoopBuilder * pBuilder)
 				testcase(pTerm->eOperator & WO_IS);
 				pLoop->aLTerm[j] = pTerm;
 			}
-			if (j != pIdx->nKeyCol)
+			if (j != nIdxCol)
 				continue;
-			pLoop->wsFlags =
-			    WHERE_COLUMN_EQ | WHERE_ONEROW | WHERE_INDEXED;
-			if (pIdx->isCovering
-			    || (pItem->colUsed & ~columnsInIndex(pIdx)) == 0) {
-				pLoop->wsFlags |= WHERE_IDX_ONLY;
-			}
+			pLoop->wsFlags = WHERE_COLUMN_EQ | WHERE_ONEROW |
+					 WHERE_INDEXED | WHERE_IDX_ONLY;
 			pLoop->nLTerm = j;
 			pLoop->nEq = j;
 			pLoop->pIndex = pIdx;
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index 65bcba4d0..568ef99a2 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -1256,8 +1256,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 		assert(pWInfo->pOrderBy == 0
 		       || pWInfo->pOrderBy->nExpr == 1
 		       || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
+		int nIdxCol = index_column_count(pIdx);
 		if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
-		    && pWInfo->nOBSat > 0 && (pIdx->nKeyCol > nEq)) {
+		    && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
 			j = pIdx->aiColumn[nEq];
 			/* Allow seek for column with `NOT NULL` == false attribute.
 			 * If a column may contain NULL-s, the comparator installed
@@ -1328,10 +1329,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 		 * a forward order scan on a descending index, interchange the
 		 * start and end terms (pRangeStart and pRangeEnd).
 		 */
-		if ((nEq < pIdx->nKeyCol
-		     && bRev == (pIdx->aSortOrder[nEq] == SQLITE_SO_ASC))
-		    || (bRev && pIdx->nKeyCol == nEq)
-		    ) {
+		if ((nEq < nIdxCol &&
+		     bRev == (pIdx->aSortOrder[nEq] == SQLITE_SO_ASC)) ||
+		    (bRev && nIdxCol == nEq)) {
 			SWAP(pRangeEnd, pRangeStart);
 			SWAP(bSeekPastNull, bStopAtNull);
 			SWAP(nBtm, nTop);
@@ -1395,7 +1395,8 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 		}
 		struct Index *pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
 		assert(pk);
-		if (pk->nKeyCol == 1
+		int nPkCol = index_column_count(pk);
+		if (nPkCol == 1
 		    && pIdx->pTable->aCol[pk->aiColumn[0]].affinity == 'D') {
 			/* Right now INTEGER PRIMARY KEY is the only option to
 			 * get Tarantool's INTEGER column type. Need special handling
@@ -1513,17 +1514,18 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 			/* pIdx is a covering index.  No need to access the main table. */
 		}  else if (iCur != iIdxCur) {
 			Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-			int iKeyReg = sqlite3GetTempRange(pParse, pPk->nKeyCol);
-			for (j = 0; j < pPk->nKeyCol; j++) {
+			int nPkCol = index_column_count(pPk);
+			int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
+			for (j = 0; j < nPkCol; j++) {
 				k = sqlite3ColumnOfIndex(pIdx,
 							 pPk->aiColumn[j]);
 				sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
 						  iKeyReg + j);
 			}
 			sqlite3VdbeAddOp4Int(v, OP_NotFound, iCur, addrCont,
-					     iKeyReg, pPk->nKeyCol);
+					     iKeyReg, nPkCol);
 			VdbeCoverage(v);
-			sqlite3ReleaseTempRange(pParse, iKeyReg, pPk->nKeyCol);
+			sqlite3ReleaseTempRange(pParse, iKeyReg, nPkCol);
 		}
 
 		/* Record the instruction used to terminate the loop. */
@@ -1621,9 +1623,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 		 */
 		if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
 			Index *pPk = sqlite3PrimaryKeyIndex(pTab);
+			int nPkCol = index_column_count(pPk);
 			regRowset = pParse->nTab++;
 			sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
-					  regRowset, pPk->nKeyCol);
+					  regRowset, nPkCol);
 			sqlite3VdbeSetP4KeyInfo(pParse, pPk);
 			regPk = ++pParse->nMem;
 		}
@@ -1724,7 +1727,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 						int iSet =
 						    ((ii == pOrWc->nTerm - 1) ? -1 : ii);
 						Index *pPk = sqlite3PrimaryKeyIndex (pTab);
-						int nPk = pPk->nKeyCol;
+						int nPk = index_column_count(pPk);
 						int iPk;
 
 						/* Read the PK into an array of temp registers. */
diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c
index dbfaa0959..9842c8fdd 100644
--- a/src/box/sql/whereexpr.c
+++ b/src/box/sql/whereexpr.c
@@ -933,7 +933,7 @@ exprMightBeIndexed(SrcList * pFrom,	/* The FROM clause */
 	for (pIdx = pFrom->a[i].pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
 		if (pIdx->aColExpr == 0)
 			continue;
-		for (i = 0; i < pIdx->nKeyCol; i++) {
+		for (i = 0; i < pIdx->nColumn; i++) {
 			if (pIdx->aiColumn[i] != XN_EXPR)
 				continue;
 			if (sqlite3ExprCompare
-- 
2.15.1




More information about the Tarantool-patches mailing list