[tarantool-patches] [PATCH 2/2] sql: replace KeyInfo with key_def

Kirill Yukhin kyukhin at tarantool.org
Tue May 8 10:56:08 MSK 2018


KeyInfo is a legacy struct which was heavily used in the SQL
front-end. This patch replaces all its usages with Tarantool's
native key description structure called key_def.

This change is a part of the data dictionary integration effort:
Tarantool indexes are not aware of KeyInfo, which is why it was
removed.

Legacy KeyInfo memory handling was based on reference counting;
it is now replaced with memory duplication. This state of affairs
should be improved in the future.

Part of #3235
---
 src/box/sql.c                 |  19 ++-
 src/box/sql/analyze.c         |   6 +-
 src/box/sql/build.c           |  79 +++------
 src/box/sql/delete.c          |   6 +-
 src/box/sql/expr.c            |  52 +++---
 src/box/sql/fkey.c            |   2 +-
 src/box/sql/insert.c          |  12 +-
 src/box/sql/pragma.c          |   4 +-
 src/box/sql/select.c          | 381 ++++++++++++++++++------------------------
 src/box/sql/sqliteInt.h       |  50 ++----
 src/box/sql/tarantoolInt.h    |   5 +-
 src/box/sql/update.c          |   6 +-
 src/box/sql/vdbe.c            |  80 +++++----
 src/box/sql/vdbe.h            |  34 ++--
 src/box/sql/vdbeInt.h         |  13 +-
 src/box/sql/vdbeapi.c         | 183 --------------------
 src/box/sql/vdbeaux.c         | 247 ++++++++++-----------------
 src/box/sql/vdbemem.c         |  14 +-
 src/box/sql/vdbesort.c        |  54 +++---
 src/box/sql/where.c           |  21 ++-
 src/box/sql/wherecode.c       |   2 +-
 src/box/tuple.c               |   9 +
 src/box/tuple.h               |   9 +
 test/sql-tap/index1.test.lua  |   2 +
 test/sql-tap/index4.test.lua  |   1 +
 test/sql-tap/selectA.test.lua |   1 +
 26 files changed, 496 insertions(+), 796 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index 838fcf6..42372ea 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -371,7 +371,7 @@ int tarantoolSqlite3Count(BtCursor *pCur, i64 *pnEntry)
  * @retval SQLITE_OK on success, SQLITE_TARANTOOL_ERROR otherwise.
  */
 int tarantoolSqlite3EphemeralCreate(BtCursor *pCur, uint32_t field_count,
-				    struct coll *aColl)
+				    struct key_def *def)
 {
 	assert(pCur);
 	assert(pCur->curFlags & BTCF_TEphemCursor);
@@ -380,11 +380,16 @@ int tarantoolSqlite3EphemeralCreate(BtCursor *pCur, uint32_t field_count,
 	if (ephemer_key_def == NULL)
 		return SQL_TARANTOOL_ERROR;
 	for (uint32_t part = 0; part < field_count; ++part) {
+		struct coll *coll;
+		if (part < def->part_count)
+			coll = def->parts[part].coll;
+		else
+			coll = NULL;
 		key_def_set_part(ephemer_key_def, part /* part no */,
 				 part /* filed no */,
 				 FIELD_TYPE_SCALAR,
 				 ON_CONFLICT_ACTION_NONE /* nullable_action */,
-				 aColl /* coll */,
+				 coll /* coll */,
 				 SORT_ORDER_ASC);
 	}
 
@@ -933,7 +938,8 @@ rename_fail:
  * Performs exactly as extract_key + sqlite3VdbeCompareMsgpack,
  * only faster.
  */
-int tarantoolSqlite3IdxKeyCompare(BtCursor *pCur, UnpackedRecord *pUnpacked,
+int tarantoolSqlite3IdxKeyCompare(struct sqlite3 *db,
+				  BtCursor *pCur, UnpackedRecord *pUnpacked,
 				  int *res)
 {
 	assert(pCur->curFlags & BTCF_TaCursor);
@@ -995,9 +1001,10 @@ int tarantoolSqlite3IdxKeyCompare(BtCursor *pCur, UnpackedRecord *pUnpacked,
 			}
 		}
 		next_fieldno = fieldno + 1;
-		rc = sqlite3VdbeCompareMsgpack(&p, pUnpacked, i);
+		rc = sqlite3VdbeCompareMsgpack(db, &p, pUnpacked, i);
 		if (rc != 0) {
-			if (pUnpacked->pKeyInfo->aSortOrder[i]) {
+			if (pUnpacked->key_def->parts[i].sort_order !=
+			    SORT_ORDER_ASC) {
 				rc = -rc;
 			}
 			*res = rc;
@@ -1011,7 +1018,7 @@ out:
 	original_size = region_used(&fiber()->gc);
 	key = tuple_extract_key(tuple, key_def, &key_size);
 	if (key != NULL) {
-		rc = sqlite3VdbeRecordCompareMsgpack((int)key_size, key,
+		rc = sqlite3VdbeRecordCompareMsgpack(db, (int)key_size, key,
 						     pUnpacked);
 		region_truncate(&fiber()->gc, original_size);
 		assert(rc == *res);
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index f0054c5..ec23481 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -176,8 +176,8 @@ openStatTable(Parse * pParse,	/* Parsing context */
 	/* Open the sql_stat[134] tables for writing. */
 	for (i = 0; aTable[i]; i++) {
 		int addr = emit_open_cursor(pParse, iStatCur + i, aRoot[i]);
-		v->aOp[addr].p4.pKeyInfo = 0;
-		v->aOp[addr].p4type = P4_KEYINFO;
+		v->aOp[addr].p4.key_def = NULL;
+		v->aOp[addr].p4type = P4_KEYDEF;
 		sqlite3VdbeChangeP5(v, aCreateTbl[i]);
 		VdbeComment((v, aTable[i]));
 	}
@@ -914,7 +914,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 				     (void *) space);
 		sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
 				  space_ptr_reg);
-		sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
+		sql_vdbe_set_p4_key_def(pParse, pIdx);
 		VdbeComment((v, "%s", pIdx->zName));
 
 		/* Invoke the stat_init() function. The arguments are:
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index ae662fb..f9b54ef 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -1094,6 +1094,18 @@ sql_column_collation(Table *table, uint32_t column)
 	return space->format->fields[column].coll;
 }
 
+struct key_def*
+sql_index_key_def(struct Index *idx)
+{
+	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
+	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
+	struct space *space = space_by_id(space_id);
+	assert(space != NULL);
+	struct index *index = space_index(space, index_id);
+	assert(index != NULL && index->def != NULL);
+	return key_def_dup(index->def->key_def);
+}
+
 /**
  * Return name of given column collation from index.
  *
@@ -1119,10 +1131,9 @@ sql_index_collation(Index *idx, uint32_t column)
 		return idx->coll_array[column];
 	}
 
-	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-	struct index *index = space_index(space, index_id);
-	assert(index != NULL && index->def->key_def->part_count >= column);
-	return index->def->key_def->parts[column].coll;
+	struct key_def *key_def = sql_index_key_def(idx);
+	assert(key_def != NULL && key_def->part_count >= column);
+	return key_def->parts[column].coll;
 }
 
 enum sort_order
@@ -1143,10 +1154,9 @@ sql_index_column_sort_order(Index *idx, uint32_t column)
 		return idx->sort_order[column];
 	}
 
-	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-	struct index *index = space_index(space, index_id);
-	assert(index != NULL && index->def->key_def->part_count >= column);
-	return index->def->key_def->parts[column].sort_order;
+	struct key_def *key_def = sql_index_key_def(idx);
+	assert(key_def != NULL && key_def->part_count >= column);
+	return key_def->parts[column].sort_order;
 }
 
 /**
@@ -1443,9 +1453,7 @@ hasColumn(const i16 * aiCol, int nCol, int x)
  *     (2)  Set the Index.tnum of the PRIMARY KEY Index object in the
  *          schema to the rootpage from the main table.
  *     (3)  Add all table columns to the PRIMARY KEY Index object
- *          so that the PRIMARY KEY is a covering index.  The surplus
- *          columns are part of KeyInfo.nXField and are not used for
- *          sorting or lookup or uniqueness checks.
+ *          so that the PRIMARY KEY is a covering index.
  */
 static void
 convertToWithoutRowidTable(Parse * pParse, Table * pTab)
@@ -2632,7 +2640,6 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
 	int tnum;		/* Root page of index */
 	int iPartIdxLabel;	/* Jump to this label to skip a row */
 	Vdbe *v;		/* Generate code into this virtual machine */
-	KeyInfo *pKey;		/* KeyInfo for index */
 	int regRecord;		/* Register holding assembled index record */
 	sqlite3 *db = pParse->db;	/* The database connection */
 	v = sqlite3GetVdbe(pParse);
@@ -2643,14 +2650,14 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
 	} else {
 		tnum = pIndex->tnum;
 	}
-	pKey = sqlite3KeyInfoOfIndex(pParse, db, pIndex);
-	assert(pKey != 0 || db->mallocFailed || pParse->nErr);
+	struct key_def *def = sql_index_key_def(pIndex);
+	assert(def != NULL || db->mallocFailed || pParse->nErr);
 
 	/* Open the sorter cursor if we are to use one. */
 	iSorter = pParse->nTab++;
 	sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
-			  (char *)
-			  sqlite3KeyInfoRef(pKey), P4_KEYINFO);
+			  (char *)def, P4_KEYDEF);
+	assert(0);
 
 	/* Open the table. Loop through all rows of the table, inserting index
 	 * records into the sorter.
@@ -4194,46 +4201,6 @@ sqlite3Reindex(Parse * pParse, Token * pName1, Token * pName2)
 }
 #endif
 
-/*
- * Return a KeyInfo structure that is appropriate for the given Index.
- *
- * The caller should invoke sqlite3KeyInfoUnref() on the returned object
- * when it has finished using it.
- */
-KeyInfo *
-sqlite3KeyInfoOfIndex(Parse * pParse, sqlite3 * db, Index * pIdx)
-{
-	int i;
-	int nCol = pIdx->nColumn;
-	int nTableCol = pIdx->pTable->nCol;
-	KeyInfo *pKey;
-
-	if (pParse && pParse->nErr)
-		return 0;
-
-	/*
-	 * KeyInfo describes the index (i.e. the number of key columns,
-	 * comparator options, and the number of columns beyond the key).
-	 * Since Tarantool iterator yields the full tuple, we need a KeyInfo
-	 * as wide as the table itself.  Otherwize, not enough slots
-	 * for row parser cache are allocated in VdbeCursor object.
-	 */
-	pKey = sqlite3KeyInfoAlloc(db, nCol, nTableCol - nCol);
-	if (pKey) {
-		assert(sqlite3KeyInfoIsWriteable(pKey));
-		for (i = 0; i < nCol; i++) {
-			pKey->aColl[i] = sql_index_collation(pIdx, i);
-			pKey->aSortOrder[i] = sql_index_column_sort_order(pIdx,
-									  i);
-		}
-		if (pParse && pParse->nErr) {
-			sqlite3KeyInfoUnref(pKey);
-			pKey = 0;
-		}
-	}
-	return pKey;
-}
-
 #ifndef SQLITE_OMIT_CTE
 /*
  * This routine is invoked once per CTE by the parser while parsing a
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 3f74b93..b6fc135 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -386,10 +386,10 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 			iPk = pParse->nMem + 1;
 			pParse->nMem += nPk;
 			iEphCur = pParse->nTab++;
-			KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nPk, 0);
+			struct key_def *def = key_def_new(nPk);
 			addrEphOpen =
 				sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iEphCur,
-						  nPk, 0, (char*) pKeyInfo, P4_KEYINFO);
+						  nPk, 0, (char*)def, P4_KEYDEF);
 		} else {
 			pPk = sqlite3PrimaryKeyIndex(pTab);
 			assert(pPk != 0);
@@ -400,7 +400,7 @@ sqlite3DeleteFrom(Parse * pParse,	/* The parser context */
 			addrEphOpen =
 			    sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEphCur,
 					      nPk);
-			sqlite3VdbeSetP4KeyInfo(pParse, pPk);
+			sql_vdbe_set_p4_key_def(pParse, pPk);
 		}
 
 		/* Construct a query to find the primary key for every row
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index cc969ca..e5405dd 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -2522,7 +2522,7 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
 							  P4_DYNAMIC);
 					emit_open_cursor(pParse, iTab,
 							 pIdx->tnum);
-					sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
+					sql_vdbe_set_p4_key_def(pParse, pIdx);
 					VdbeComment((v, "%s", pIdx->zName));
 					assert(IN_INDEX_INDEX_DESC ==
 					       IN_INDEX_INDEX_ASC + 1);
@@ -2739,7 +2739,6 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
 	case TK_IN:{
 			int addr;	/* Address of OP_OpenEphemeral instruction */
 			Expr *pLeft = pExpr->pLeft;	/* the LHS of the IN operator */
-			KeyInfo *pKeyInfo = 0;	/* Key information */
 			int nVal;	/* Size of vector pLeft */
 
 			nVal = sqlite3ExprVectorSize(pLeft);
@@ -2761,7 +2760,7 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
 			pExpr->is_ephemeral = 1;
 			addr = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
 						 pExpr->iTable, nVal);
-			pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nVal, 1);
+			struct key_def *key_def = key_def_new(nVal);
 
 			if (ExprHasProperty(pExpr, EP_xIsSelect)) {
 				/* Case 1:     expr IN (SELECT ...)
@@ -2787,29 +2786,34 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
 					pSelect->iLimit = 0;
 					testcase(pSelect->
 						 selFlags & SF_Distinct);
-					testcase(pKeyInfo == 0);	/* Caused by OOM in sqlite3KeyInfoAlloc() */
 					if (sqlite3Select
 					    (pParse, pSelect, &dest)) {
 						sqlite3DbFree(pParse->db,
 							      dest.zAffSdst);
-						sqlite3KeyInfoUnref(pKeyInfo);
+						if (key_def != NULL)
+							free(key_def);
 						return 0;
 					}
 					sqlite3DbFree(pParse->db,
 						      dest.zAffSdst);
-					assert(pKeyInfo != 0);	/* OOM will cause exit after sqlite3Select() */
+					assert(key_def != NULL);
 					assert(pEList != 0);
 					assert(pEList->nExpr > 0);
-					assert(sqlite3KeyInfoIsWriteable
-					       (pKeyInfo));
 					for (i = 0; i < nVal; i++) {
 						Expr *p =
 						    sqlite3VectorFieldSubexpr
 						    (pLeft, i);
-						pKeyInfo->aColl[i] =
-						    sqlite3BinaryCompareCollSeq
-						    (pParse, p,
-						     pEList->a[i].pExpr);
+
+						struct coll *coll;
+						coll = sqlite3BinaryCompareCollSeq
+							(pParse, p,
+							 pEList->a[i].pExpr);
+
+						key_def_set_part(key_def, i, i,
+								 FIELD_TYPE_SCALAR,
+								 ON_CONFLICT_ACTION_ABORT,
+								 coll,
+								 SORT_ORDER_ASC);
 					}
 				}
 			} else if (ALWAYS(pExpr->x.pList != 0)) {
@@ -2830,14 +2834,18 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
 				if (!affinity) {
 					affinity = SQLITE_AFF_BLOB;
 				}
-				if (pKeyInfo) {
+				if (key_def != NULL) {
 					bool unused;
-					assert(sqlite3KeyInfoIsWriteable
-					       (pKeyInfo));
-					pKeyInfo->aColl[0] =
-						sql_expr_coll(pParse,
-							      pExpr->pLeft,
-							      &unused);
+					struct coll *coll;
+					coll = sql_expr_coll(pParse,
+							     pExpr->pLeft,
+							     &unused);
+
+					key_def_set_part(key_def, 0, 0,
+							 FIELD_TYPE_SCALAR,
+							 ON_CONFLICT_ACTION_ABORT,
+							 coll,
+							 SORT_ORDER_ASC);
 				}
 
 				/* Loop through each expression in <exprlist>. */
@@ -2868,9 +2876,9 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
 				sqlite3ReleaseTempReg(pParse, r1);
 				sqlite3ReleaseTempReg(pParse, r2);
 			}
-			if (pKeyInfo) {
-				sqlite3VdbeChangeP4(v, addr, (void *)pKeyInfo,
-						    P4_KEYINFO);
+			if (key_def != NULL) {
+				sqlite3VdbeChangeP4(v, addr, (void *)key_def,
+						    P4_KEYDEF);
 			}
 			break;
 		}
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index fb9a310..4b9d3ed 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -437,7 +437,7 @@ fkLookupParent(Parse * pParse,	/* Parse context */
 			int regRec = sqlite3GetTempReg(pParse);
 
 			emit_open_cursor(pParse, iCur, pIdx->tnum);
-			sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
+			sql_vdbe_set_p4_key_def(pParse, pIdx);
 			for (i = 0; i < nCol; i++) {
 				sqlite3VdbeAddOp2(v, OP_Copy,
 						  aiCol[i] + 1 + regData,
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index 1a34f71..95a2610 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -55,7 +55,7 @@ sqlite3OpenTable(Parse * pParse,	/* Generate code into this VDBE */
 	assert(pPk != 0);
 	assert(pPk->tnum == pTab->tnum);
 	emit_open_cursor(pParse, iCur, pPk->tnum);
-	sqlite3VdbeSetP4KeyInfo(pParse, pPk);
+	sql_vdbe_set_p4_key_def(pParse, pPk);
 	VdbeComment((v, "%s", pTab->zName));
 }
 
@@ -565,9 +565,9 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
 			regRec = sqlite3GetTempReg(pParse);
 			regCopy = sqlite3GetTempRange(pParse, nColumn);
 			regTempId = sqlite3GetTempReg(pParse);
-			KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, 1+nColumn, 0);
+			struct key_def *def = key_def_new(nColumn + 1);
 			sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, srcTab, nColumn+1,
-					  0, (char*)pKeyInfo, P4_KEYINFO);
+					  0, (char*)def, P4_KEYDEF);
 			addrL = sqlite3VdbeAddOp1(v, OP_Yield, dest.iSDParm);
 			VdbeCoverage(v);
 			sqlite3VdbeAddOp3(v, OP_NextIdEphemeral, srcTab, 2, regTempId);
@@ -1594,7 +1594,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,	/* Parsing context */
 			if (aToOpen == 0 || aToOpen[i + 1]) {
 				sqlite3VdbeAddOp3(v, op, iIdxCur, pIdx->tnum,
 						  space_ptr_reg);
-				sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
+				sql_vdbe_set_p4_key_def(pParse, pIdx);
 				sqlite3VdbeChangeP5(v, p5);
 				VdbeComment((v, "%s", pIdx->zName));
 			}
@@ -1908,10 +1908,10 @@ xferOptimization(Parse * pParse,	/* Parser context */
 		}
 		assert(pSrcIdx);
 		emit_open_cursor(pParse, iSrc, pSrcIdx->tnum);
-		sqlite3VdbeSetP4KeyInfo(pParse, pSrcIdx);
+		sql_vdbe_set_p4_key_def(pParse, pSrcIdx);
 		VdbeComment((v, "%s", pSrcIdx->zName));
 		emit_open_cursor(pParse, iDest, pDestIdx->tnum);
-		sqlite3VdbeSetP4KeyInfo(pParse, pDestIdx);
+		sql_vdbe_set_p4_key_def(pParse, pDestIdx);
 		sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR);
 		VdbeComment((v, "%s", pDestIdx->zName));
 		addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index 738c254..0f1c64d 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -646,8 +646,8 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
 									  pIdx->
 									  tnum,
 									  0);
-							sqlite3VdbeSetP4KeyInfo
-							    (pParse, pIdx);
+							sql_vdbe_set_p4_key_def(pParse,
+										pIdx);
 						}
 					} else {
 						k = 0;
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index aff534d3..6729eef 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -562,12 +562,11 @@ sqliteProcessJoin(Parse * pParse, Select * p)
 	return 0;
 }
 
-/* Forward reference */
-static KeyInfo *
-keyInfoFromExprList(Parse * pParse,	/* Parsing context */
-		    ExprList * pList,	/* Form the KeyInfo object from this ExprList */
-		    int iStart,		/* Begin with this column of pList */
-		    int nExtra);	/* Add this many extra columns to the end */
+static struct key_def *
+sql_expr_list_to_key_def(struct Parse *parse,
+			 struct ExprList *list,
+			 int start);
+
 
 /*
  * Generate code that will push the record in registers regData
@@ -623,7 +622,6 @@ pushOntoSorter(Parse * pParse,		/* Parser context */
 		int addrJmp;	/* Address of the OP_Jump opcode */
 		VdbeOp *pOp;	/* Opcode that opens the sorter */
 		int nKey;	/* Number of sorting key columns, including OP_Sequence */
-		KeyInfo *pKI;	/* Original KeyInfo on the sorter table */
 
 		regPrevKey = pParse->nMem + 1;
 		pParse->nMem += pSort->nOBSat;
@@ -643,13 +641,13 @@ pushOntoSorter(Parse * pParse,		/* Parser context */
 		if (pParse->db->mallocFailed)
 			return;
 		pOp->p2 = nKey + nData;
-		pKI = pOp->p4.pKeyInfo;
-		memset(pKI->aSortOrder, 0, pKI->nField);	/* Makes OP_Jump below testable */
-		sqlite3VdbeChangeP4(v, -1, (char *)pKI, P4_KEYINFO);
-		testcase(pKI->nXField > 2);
-		pOp->p4.pKeyInfo =
-		    keyInfoFromExprList(pParse, pSort->pOrderBy, nOBSat,
-					pKI->nXField - 1);
+		struct key_def *def = key_def_dup(pOp->p4.key_def);
+		for (uint32_t i = 0; i < def->part_count; ++i)
+			pOp->p4.key_def->parts[i].sort_order = SORT_ORDER_ASC;
+		sqlite3VdbeChangeP4(v, -1, (char *)def, P4_KEYDEF);
+		pOp->p4.key_def = sql_expr_list_to_key_def(pParse,
+							   pSort->pOrderBy,
+							   nOBSat);
 		addrJmp = sqlite3VdbeCurrentAddr(v);
 		sqlite3VdbeAddOp3(v, OP_Jump, addrJmp + 1, 0, addrJmp + 1);
 		VdbeCoverage(v);
@@ -1193,109 +1191,51 @@ selectInnerLoop(Parse * pParse,		/* The parser context */
 	}
 }
 
-/*
- * Allocate a KeyInfo object sufficient for an index of N key columns and
- * X extra columns.
- */
-KeyInfo *
-sqlite3KeyInfoAlloc(sqlite3 * db, int N, int X)
-{
-	int nExtra = (N + X) * (sizeof(struct coll *) + 1);
-	KeyInfo *p = sqlite3DbMallocRawNN(db, sizeof(KeyInfo) + nExtra);
-	if (p) {
-		p->aSortOrder = (u8 *) & p->aColl[N + X];
-		p->nField = (u16) N;
-		p->nXField = (u16) X;
-		p->db = db;
-		p->nRef = 1;
-		p->aColl[0] = NULL;
-		memset(&p[1], 0, nExtra);
-	} else {
-		sqlite3OomFault(db);
-	}
-	return p;
-}
-
-/*
- * Deallocate a KeyInfo object
- */
-void
-sqlite3KeyInfoUnref(KeyInfo * p)
-{
-	if (p) {
-		assert(p->nRef > 0);
-		p->nRef--;
-		if (p->nRef == 0)
-			sqlite3DbFree(p->db, p);
-	}
-}
-
-/*
- * Make a new pointer to a KeyInfo object
- */
-KeyInfo *
-sqlite3KeyInfoRef(KeyInfo * p)
-{
-	if (p) {
-		assert(p->nRef > 0);
-		p->nRef++;
-	}
-	return p;
-}
-
-#ifdef SQLITE_DEBUG
-/*
- * Return TRUE if a KeyInfo object can be change.  The KeyInfo object
- * can only be changed if this is just a single reference to the object.
- *
- * This routine is used only inside of assert() statements.
- */
-int
-sqlite3KeyInfoIsWriteable(KeyInfo * p)
-{
-	return p->nRef == 1;
-}
-#endif				/* SQLITE_DEBUG */
-
-/*
- * Given an expression list, generate a KeyInfo structure that records
+/**
+ * Given an expression list, generate a key_def structure that records
  * the collating sequence for each expression in that expression list.
  *
  * If the ExprList is an ORDER BY or GROUP BY clause then the resulting
- * KeyInfo structure is appropriate for initializing a virtual index to
+ * key_def structure is appropriate for initializing a virtual index to
  * implement that clause.  If the ExprList is the result set of a SELECT
- * then the KeyInfo structure is appropriate for initializing a virtual
+ * then the key_def structure is appropriate for initializing a virtual
  * index to implement a DISTINCT test.
  *
- * Space to hold the KeyInfo structure is obtained from malloc.  The calling
- * function is responsible for seeing that this structure is eventually
- * freed.
+ * Space to hold the key_def structure is obtained from malloc.
+ * The calling function is responsible for seeing that this
+ * structure is eventually freed.
+ *
+ * @param Parsing context.
+ * @param Expression list.
+ * @param No of leading parts to skip.
+ *
+ * @retval Allocated key_def, NULL in case of OOM.
  */
-static KeyInfo *
-keyInfoFromExprList(Parse * pParse,	/* Parsing context */
-		    ExprList * pList,	/* Form the KeyInfo object from this ExprList */
-		    int iStart,		/* Begin with this column of pList */
-		    int nExtra)		/* Add this many extra columns to the end */
+static struct key_def *
+sql_expr_list_to_key_def(struct Parse *parse,
+			 struct ExprList *list,
+			 int start)
 {
-	int nExpr;
-	KeyInfo *pInfo;
-	struct ExprList_item *pItem;
-	sqlite3 *db = pParse->db;
-	int i;
-
-	nExpr = pList->nExpr;
-	pInfo = sqlite3KeyInfoAlloc(db, nExpr - iStart, nExtra + 1);
-	if (pInfo) {
-		assert(sqlite3KeyInfoIsWriteable(pInfo));
-		for (i = iStart, pItem = pList->a + iStart; i < nExpr;
-		     i++, pItem++) {
+	struct key_def *def;
+	int expr_count = list->nExpr;
+	def = key_def_new(expr_count);
+	if (def != NULL) {
+		int i;
+		struct ExprList_item *item;
+		for (i = start, item = list->a + start; i < expr_count;
+		     ++i, ++item) {
 			bool unused;
-			pInfo->aColl[i - iStart] =
-				sql_expr_coll(pParse, pItem->pExpr, &unused);
-			pInfo->aSortOrder[i - iStart] = pItem->sortOrder;
+			struct coll *coll = sql_expr_coll(parse, item->pExpr,
+							  &unused);
+			enum sort_order sort_order = item->sortOrder;
+			key_def_set_part(def, i-start, i-start,
+					 FIELD_TYPE_SCALAR,
+					 ON_CONFLICT_ACTION_ABORT,
+					 coll, sort_order);
 		}
 	}
-	return pInfo;
+
+	return def;
 }
 
 /*
@@ -2118,54 +2058,62 @@ multiSelectCollSeq(Parse * pParse, Select * p, int iCol, bool *is_found)
 	return coll;
 }
 
-/*
+/**
  * The select statement passed as the second parameter is a compound SELECT
- * with an ORDER BY clause. This function allocates and returns a KeyInfo
+ * with an ORDER BY clause. This function allocates and returns a key_def
  * structure suitable for implementing the ORDER BY.
  *
- * Space to hold the KeyInfo structure is obtained from malloc. The calling
+ * Space to hold the key_def structure is obtained from malloc. The calling
  * function is responsible for ensuring that this structure is eventually
  * freed.
+ *
+ * @param Parsing context.
+ * @param Select struct to analyze.
+ * @param No of extra slots to allocate.
+ *
+ * @retval Allocated key_def, NULL in case of OOM.
  */
-static KeyInfo *
-multiSelectOrderByKeyInfo(Parse * pParse, Select * p, int nExtra)
+static struct key_def *
+sql_multiselect_orderby_to_key_def(struct Parse *parse,
+				   struct Select *s,
+				   int extra)
 {
-	ExprList *pOrderBy = p->pOrderBy;
-	int nOrderBy = p->pOrderBy->nExpr;
-	sqlite3 *db = pParse->db;
-	KeyInfo *pRet = sqlite3KeyInfoAlloc(db, nOrderBy + nExtra, 1);
-	if (pRet) {
-		int i;
-		for (i = 0; i < nOrderBy; i++) {
-			struct ExprList_item *pItem = &pOrderBy->a[i];
-			Expr *pTerm = pItem->pExpr;
+	int ob_count = s->pOrderBy->nExpr;
+	struct key_def *key_def = key_def_new(ob_count + extra);
+	if (key_def != NULL) {
+		ExprList *order_by = s->pOrderBy;
+		for (int i = 0; i < ob_count; i++) {
+			struct ExprList_item *item = &order_by->a[i];
+			struct Expr *term = item->pExpr;
 			struct coll *coll;
 			bool is_found = false;
 
-			if (pTerm->flags & EP_Collate) {
-				coll = sql_expr_coll(pParse, pTerm, &is_found);
+			if (term->flags & EP_Collate) {
+				coll = sql_expr_coll(parse, term, &is_found);
 			} else {
 				coll =
-				    multiSelectCollSeq(pParse, p,
-						       pItem->u.x.iOrderByCol - 1,
+				    multiSelectCollSeq(parse, s,
+						       item->u.x.iOrderByCol - 1,
 						       &is_found);
 				if (coll != NULL) {
-					pOrderBy->a[i].pExpr =
-						sqlite3ExprAddCollateString(pParse, pTerm,
+					order_by->a[i].pExpr =
+						sqlite3ExprAddCollateString(parse, term,
 									    coll->name);
 				} else {
-					pOrderBy->a[i].pExpr =
-						sqlite3ExprAddCollateString(pParse, pTerm,
+					order_by->a[i].pExpr =
+						sqlite3ExprAddCollateString(parse, term,
 									    "BINARY");
 				}
 			}
-			assert(sqlite3KeyInfoIsWriteable(pRet));
-			pRet->aColl[i] = coll;
-			pRet->aSortOrder[i] = pOrderBy->a[i].sortOrder;
+			key_def_set_part(key_def, i, i,
+					 FIELD_TYPE_SCALAR,
+					 ON_CONFLICT_ACTION_ABORT,
+					 coll,
+					 order_by->a[i].sortOrder);
 		}
 	}
 
-	return pRet;
+	return key_def;
 }
 
 #ifndef SQLITE_OMIT_CTE
@@ -2265,16 +2213,17 @@ generateWithRecursiveQuery(Parse * pParse,	/* Parsing context */
 	regCurrent = ++pParse->nMem;
 	sqlite3VdbeAddOp3(v, OP_OpenPseudo, iCurrent, regCurrent, nCol);
 	if (pOrderBy) {
-		KeyInfo *pKeyInfo = multiSelectOrderByKeyInfo(pParse, p, 1);
+		struct key_def *def = sql_multiselect_orderby_to_key_def(pParse,
+									 p, 1);
 		sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iQueue,
-				  pOrderBy->nExpr + 2, 0, (char *)pKeyInfo,
-				  P4_KEYINFO);
+				  pOrderBy->nExpr + 2, 0, (char *)def,
+				  P4_KEYDEF);
 		VdbeComment((v, "Orderby table"));
 		destQueue.pOrderBy = pOrderBy;
 	} else {
-		KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nCol + 1, 0);
+		struct key_def *def = key_def_new(nCol + 1);
 		sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iQueue, nCol + 1, 0,
-				  (char*)pKeyInfo, P4_KEYINFO);
+				  (char*)def, P4_KEYDEF);
 		VdbeComment((v, "Queue table"));
 	}
 	if (iDistinct) {
@@ -2475,9 +2424,10 @@ multiSelect(Parse * pParse,	/* Parsing context */
 	if (dest.eDest == SRT_EphemTab) {
 		assert(p->pEList);
 		int nCols = p->pEList->nExpr;
-		KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nCols + 1, 0);
+		struct key_def *def;
+		def = key_def_new(nCols + 1);
 		sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, dest.iSDParm, nCols + 1,
-				  0, (char*)pKeyInfo, P4_KEYINFO);
+				  0, (char*)def, P4_KEYDEF);
 		VdbeComment((v, "Destination temp"));
 		dest.eDest = SRT_Table;
 	}
@@ -2785,7 +2735,7 @@ multiSelect(Parse * pParse,	/* Parsing context */
 
 	/* Compute collating sequences used by
 	 * temporary tables needed to implement the compound select.
-	 * Attach the KeyInfo structure to all temporary tables.
+	 * Attach the key_def structure to all temporary tables.
 	 *
 	 * This section is run by the right-most SELECT statement only.
 	 * SELECT statements to the left always skip this part.  The right-most
@@ -2793,26 +2743,28 @@ multiSelect(Parse * pParse,	/* Parsing context */
 	 * no temp tables are required.
 	 */
 	if (p->selFlags & SF_UsesEphemeral) {
-		int i;		/* Loop counter */
-		KeyInfo *pKeyInfo;	/* Collating sequence for the result set */
-		Select *pLoop;	/* For looping through SELECT statements */
-		struct coll **apColl;	/* For looping through pKeyInfo->aColl[] */
-		int nCol;	/* Number of columns in result set */
+		int nCol;
+		struct key_def *key_def;
 
 		assert(p->pNext == 0);
 		nCol = p->pEList->nExpr;
-		pKeyInfo = sqlite3KeyInfoAlloc(db, nCol, 1);
-		if (!pKeyInfo) {
+		key_def = key_def_new(nCol);
+		if (key_def == NULL) {
 			rc = SQLITE_NOMEM_BKPT;
 			goto multi_select_end;
 		}
-		for (i = 0, apColl = pKeyInfo->aColl; i < nCol; i++, apColl++) {
-			bool is_found = false;
-			*apColl = multiSelectCollSeq(pParse, p, i, &is_found);
+		for (int i = 0; i < nCol; i++) {
+			bool unused;
+			key_def_set_part(key_def, i, i,
+					 FIELD_TYPE_SCALAR,
+					 ON_CONFLICT_ACTION_ABORT,
+					 multiSelectCollSeq(pParse, p, i,
+							    &unused),
+					 SORT_ORDER_ASC);
 		}
 
-		for (pLoop = p; pLoop; pLoop = pLoop->pPrior) {
-			for (i = 0; i < 2; i++) {
+		for (struct Select *pLoop = p; pLoop; pLoop = pLoop->pPrior) {
+			for (int i = 0; i < 2; i++) {
 				int addr = pLoop->addrOpenEphm[i];
 				if (addr < 0) {
 					/* If [0] is unused then [1] is also unused.  So we can
@@ -2823,13 +2775,12 @@ multiSelect(Parse * pParse,	/* Parsing context */
 				}
 				sqlite3VdbeChangeP2(v, addr, nCol);
 				sqlite3VdbeChangeP4(v, addr,
-						    (char *)
-						    sqlite3KeyInfoRef(pKeyInfo),
-						    P4_KEYINFO);
+						    (char *)key_def_dup(key_def),
+						    P4_KEYDEF);
 				pLoop->addrOpenEphm[i] = -1;
 			}
 		}
-		sqlite3KeyInfoUnref(pKeyInfo);
+		free(key_def);
 	}
 
  multi_select_end:
@@ -2871,7 +2822,7 @@ sqlite3SelectWrongNumTermsError(Parse * pParse, Select * p)
  * If regPrev>0 then it is the first register in a vector that
  * records the previous output.  mem[regPrev] is a flag that is false
  * if there has been no previous output.  If regPrev>0 then code is
- * generated to suppress duplicates.  pKeyInfo is used for comparing
+ * generated to suppress duplicates.  def is used for comparing
  * keys.
  *
  * If the LIMIT found in p->iLimit is reached, jump immediately to
@@ -2884,7 +2835,7 @@ generateOutputSubroutine(Parse * pParse,	/* Parsing context */
 			 SelectDest * pDest,	/* Where to send the data */
 			 int regReturn,		/* The return address register */
 			 int regPrev,		/* Previous result register.  No uniqueness if 0 */
-			 KeyInfo * pKeyInfo,	/* For comparing with previous entry */
+			 struct key_def *def,	/* For comparing with previous entry */
 			 int iBreak)		/* Jump here if we hit the LIMIT */
 {
 	Vdbe *v = pParse->pVdbe;
@@ -2903,8 +2854,8 @@ generateOutputSubroutine(Parse * pParse,	/* Parsing context */
 		addr2 =
 		    sqlite3VdbeAddOp4(v, OP_Compare, pIn->iSdst, regPrev + 1,
 				      pIn->nSdst,
-				      (char *)sqlite3KeyInfoRef(pKeyInfo),
-				      P4_KEYINFO);
+				      (char *)key_def_dup(def),
+				      P4_KEYDEF);
 		sqlite3VdbeAddOp3(v, OP_Jump, addr2 + 2, iContinue, addr2 + 2);
 		VdbeCoverage(v);
 		sqlite3VdbeJumpHere(v, addr1);
@@ -3135,8 +3086,10 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 	int labelEnd;		/* Label for the end of the overall SELECT stmt */
 	int addr1;		/* Jump instructions that get retargetted */
 	int op;			/* One of TK_ALL, TK_UNION, TK_EXCEPT, TK_INTERSECT */
-	KeyInfo *pKeyDup = 0;	/* Comparison information for duplicate removal */
-	KeyInfo *pKeyMerge;	/* Comparison information for merging rows */
+	/* Comparison information for duplicate removal */
+	struct key_def *def_dup = NULL;
+	/* Comparison information for merging rows */
+	struct key_def *def_merge;
 	sqlite3 *db;		/* Database connection */
 	ExprList *pOrderBy;	/* The ORDER BY clause */
 	int nOrderBy;		/* Number of terms in the ORDER BY clause */
@@ -3145,7 +3098,8 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 	int iSub2;		/* EQP id of right-hand query */
 
 	assert(p->pOrderBy != 0);
-	assert(pKeyDup == 0);	/* "Managed" code needs this.  Ticket #3382. */
+	/* "Managed" code needs this.  Ticket #3382. */
+	assert(def_dup == NULL);
 	db = pParse->db;
 	v = pParse->pVdbe;
 	assert(v != 0);		/* Already thrown the error if VDBE alloc failed */
@@ -3190,7 +3144,7 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 		}
 	}
 
-	/* Compute the comparison permutation and keyinfo that is used with
+	/* Compute the comparison permutation and key_def that is used with
 	 * the permutation used to determine if the next
 	 * row of results comes from selectA or selectB.  Also add explicit
 	 * collations to the ORDER BY clause terms so that when the subqueries
@@ -3206,9 +3160,9 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 			assert(pItem->u.x.iOrderByCol <= p->pEList->nExpr);
 			aPermute[i] = pItem->u.x.iOrderByCol - 1;
 		}
-		pKeyMerge = multiSelectOrderByKeyInfo(pParse, p, 1);
+		def_merge = sql_multiselect_orderby_to_key_def(pParse, p, 1);
 	} else {
-		pKeyMerge = 0;
+		def_merge = NULL;
 	}
 
 	/* Reattach the ORDER BY clause to the query.
@@ -3216,27 +3170,30 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 	p->pOrderBy = pOrderBy;
 	pPrior->pOrderBy = sqlite3ExprListDup(pParse->db, pOrderBy, 0);
 
-	/* Allocate a range of temporary registers and the KeyInfo needed
+	/* Allocate a range of temporary registers and the key_def needed
 	 * for the logic that removes duplicate result rows when the
 	 * operator is UNION, EXCEPT, or INTERSECT (but not UNION ALL).
 	 */
 	if (op == TK_ALL) {
 		regPrev = 0;
 	} else {
-		int nExpr = p->pEList->nExpr;
-		assert(nOrderBy >= nExpr || db->mallocFailed);
+		int expr_count = p->pEList->nExpr;
+		assert(nOrderBy >= expr_count || db->mallocFailed);
 		regPrev = pParse->nMem + 1;
-		pParse->nMem += nExpr + 1;
+		pParse->nMem += expr_count + 1;
 		sqlite3VdbeAddOp2(v, OP_Integer, 0, regPrev);
-		pKeyDup = sqlite3KeyInfoAlloc(db, nExpr, 1);
-		if (pKeyDup) {
-			assert(sqlite3KeyInfoIsWriteable(pKeyDup));
-			for (i = 0; i < nExpr; i++) {
+		def_dup = key_def_new(expr_count);
+		if (def_dup != NULL) {
+			for (int i = 0; i < expr_count; i++) {
 				bool is_found = false;
-				pKeyDup->aColl[i] =
-					multiSelectCollSeq(pParse, p, i,
-							   &is_found);
-				pKeyDup->aSortOrder[i] = 0;
+				struct coll *coll;
+				coll = multiSelectCollSeq(pParse, p, i,
+							  &is_found);
+				key_def_set_part(def_dup, i, i,
+						 FIELD_TYPE_SCALAR,
+						 ON_CONFLICT_ACTION_ABORT,
+						 coll,
+						 SORT_ORDER_ASC);
 			}
 		}
 	}
@@ -3311,7 +3268,7 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 	VdbeNoopComment((v, "Output routine for A"));
 	addrOutA = generateOutputSubroutine(pParse,
 					    p, &destA, pDest, regOutA,
-					    regPrev, pKeyDup, labelEnd);
+					    regPrev, def_dup, labelEnd);
 
 	/* Generate a subroutine that outputs the current row of the B
 	 * select as the next output row of the compound select.
@@ -3320,9 +3277,8 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 		VdbeNoopComment((v, "Output routine for B"));
 		addrOutB = generateOutputSubroutine(pParse,
 						    p, &destB, pDest, regOutB,
-						    regPrev, pKeyDup, labelEnd);
+						    regPrev, def_dup, labelEnd);
 	}
-	sqlite3KeyInfoUnref(pKeyDup);
 
 	/* Generate a subroutine to run when the results from select A
 	 * are exhausted and only data in select B remains.
@@ -3402,7 +3358,7 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 	sqlite3VdbeAddOp4(v, OP_Permutation, 0, 0, 0, (char *)aPermute,
 			  P4_INTARRAY);
 	sqlite3VdbeAddOp4(v, OP_Compare, destA.iSdst, destB.iSdst, nOrderBy,
-			  (char *)pKeyMerge, P4_KEYINFO);
+			  (char *)def_merge, P4_KEYDEF);
 	sqlite3VdbeChangeP5(v, OPFLAG_PERMUTE);
 	sqlite3VdbeAddOp3(v, OP_Jump, addrAltB, addrAeqB, addrAgtB);
 	VdbeCoverage(v);
@@ -5135,12 +5091,13 @@ resetAccumulator(Parse * pParse, AggInfo * pAggInfo)
 						"argument");
 				pFunc->iDistinct = -1;
 			} else {
-				KeyInfo *pKeyInfo =
-				    keyInfoFromExprList(pParse, pE->x.pList, 0,
-							0);
+				struct key_def *def;
+				def = sql_expr_list_to_key_def(pParse,
+							       pE->x.pList,
+							       0);
 				sqlite3VdbeAddOp4(v, OP_OpenTEphemeral,
 						  pFunc->iDistinct, 1, 0,
-						  (char *)pKeyInfo, P4_KEYINFO);
+						  (char *)def, P4_KEYDEF);
 			}
 		}
 	}
@@ -5611,23 +5568,22 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 	 * that change.
 	 */
 	if (sSort.pOrderBy) {
-		KeyInfo *pKeyInfo;
-		pKeyInfo =
-		    keyInfoFromExprList(pParse, sSort.pOrderBy, 0, pEList->nExpr);
+		struct key_def *def;
+		def = sql_expr_list_to_key_def(pParse, sSort.pOrderBy, 0);
 		sSort.iECursor = pParse->nTab++;
 		/* Number of columns in transient table equals to number of columns in
 		 * SELECT statement plus number of columns in ORDER BY statement
 		 * and plus one column for ID.
 		 */
 		int nCols = pEList->nExpr + sSort.pOrderBy->nExpr + 1;
-		if (pKeyInfo->aSortOrder[0] == SORT_ORDER_DESC) {
+		if (def->parts[0].sort_order == SORT_ORDER_DESC) {
 			sSort.sortFlags |= SORTFLAG_DESC;
 		}
 		sSort.addrSortIndex =
 		    sqlite3VdbeAddOp4(v, OP_OpenTEphemeral,
 				      sSort.iECursor,
 				      nCols,
-				      0, (char *)pKeyInfo, P4_KEYINFO);
+				      0, (char *)def, P4_KEYDEF);
 		VdbeComment((v, "Sort table"));
 	} else {
 		sSort.addrSortIndex = -1;
@@ -5636,10 +5592,10 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 	/* If the output is destined for a temporary table, open that table.
 	 */
 	if (pDest->eDest == SRT_EphemTab) {
-		KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db,
-							pEList->nExpr + 1, 0);
+		struct key_def *def;
+		def = key_def_new(pEList->nExpr + 1);
 		sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, pDest->iSDParm,
-				  pEList->nExpr + 1, 0, (char*)pKeyInfo, P4_KEYINFO);
+				  pEList->nExpr + 1, 0, (char*)def, P4_KEYDEF);
 
 		VdbeComment((v, "Output table"));
 	}
@@ -5660,12 +5616,12 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 	 */
 	if (p->selFlags & SF_Distinct) {
 		sDistinct.tabTnct = pParse->nTab++;
-		KeyInfo *pKeyInfo = keyInfoFromExprList(pParse, p->pEList, 0, 0);
+		struct key_def *def = sql_expr_list_to_key_def(pParse, p->pEList, 0);
 		sDistinct.addrTnct = sqlite3VdbeAddOp4(v, OP_OpenTEphemeral,
 						       sDistinct.tabTnct,
-						       pKeyInfo->nField,
-						       0, (char *)pKeyInfo,
-						       P4_KEYINFO);
+						       def->part_count,
+						       0, (char *)def,
+						       P4_KEYDEF);
 		VdbeComment((v, "Distinct table"));
 		sDistinct.eTnctType = WHERE_DISTINCT_UNORDERED;
 	} else {
@@ -5806,7 +5762,6 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 		 * much more complex than aggregates without a GROUP BY.
 		 */
 		if (pGroupBy) {
-			KeyInfo *pKeyInfo;	/* Keying information for the group by clause */
 			int addr1;	/* A-vs-B comparision jump */
 			int addrOutputRow;	/* Start of subroutine that outputs a result row */
 			int regOutputRow;	/* Return address register for output subroutine */
@@ -5822,14 +5777,12 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 			 * will be converted into a Noop.
 			 */
 			sAggInfo.sortingIdx = pParse->nTab++;
-			pKeyInfo =
-			    keyInfoFromExprList(pParse, pGroupBy, 0,
-						sAggInfo.nColumn);
+			struct key_def *def = sql_expr_list_to_key_def(pParse, pGroupBy, 0);
 			addrSortingIdx =
 			    sqlite3VdbeAddOp4(v, OP_SorterOpen,
 					      sAggInfo.sortingIdx,
 					      sAggInfo.nSortingColumn, 0,
-					      (char *)pKeyInfo, P4_KEYINFO);
+					      (char *)def, P4_KEYDEF);
 
 			/* Initialize memory locations used by GROUP BY aggregate processing
 			 */
@@ -5867,7 +5820,7 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 			if (sqlite3WhereIsOrdered(pWInfo) == pGroupBy->nExpr) {
 				/* The optimizer is able to deliver rows in group by order so
 				 * we do not have to sort.  The OP_OpenEphemeral table will be
-				 * cancelled later because we still need to use the pKeyInfo
+				 * cancelled later because we still need to use the key_def
 				 */
 				groupBySort = 0;
 			} else {
@@ -5979,8 +5932,8 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 			}
 			sqlite3VdbeAddOp4(v, OP_Compare, iAMem, iBMem,
 					  pGroupBy->nExpr,
-					  (char *)sqlite3KeyInfoRef(pKeyInfo),
-					  P4_KEYINFO);
+					  (char*) key_def_dup(def),
+					  P4_KEYDEF);
 			addr1 = sqlite3VdbeCurrentAddr(v);
 			sqlite3VdbeAddOp3(v, OP_Jump, addr1 + 1, 0, addr1 + 1);
 			VdbeCoverage(v);
@@ -6090,7 +6043,7 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 				 */
 				const int iCsr = pParse->nTab++;	/* Cursor to scan b-tree */
 				Index *pIdx;	/* Iterator variable */
-				KeyInfo *pKeyInfo = 0;	/* Keyinfo for scanned index */
+				struct key_def *def = NULL;
 				Index *pBest;	/* Best index found so far */
 				int iRoot = pTab->tnum;	/* Root page of scanned b-tree */
 
@@ -6100,7 +6053,7 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 				 *
 				 * (2013-10-03) Do not count the entries in a partial index.
 				 *
-				 * In practice the KeyInfo structure will not be used. It is only
+				 * In practice the key_def structure will not be used. It is only
 				 * passed to keep OP_OpenRead happy.
 				 */
 				pBest = sqlite3PrimaryKeyIndex(pTab);
@@ -6116,19 +6069,17 @@ sqlite3Select(Parse * pParse,		/* The parser context */
 						pBest = pIdx;
 					}
 				}
-				if (pBest) {
+				if (pBest != NULL) {
 					iRoot = pBest->tnum;
-					pKeyInfo =
-					    sqlite3KeyInfoOfIndex(pParse, db,
-								  pBest);
+					def = sql_index_key_def(pBest);
 				}
 
 				/* Open a read-only cursor, execute the OP_Count, close the cursor. */
 				emit_open_cursor(pParse, iCsr, iRoot);
-				if (pKeyInfo) {
+				if (def != NULL) {
 					sqlite3VdbeChangeP4(v, -1,
-							    (char *)pKeyInfo,
-							    P4_KEYINFO);
+							    (char *)def,
+							    P4_KEYDEF);
 				}
 				sqlite3VdbeAddOp2(v, OP_Count, iCsr,
 						  sAggInfo.aFunc[0].iMem);
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index a811932..7d32556 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -1462,7 +1462,6 @@ typedef struct IdList IdList;
 typedef struct Index Index;
 typedef struct IndexSample IndexSample;
 typedef struct KeyClass KeyClass;
-typedef struct KeyInfo KeyInfo;
 typedef struct Lookaside Lookaside;
 typedef struct LookasideSlot LookasideSlot;
 typedef struct NameContext NameContext;
@@ -2027,24 +2026,6 @@ struct FKey {
 #define OE_SetDflt  8		/* Set the foreign key value to its default */
 #define OE_Cascade  9		/* Cascade the changes */
 
-/*
- * An instance of the following structure is passed as the first
- * argument to sqlite3VdbeKeyCompare and is used to control the
- * comparison of the two index keys.
- *
- * Note that aSortOrder[] and aColl[] have nField+1 slots.  There
- * are nField slots for the columns of an index.
- */
-struct KeyInfo {
-	u32 nRef;		/* Number of references to this KeyInfo object */
-	u8 enc;			/* Text encoding - one of the SQLITE_UTF* values */
-	u16 nField;		/* Number of key columns in the index */
-	u16 nXField;		/* Number of columns beyond the key columns */
-	sqlite3 *db;		/* The database connection */
-	u8 *aSortOrder;		/* Sort order for each column. */
-	struct coll *aColl[1];	/* Collating sequence for each term of the key */
-};
-
 /*
  * This object holds a record which has been parsed out into individual
  * fields, for the purposes of doing a comparison.
@@ -2059,7 +2040,7 @@ struct KeyInfo {
  * an index b+tree. The goal of the search is to find the entry that
  * is closed to the key described by this object.  This object might hold
  * just a prefix of the key.  The number of fields is given by
- * pKeyInfo->nField.
+ * key_def->part_count.
  *
  * The r1 and r2 fields are the values to return if this key is less than
  * or greater than a key in the btree, respectively.  These are normally
@@ -2069,7 +2050,7 @@ struct KeyInfo {
  * The key comparison functions actually return default_rc when they find
  * an equals comparison.  default_rc can be -1, 0, or +1.  If there are
  * multiple entries in the b-tree with the same key (when only looking
- * at the first pKeyInfo->nFields,) then default_rc can be set to -1 to
+ * at the first key_def->part_count fields) then default_rc can be set to -1 to
  * cause the search to find the last match, or +1 to cause the search to
  * find the first match.
  *
@@ -2081,7 +2062,8 @@ struct KeyInfo {
  * b-tree.
  */
 struct UnpackedRecord {
-	KeyInfo *pKeyInfo;	/* Collation and sort-order information */
+	/* Collation and sort-order information */
+	struct key_def* key_def;
 	Mem *aMem;		/* Values */
 	u16 nField;		/* Number of entries in apMem[] */
 	i8 default_rc;		/* Comparison result if keys are equal */
@@ -2690,12 +2672,12 @@ struct NameContext {
  *
  * addrOpenEphm[] entries contain the address of OP_OpenEphemeral opcodes.
  * These addresses must be stored so that we can go back and fill in
- * the P4_KEYINFO and P2 parameters later.  Neither the KeyInfo nor
+ * the P4_KEYDEF and P2 parameters later.  Neither the key_def nor
  * the number of columns in P2 can be computed at the same time
  * as the OP_OpenEphm instruction is coded because not
  * enough information about the compound query is known at that point.
- * The KeyInfo for addrOpenTran[0] and [1] contains collating sequences
- * for the result set.  The KeyInfo for addrOpenEphm[2] contains collating
+ * The key_def for addrOpenTran[0] and [1] contains collating sequences
+ * for the result set.  The key_def for addrOpenEphm[2] contains collating
  * sequences for the ORDER BY clause.
  */
 struct Select {
@@ -3529,11 +3511,18 @@ const char *
 index_collation_name(Index *, uint32_t);
 struct coll *
 sql_index_collation(Index *idx, uint32_t column);
-struct coll *
-sql_default_coll();
 bool
 space_is_view(Table *);
 
+/**
+ * Return key_def of provided struct Index.
+ *
+ * @param idx pointer to `struct Index` object
+ * @retval pointer to `struct key_def`
+ */
+struct key_def*
+sql_index_key_def(struct Index *idx);
+
 /**
  * Return name of given column collation from index.
  *
@@ -3926,13 +3915,6 @@ void sqlite3RegisterLikeFunctions(sqlite3 *, int);
 int sqlite3IsLikeFunction(sqlite3 *, Expr *, int *, char *);
 void sqlite3SchemaClear(sqlite3 *);
 Schema *sqlite3SchemaCreate(sqlite3 *);
-KeyInfo *sqlite3KeyInfoAlloc(sqlite3 *, int, int);
-void sqlite3KeyInfoUnref(KeyInfo *);
-KeyInfo *sqlite3KeyInfoRef(KeyInfo *);
-KeyInfo *sqlite3KeyInfoOfIndex(Parse *, sqlite3 *, Index *);
-#ifdef SQLITE_DEBUG
-int sqlite3KeyInfoIsWriteable(KeyInfo *);
-#endif
 int sqlite3CreateFunc(sqlite3 *, const char *, int, int, void *,
 		      void (*)(sqlite3_context *, int, sqlite3_value **),
 		      void (*)(sqlite3_context *, int, sqlite3_value **),
diff --git a/src/box/sql/tarantoolInt.h b/src/box/sql/tarantoolInt.h
index 57fc4a1..af774fc 100644
--- a/src/box/sql/tarantoolInt.h
+++ b/src/box/sql/tarantoolInt.h
@@ -98,7 +98,7 @@ int tarantoolSqlite3RenameParentTable(int iTab, const char *zOldParentName,
 
 /* Interface for ephemeral tables. */
 int tarantoolSqlite3EphemeralCreate(BtCursor * pCur, uint32_t filed_count,
-				    struct coll *aColl);
+				    struct key_def *def);
 /**
  * Insert tuple into ephemeral space.
  * In contrast to ordinary spaces, there is no need to create and
@@ -123,7 +123,8 @@ int tarantoolSqlite3EphemeralGetMaxId(BtCursor * pCur, uint32_t fieldno,
  * the key may span non-adjacent fields in a random order,
  * ex: [4]-[1]-[2]
  */
-int tarantoolSqlite3IdxKeyCompare(BtCursor * pCur, UnpackedRecord * pUnpacked,
+int tarantoolSqlite3IdxKeyCompare(struct sqlite3 *db,
+				  BtCursor * pCur, UnpackedRecord * pUnpacked,
 				  int *res);
 
 /**
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index f3bd0b7..d380b8c 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -358,12 +358,12 @@ sqlite3Update(Parse * pParse,		/* The parser context */
 	sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
 
 	if (isView) {
-		KeyInfo *pKeyInfo = sqlite3KeyInfoAlloc(pParse->db, nKey, 0);
+		struct key_def *def = key_def_new(nKey);
 		addrOpen = sqlite3VdbeAddOp4(v, OP_OpenTEphemeral, iEph,
-					     nKey, 0, (char*)pKeyInfo, P4_KEYINFO);
+					     nKey, 0, (char*)def, P4_KEYDEF);
 	} else {
 		addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
-		sqlite3VdbeSetP4KeyInfo(pParse, pPk);
+		sql_vdbe_set_p4_key_def(pParse, pPk);
 	}
 
 	pWInfo = sqlite3WhereBegin(pParse, pTabList, pWhere, 0, 0,
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 013460f..7f82c3c 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -2243,35 +2243,36 @@ case OP_Permutation: {
  * OPFLAG_PERMUTE bit is clear, then register are compared in sequential
  * order.
  *
- * P4 is a KeyInfo structure that defines collating sequences and sort
+ * P4 is a key_def structure that defines collating sequences and sort
  * orders for the comparison.  The permutation applies to registers
- * only.  The KeyInfo elements are used sequentially.
+ * only.  The key_def elements are used sequentially.
  *
  * The comparison is a sort comparison, so NULLs compare equal,
  * NULLs are less than numbers, numbers are less than strings,
  * and strings are less than blobs.
  */
 case OP_Compare: {
-	int n;
-	int i;
 	int p1;
 	int p2;
-	const KeyInfo *pKeyInfo;
 	int idx;
-	struct coll *pColl;    /* Collating sequence to use on this term */
-	int bRev;          /* True for DESCENDING sort order */
 
-	if ((pOp->p5 & OPFLAG_PERMUTE)==0) aPermute = 0;
-	n = pOp->p3;
-	pKeyInfo = pOp->p4.pKeyInfo;
+	if ((pOp->p5 & OPFLAG_PERMUTE) == 0)
+		aPermute = 0;
+
+	int n = pOp->p3;
+
+	assert(pOp->p4type == P4_KEYDEF);
 	assert(n>0);
-	assert(pKeyInfo!=0);
 	p1 = pOp->p1;
 	p2 = pOp->p2;
+
+	struct key_def *def = pOp->p4.key_def;
 #if SQLITE_DEBUG
 	if (aPermute) {
-		int k, mx = 0;
-		for(k=0; k<n; k++) if (aPermute[k]>mx) mx = aPermute[k];
+		int mx = 0;
+		for(uint32_t k = 0; k < (uint32_t)n; k++)
+			if (aPermute[k] > mx)
+				mx = aPermute[k];
 		assert(p1>0 && p1+mx<=(p->nMem+1 - p->nCursor)+1);
 		assert(p2>0 && p2+mx<=(p->nMem+1 - p->nCursor)+1);
 	} else {
@@ -2279,18 +2280,19 @@ case OP_Compare: {
 		assert(p2>0 && p2+n<=(p->nMem+1 - p->nCursor)+1);
 	}
 #endif /* SQLITE_DEBUG */
-	for(i=0; i<n; i++) {
+	for(int i = 0; i < n; i++) {
 		idx = aPermute ? aPermute[i] : i;
 		assert(memIsValid(&aMem[p1+idx]));
 		assert(memIsValid(&aMem[p2+idx]));
 		REGISTER_TRACE(p1+idx, &aMem[p1+idx]);
 		REGISTER_TRACE(p2+idx, &aMem[p2+idx]);
-		assert(i<pKeyInfo->nField);
-		pColl = pKeyInfo->aColl[i];
-		bRev = pKeyInfo->aSortOrder[i];
-		iCompare = sqlite3MemCompare(&aMem[p1+idx], &aMem[p2+idx], pColl);
+		assert(i < (int)def->part_count);
+		struct coll *coll = def->parts[i].coll;
+		bool is_rev = def->parts[i].sort_order == SORT_ORDER_DESC;
+		iCompare = sqlite3MemCompare(&aMem[p1+idx], &aMem[p2+idx], coll);
 		if (iCompare) {
-			if (bRev) iCompare = -iCompare;
+			if (is_rev)
+				iCompare = -iCompare;
 			break;
 		}
 	}
@@ -3119,8 +3121,8 @@ case OP_SetCookie: {
  * values need not be contiguous but all P1 values should be
  * small integers. It is an error for P1 to be negative.
  *
- * The P4 value may be a pointer to a KeyInfo structure.
- * If it is a pointer to a KeyInfo structure, then said structure
+ * The P4 value may be a pointer to a key_def structure.
+ * If it is a pointer to a key_def structure, then said structure
  * defines the content and collatining sequence of the index
  * being opened. Otherwise, P4 is NULL.
  *
@@ -3208,7 +3210,7 @@ case OP_OpenWrite:
 	pBtCur->index = index;
 	pBtCur->eState = CURSOR_INVALID;
 	/* Key info still contains sorter order and collation. */
-	pCur->pKeyInfo = pOp->p4.pKeyInfo;
+	pCur->key_def = index->def->key_def;
 
 open_cursor_set_hints:
 	assert(OPFLAG_BULKCSR==BTREE_BULKLOAD);
@@ -3223,8 +3225,12 @@ open_cursor_set_hints:
 	break;
 }
 
-/* Opcode: OpenTEphemeral P1 P2 * * *
- * Synopsis: nColumn = P2
+/**
+ * Opcode: OpenTEphemeral P1 P2 * P4 *
+ * Synopsis:
+ * @param P1 index of new cursor to be created
+ * @param P2 number of columns in a new table
+ * @param P4 key def for new table
  *
  * This opcode creates Tarantool's ephemeral table and sets cursor P1 to it.
  */
@@ -3233,21 +3239,21 @@ case OP_OpenTEphemeral: {
 	BtCursor *pBtCur;
 	assert(pOp->p1 >= 0);
 	assert(pOp->p2 > 0);
-	assert(pOp->p4.pKeyInfo != 0);
-	assert(pOp->p4type == P4_KEYINFO);
+	assert(pOp->p4.key_def != NULL);
+	assert(pOp->p4type == P4_KEYDEF);
 
 	pCx = allocateCursor(p, pOp->p1, pOp->p2, CURTYPE_TARANTOOL);
 	if (pCx == 0) goto no_mem;
 	pCx->nullRow = 1;
 
-	pCx->pKeyInfo  = pOp->p4.pKeyInfo;
+	pCx->key_def  = pOp->p4.key_def;
 	pBtCur = pCx->uc.pCursor;
 	/* Ephemeral spaces don't have space_id */
 	pBtCur->eState = CURSOR_INVALID;
 	pBtCur->curFlags = BTCF_TEphemCursor;
 
 	rc = tarantoolSqlite3EphemeralCreate(pCx->uc.pCursor, pOp->p2,
-					     pOp->p4.pKeyInfo->aColl[0]);
+					     pCx->key_def);
 	if (rc) goto abort_due_to_error;
 	break;
 }
@@ -3269,9 +3275,8 @@ case OP_SorterOpen: {
 	assert(pOp->p2>=0);
 	pCx = allocateCursor(p, pOp->p1, pOp->p2, CURTYPE_SORTER);
 	if (pCx==0) goto no_mem;
-	pCx->pKeyInfo = pOp->p4.pKeyInfo;
-	assert(pCx->pKeyInfo->db==db);
-	rc = sqlite3VdbeSorterInit(db, pOp->p3, pCx);
+	pCx->key_def = pOp->p4.key_def;
+	rc = sqlite3VdbeSorterInit(db, pCx);
 	if (rc) goto abort_due_to_error;
 	break;
 }
@@ -3547,7 +3552,7 @@ case OP_SeekGT: {       /* jump, in3 */
 	nField = pOp->p4.i;
 	assert(pOp->p4type==P4_INT32);
 	assert(nField>0);
-	r.pKeyInfo = pC->pKeyInfo;
+	r.key_def = pC->key_def;
 	r.nField = (u16)nField;
 
 	if (reg_ipk > 0) {
@@ -3699,7 +3704,7 @@ case OP_Found: {        /* jump, in3 */
 	assert(pC->eCurType==CURTYPE_TARANTOOL);
 	assert(pC->uc.pCursor!=0);
 	if (pOp->p4.i>0) {
-		r.pKeyInfo = pC->pKeyInfo;
+		r.key_def = pC->key_def;
 		r.nField = (u16)pOp->p4.i;
 		r.aMem = pIn3;
 #ifdef SQLITE_DEBUG
@@ -3712,11 +3717,12 @@ case OP_Found: {        /* jump, in3 */
 		pIdxKey = &r;
 		pFree = 0;
 	} else {
-		pFree = pIdxKey = sqlite3VdbeAllocUnpackedRecord(pC->pKeyInfo);
+		pFree = pIdxKey = sqlite3VdbeAllocUnpackedRecord(db, pC->key_def);
 		if (pIdxKey==0) goto no_mem;
 		assert(pIn3->flags & MEM_Blob );
 		(void)ExpandBlob(pIn3);
-		sqlite3VdbeRecordUnpackMsgpack(pC->pKeyInfo, pIn3->n, pIn3->z, pIdxKey);
+		sqlite3VdbeRecordUnpackMsgpack(db, pC->key_def,
+					       pIn3->n, pIn3->z, pIdxKey);
 	}
 	pIdxKey->default_rc = 0;
 	pIdxKey->opcode = pOp->opcode;
@@ -4492,7 +4498,7 @@ case OP_IdxDelete: {
 	pCrsr = pC->uc.pCursor;
 	assert(pCrsr!=0);
 	assert(pOp->p5==0);
-	r.pKeyInfo = pC->pKeyInfo;
+	r.key_def = pC->key_def;
 	r.nField = (u16)pOp->p3;
 	r.default_rc = 0;
 	r.aMem = &aMem[pOp->p2];
@@ -4576,7 +4582,7 @@ case OP_IdxGE:  {       /* jump */
 	assert(pC->deferredMoveto==0);
 	assert(pOp->p5==0 || pOp->p5==1);
 	assert(pOp->p4type==P4_INT32);
-	r.pKeyInfo = pC->pKeyInfo;
+	r.key_def = pC->key_def;
 	r.nField = (u16)pOp->p4.i;
 	if (pOp->opcode<OP_IdxLT) {
 		assert(pOp->opcode==OP_IdxLE || pOp->opcode==OP_IdxGT);
diff --git a/src/box/sql/vdbe.h b/src/box/sql/vdbe.h
index e244606..4bd48d5 100644
--- a/src/box/sql/vdbe.h
+++ b/src/box/sql/vdbe.h
@@ -77,7 +77,6 @@ struct VdbeOp {
 		struct coll *pColl;	/* Used when p4type is P4_COLLSEQ */
 		Mem *pMem;	/* Used when p4type is P4_MEM */
 		bool b;         /* Used when p4type is P4_BOOL */
-		KeyInfo *pKeyInfo;	/* Used when p4type is P4_KEYINFO */
 		int *ai;	/* Used when p4type is P4_INTARRAY */
 		SubProgram *pProgram;	/* Used when p4type is P4_SUBPROGRAM */
 		Index *pIndex;	/* Used when p4type is P4_INDEX */
@@ -85,6 +84,8 @@ struct VdbeOp {
 		Expr *pExpr;	/* Used when p4type is P4_EXPR */
 #endif
 		int (*xAdvance) (BtCursor *, int *);
+		/* Used when p4type is P4_KEYDEF. */
+		struct key_def *key_def;
 	} p4;
 #ifdef SQLITE_ENABLE_EXPLAIN_COMMENTS
 	char *zComment;		/* Comment to improve readability */
@@ -131,7 +132,6 @@ typedef struct VdbeOpList VdbeOpList;
 #define P4_STATIC   (-2)	/* Pointer to a static string */
 #define P4_COLLSEQ  (-3)	/* P4 is a pointer to a CollSeq structure */
 #define P4_FUNCDEF  (-4)	/* P4 is a pointer to a FuncDef structure */
-#define P4_KEYINFO  (-5)	/* P4 is a pointer to a KeyInfo structure */
 #define P4_EXPR     (-6)	/* P4 is a pointer to an Expr tree */
 #define P4_MEM      (-7)	/* P4 is a pointer to a Mem*    structure */
 #define P4_TRANSIENT  0		/* P4 is a pointer to a transient string */
@@ -147,7 +147,6 @@ typedef struct VdbeOpList VdbeOpList;
 #define P4_PTR      (-18)	/* P4 is a generic pointer */
 #define P4_KEYDEF   (-19)       /* P4 is a pointer to key_def structure. */
 
-
 /* Error message codes for OP_Halt */
 #define P5_ConstraintNotNull 1
 #define P5_ConstraintUnique  2
@@ -226,7 +225,16 @@ int sqlite3VdbeChangeToNoop(Vdbe *, int addr);
 int sqlite3VdbeDeletePriorOpcode(Vdbe *, u8 op);
 void sqlite3VdbeChangeP4(Vdbe *, int addr, const char *zP4, int N);
 void sqlite3VdbeAppendP4(Vdbe *, void *pP4, int p4type);
-void sqlite3VdbeSetP4KeyInfo(Parse *, Index *);
+
+/**
+ * Set the P4 on the most recently added opcode to the key_def for the
+ * index given.
+ * @param parse Parsing context, for error reporting.
+ * @param index Index to get key_def from.
+ */
+void
+sql_vdbe_set_p4_key_def(struct Parse *parse, struct Index *index);
+
 VdbeOp *sqlite3VdbeGetOp(Vdbe *, int);
 int sqlite3VdbeMakeLabel(Vdbe *);
 void sqlite3VdbeRunOnlyOnce(Vdbe *);
@@ -257,14 +265,20 @@ char *sqlite3VdbeExpandSql(Vdbe *, const char *);
 #endif
 int sqlite3MemCompare(const Mem *, const Mem *, const struct coll *);
 
-void sqlite3VdbeRecordUnpackMsgpack(KeyInfo *, int, const void *,
-				    UnpackedRecord *);
-int sqlite3VdbeRecordCompare(int, const void *, UnpackedRecord *);
-int sqlite3VdbeRecordCompareWithSkip(int, const void *, UnpackedRecord *, int);
-UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(KeyInfo *);
+void sqlite3VdbeRecordUnpackMsgpack(struct sqlite3 *db,
+				    struct key_def *key_def,
+				    int key_count, const void * msgpack,
+				    UnpackedRecord *dest);
+int sqlite3VdbeRecordCompare(struct sqlite3 *db, int key_count,
+			     const void *key1, UnpackedRecord *key2);
+int sqlite3VdbeRecordCompareWithSkip(struct sqlite3 *db,
+				     int key_count, const void *key1,
+				     struct UnpackedRecord *key2, bool is_skip);
+UnpackedRecord *sqlite3VdbeAllocUnpackedRecord(struct sqlite3 *,
+					       struct key_def *);
 int sql_vdbe_mem_alloc_region(Mem *, uint32_t);
 
-typedef int (*RecordCompare) (int, const void *, UnpackedRecord *);
+typedef int (*RecordCompare) (struct sqlite3 *db, int, const void *, UnpackedRecord *);
 RecordCompare sqlite3VdbeFindCompare(UnpackedRecord *);
 
 #ifndef SQLITE_OMIT_TRIGGER
diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h
index ab9147c..044df10 100644
--- a/src/box/sql/vdbeInt.h
+++ b/src/box/sql/vdbeInt.h
@@ -110,7 +110,8 @@ struct VdbeCursor {
 		int pseudoTableReg;	/* CURTYPE_PSEUDO. Reg holding content. */
 		VdbeSorter *pSorter;	/* CURTYPE_SORTER. Sorter object */
 	} uc;
-	KeyInfo *pKeyInfo;	/* Info about index keys needed by index cursors */
+	/* Info about keys needed by index cursors. */
+	struct key_def *key_def;
 	i16 nField;		/* Number of fields in the header */
 	u16 nHdrParsed;		/* Number of header fields parsed so far */
 	const u8 *aRow;		/* Data for the current row, if all on one page */
@@ -451,7 +452,6 @@ struct PreUpdate {
 	VdbeCursor *pCsr;	/* Cursor to read old values from */
 	int op;			/* One of SQLITE_INSERT, UPDATE, DELETE */
 	u8 *aRecord;		/* old.* database record */
-	KeyInfo keyinfo;
 	UnpackedRecord *pUnpacked;	/* Unpacked version of aRecord[] */
 	UnpackedRecord *pNewUnpacked;	/* Unpacked version of new.* record */
 	int iNewReg;		/* Register for new.* values */
@@ -527,7 +527,7 @@ void sqlite3VdbePreUpdateHook(Vdbe *, VdbeCursor *, int, const char *, Table *,
 #endif
 int sqlite3VdbeTransferError(Vdbe * p);
 
-int sqlite3VdbeSorterInit(sqlite3 *, int, VdbeCursor *);
+int sqlite3VdbeSorterInit(struct sqlite3 *db, struct VdbeCursor *cursor);
 void sqlite3VdbeSorterReset(sqlite3 *, VdbeSorter *);
 void sqlite3VdbeSorterClose(sqlite3 *, VdbeCursor *);
 int sqlite3VdbeSorterRowkey(const VdbeCursor *, Mem *);
@@ -564,9 +564,10 @@ int sqlite3VdbeMemExpandBlob(Mem *);
 
 i64 sqlite3VdbeMsgpackRecordLen(Mem * pMem, u32 n);
 u32 sqlite3VdbeMsgpackRecordPut(u8 * pBuf, Mem * pMem, u32 n);
-int sqlite3VdbeCompareMsgpack(const char **pKey1,
-			      UnpackedRecord * pUnpacked, int iKey2);
-int sqlite3VdbeRecordCompareMsgpack(int nKey1, const void *pKey1,
+int sqlite3VdbeCompareMsgpack(struct sqlite3 *db,
+			      const char **key1,
+			      UnpackedRecord *pUnpacked, int iKey2);
+int sqlite3VdbeRecordCompareMsgpack(struct sqlite3 *db, int nKey1, const void *pKey1,
 				    UnpackedRecord * pPKey2);
 u32 sqlite3VdbeMsgpackGet(const unsigned char *buf, Mem * pMem);
 
diff --git a/src/box/sql/vdbeapi.c b/src/box/sql/vdbeapi.c
index 6e41859..c781fd4 100644
--- a/src/box/sql/vdbeapi.c
+++ b/src/box/sql/vdbeapi.c
@@ -1581,103 +1581,6 @@ sqlite3_expanded_sql(sqlite3_stmt * pStmt)
 #endif
 }
 
-#ifdef SQLITE_ENABLE_PREUPDATE_HOOK
-/*
- * Allocate and populate an UnpackedRecord structure based on the serialized
- * record in nKey/pKey. Return a pointer to the new UnpackedRecord structure
- * if successful, or a NULL pointer if an OOM error is encountered.
- */
-static UnpackedRecord *
-vdbeUnpackRecord(KeyInfo * pKeyInfo, int nKey, const void *pKey)
-{
-	UnpackedRecord *pRet;	/* Return value */
-
-	pRet = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
-	if (pRet) {
-		memset(pRet->aMem, 0, sizeof(Mem) * (pKeyInfo->nField + 1));
-		sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, pRet);
-	}
-	return pRet;
-}
-
-/*
- * This function is called from within a pre-update callback to retrieve
- * a field of the row currently being updated or deleted.
- */
-int
-sqlite3_preupdate_old(sqlite3 * db, int iIdx, sqlite3_value ** ppValue)
-{
-	PreUpdate *p = db->pPreUpdate;
-	int rc = SQLITE_OK;
-
-	/* Test that this call is being made from within an SQLITE_DELETE or
-	 * SQLITE_UPDATE pre-update callback, and that iIdx is within range.
-	 */
-	if (!p || p->op == SQLITE_INSERT) {
-		rc = SQLITE_MISUSE_BKPT;
-		goto preupdate_old_out;
-	}
-	if (iIdx >= p->pCsr->nField || iIdx < 0) {
-		rc = SQLITE_RANGE;
-		goto preupdate_old_out;
-	}
-
-	/* If the old.* record has not yet been loaded into memory, do so now. */
-	if (p->pUnpacked == 0) {
-		u32 nRec;
-		u8 *aRec;
-
-		nRec = sqlite3BtreePayloadSize(p->pCsr->uc.pCursor);
-		aRec = sqlite3DbMallocRaw(db, nRec);
-		if (!aRec)
-			goto preupdate_old_out;
-		rc = sqlite3BtreePayload(p->pCsr->uc.pCursor, 0, nRec, aRec);
-		if (rc == SQLITE_OK) {
-			p->pUnpacked =
-			    vdbeUnpackRecord(&p->keyinfo, nRec, aRec);
-			if (!p->pUnpacked)
-				rc = SQLITE_NOMEM;
-		}
-		if (rc != SQLITE_OK) {
-			sqlite3DbFree(db, aRec);
-			goto preupdate_old_out;
-		}
-		p->aRecord = aRec;
-	}
-
-	if (iIdx >= p->pUnpacked->nField) {
-		*ppValue = (sqlite3_value *) columnNullValue();
-	} else {
-		Mem *pMem = *ppValue = &p->pUnpacked->aMem[iIdx];
-		*ppValue = &p->pUnpacked->aMem[iIdx];
-		if (iIdx == p->pTab->iPKey) {
-			sqlite3VdbeMemSetInt64(pMem, p->iKey1);
-		} else if (p->pTab->aCol[iIdx].affinity == SQLITE_AFF_REAL) {
-			if (pMem->flags & MEM_Int) {
-				sqlite3VdbeMemRealify(pMem);
-			}
-		}
-	}
-
- preupdate_old_out:
-	sqlite3Error(db, rc);
-	return sqlite3ApiExit(db, rc);
-}
-#endif				/* SQLITE_ENABLE_PREUPDATE_HOOK */
-
-#ifdef SQLITE_ENABLE_PREUPDATE_HOOK
-/*
- * This function is called from within a pre-update callback to retrieve
- * the number of columns in the row being updated, deleted or inserted.
- */
-int
-sqlite3_preupdate_count(sqlite3 * db)
-{
-	PreUpdate *p = db->pPreUpdate;
-	return (p ? p->keyinfo.nField : 0);
-}
-#endif				/* SQLITE_ENABLE_PREUPDATE_HOOK */
-
 #ifdef SQLITE_ENABLE_PREUPDATE_HOOK
 /*
  * This function is designed to be called from within a pre-update callback
@@ -1698,92 +1601,6 @@ sqlite3_preupdate_depth(sqlite3 * db)
 }
 #endif				/* SQLITE_ENABLE_PREUPDATE_HOOK */
 
-#ifdef SQLITE_ENABLE_PREUPDATE_HOOK
-/*
- * This function is called from within a pre-update callback to retrieve
- * a field of the row currently being updated or inserted.
- */
-int
-sqlite3_preupdate_new(sqlite3 * db, int iIdx, sqlite3_value ** ppValue)
-{
-	PreUpdate *p = db->pPreUpdate;
-	int rc = SQLITE_OK;
-	Mem *pMem;
-
-	if (!p || p->op == SQLITE_DELETE) {
-		rc = SQLITE_MISUSE_BKPT;
-		goto preupdate_new_out;
-	}
-	if (iIdx >= p->pCsr->nField || iIdx < 0) {
-		rc = SQLITE_RANGE;
-		goto preupdate_new_out;
-	}
-
-	if (p->op == SQLITE_INSERT) {
-		/* For an INSERT, memory cell p->iNewReg contains the serialized record
-		 * that is being inserted. Deserialize it.
-		 */
-		UnpackedRecord *pUnpack = p->pNewUnpacked;
-		if (!pUnpack) {
-			Mem *pData = &p->v->aMem[p->iNewReg];
-			rc = ExpandBlob(pData);
-			if (rc != SQLITE_OK)
-				goto preupdate_new_out;
-			pUnpack =
-			    vdbeUnpackRecord(&p->keyinfo, pData->n, pData->z);
-			if (!pUnpack) {
-				rc = SQLITE_NOMEM;
-				goto preupdate_new_out;
-			}
-			p->pNewUnpacked = pUnpack;
-		}
-		if (iIdx >= pUnpack->nField) {
-			pMem = (sqlite3_value *) columnNullValue();
-		} else {
-			pMem = &pUnpack->aMem[iIdx];
-			if (iIdx == p->pTab->iPKey) {
-				sqlite3VdbeMemSetInt64(pMem, p->iKey2);
-			}
-		}
-	} else {
-		/* For an UPDATE, memory cell (p->iNewReg+1+iIdx) contains the required
-		 * value. Make a copy of the cell contents and return a pointer to it.
-		 * It is not safe to return a pointer to the memory cell itself as the
-		 * caller may modify the value text encoding.
-		 */
-		assert(p->op == SQLITE_UPDATE);
-		if (!p->aNew) {
-			p->aNew =
-			    (Mem *) sqlite3DbMallocZero(db,
-							sizeof(Mem) *
-							p->pCsr->nField);
-			if (!p->aNew) {
-				rc = SQLITE_NOMEM;
-				goto preupdate_new_out;
-			}
-		}
-		assert(iIdx >= 0 && iIdx < p->pCsr->nField);
-		pMem = &p->aNew[iIdx];
-		if (pMem->flags == 0) {
-			if (iIdx == p->pTab->iPKey) {
-				sqlite3VdbeMemSetInt64(pMem, p->iKey2);
-			} else {
-				rc = sqlite3VdbeMemCopy(pMem,
-							&p->v->aMem[p->iNewReg +
-								    1 + iIdx]);
-				if (rc != SQLITE_OK)
-					goto preupdate_new_out;
-			}
-		}
-	}
-	*ppValue = pMem;
-
- preupdate_new_out:
-	sqlite3Error(db, rc);
-	return sqlite3ApiExit(db, rc);
-}
-#endif				/* SQLITE_ENABLE_PREUPDATE_HOOK */
-
 #ifdef SQLITE_ENABLE_STMT_SCANSTATUS
 /*
  * Return status data for a single loop within query pStmt.
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index b3998ea..c2352e7 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -948,11 +948,9 @@ freeP4(sqlite3 * db, int p4type, void *p4)
 			sqlite3DbFree(db, p4);
 			break;
 		}
-	case P4_KEYINFO:{
-			if (db->pnBytesFreed == 0)
-				sqlite3KeyInfoUnref((KeyInfo *) p4);
-			break;
-		}
+	case P4_KEYDEF:
+		free(p4);
+		break;
 #ifdef SQLITE_ENABLE_CURSOR_HINTS
 	case P4_EXPR:{
 			sqlite3ExprDelete(db, (Expr *) p4);
@@ -1140,20 +1138,19 @@ sqlite3VdbeAppendP4(Vdbe * p, void *pP4, int n)
 	}
 }
 
-/*
- * Set the P4 on the most recently added opcode to the KeyInfo for the
- * index given.
- */
 void
-sqlite3VdbeSetP4KeyInfo(Parse * pParse, Index * pIdx)
-{
-	Vdbe *v = pParse->pVdbe;
-	KeyInfo *pKeyInfo;
-	assert(v != 0);
-	assert(pIdx != 0);
-	pKeyInfo = sqlite3KeyInfoOfIndex(pParse, pParse->db, pIdx);
-	if (pKeyInfo)
-		sqlite3VdbeAppendP4(v, pKeyInfo, P4_KEYINFO);
+sql_vdbe_set_p4_key_def(struct Parse *parse, struct Index *idx)
+{
+	struct Vdbe *v = parse->pVdbe;
+	assert(v != NULL);
+	assert(idx != NULL);
+	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
+	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
+	struct space *space = space_by_id(space_id);
+	assert(space != NULL);
+	struct index *index = index_find(space, index_id);
+	assert(index != NULL);
+	sqlite3VdbeAppendP4(v, key_def_dup(index->def->key_def), P4_KEYDEF);
 }
 
 #ifdef SQLITE_ENABLE_EXPLAIN_COMMENTS
@@ -1512,26 +1509,28 @@ displayP4(Op * pOp, char *zTemp, int nTemp)
 	assert(nTemp >= 20);
 	sqlite3StrAccumInit(&x, 0, zTemp, nTemp, 0);
 	switch (pOp->p4type) {
-	case P4_KEYINFO:{
-			int j;
-			KeyInfo *pKeyInfo;
+	case P4_KEYDEF:{
+			struct key_def *def;
 
-			if (pOp->p4.pKeyInfo == NULL) {
+			if (pOp->p4.key_def == NULL) {
 				sqlite3XPrintf(&x, "k[NULL]");
 			} else {
-				pKeyInfo = pOp->p4.pKeyInfo;
-				assert(pKeyInfo->aSortOrder != 0);
-				sqlite3XPrintf(&x, "k(%d", pKeyInfo->nField);
-				for (j = 0; j < pKeyInfo->nField; j++) {
-					struct coll *pColl = pKeyInfo->aColl[j];
-					const char *zColl =
-					    pColl ? pColl->name : "";
-					if (strcmp(zColl, "BINARY") == 0)
-						zColl = "B";
+				def = pOp->p4.key_def;
+				sqlite3XPrintf(&x, "k(%d", def->part_count);
+				for (int j = 0; j < (int)def->part_count; j++) {
+					struct coll *coll = def->parts[j].coll;
+					const char *coll_str =
+					    coll != NULL ? coll->name : "";
+					if (strcmp(coll_str, "BINARY") == 0)
+						coll_str = "B";
+					const char *sort_order = "";
+					if (def->parts[j].sort_order ==
+					    SORT_ORDER_DESC) {
+						sort_order = "-";
+					}
 					sqlite3XPrintf(&x, ",%s%s",
-						       pKeyInfo->
-						       aSortOrder[j] ? "-" : "",
-						       zColl);
+						       sort_order,
+						       coll_str);
 				}
 				sqlite3StrAccumAppend(&x, ")", 1);
 			}
@@ -3491,7 +3490,7 @@ sqlite3VdbeSerialGet(const unsigned char *buf,	/* Buffer to deserialize from */
 /*
  * This routine is used to allocate sufficient space for an UnpackedRecord
  * structure large enough to be used with sqlite3VdbeRecordUnpack() if
- * the first argument is a pointer to KeyInfo structure pKeyInfo.
+ * the first argument is a pointer to key_def structure.
  *
  * The space is either allocated using sqlite3DbMallocRaw() or from within
  * the unaligned buffer passed via the second and third arguments (presumably
@@ -3503,20 +3502,19 @@ sqlite3VdbeSerialGet(const unsigned char *buf,	/* Buffer to deserialize from */
  * If an OOM error occurs, NULL is returned.
  */
 UnpackedRecord *
-sqlite3VdbeAllocUnpackedRecord(KeyInfo * pKeyInfo)
+sqlite3VdbeAllocUnpackedRecord(struct sqlite3 *db, struct key_def *key_def)
 {
 	UnpackedRecord *p;	/* Unpacked record to return */
 	int nByte;		/* Number of bytes required for *p */
 	nByte =
-	    ROUND8(sizeof(UnpackedRecord)) + sizeof(Mem) * (pKeyInfo->nField +
+	    ROUND8(sizeof(UnpackedRecord)) + sizeof(Mem) * (key_def->part_count +
 							    1);
-	p = (UnpackedRecord *) sqlite3DbMallocRaw(pKeyInfo->db, nByte);
+	p = (UnpackedRecord *) sqlite3DbMallocRaw(db, nByte);
 	if (!p)
 		return 0;
 	p->aMem = (Mem *) & ((char *)p)[ROUND8(sizeof(UnpackedRecord))];
-	assert(pKeyInfo->aSortOrder != 0);
-	p->pKeyInfo = pKeyInfo;
-	p->nField = pKeyInfo->nField + 1;
+	p->key_def = key_def;
+	p->nField = key_def->part_count + 1;
 	return p;
 }
 
@@ -3545,7 +3543,8 @@ sql_vdbe_mem_alloc_region(Mem *vdbe_mem, uint32_t size)
  * Return false if there is a disagreement.
  */
 static int
-vdbeRecordCompareDebug(int nKey1, const void *pKey1,	/* Left key */
+vdbeRecordCompareDebug(struct sqlite3 *db,
+		       int nKey1, const void *pKey1,	/* Left key */
 		       const UnpackedRecord * pPKey2,	/* Right key */
 		       int desiredResult)		/* Correct answer */
 {
@@ -3555,13 +3554,11 @@ vdbeRecordCompareDebug(int nKey1, const void *pKey1,	/* Left key */
 	int i = 0;
 	int rc = 0;
 	const unsigned char *aKey1 = (const unsigned char *)pKey1;
-	KeyInfo *pKeyInfo;
+	struct key_def *key_def;
 	Mem mem1;
 
-	pKeyInfo = pPKey2->pKeyInfo;
-	if (pKeyInfo->db == 0)
-		return 1;
-	mem1.db = pKeyInfo->db;
+	key_def = pPKey2->key_def;
+	mem1.db = db;
 	/* mem1.flags = 0;  // Will be initialized by sqlite3VdbeSerialGet() */
 	VVA_ONLY(mem1.szMalloc = 0;
 	    )
@@ -3579,10 +3576,7 @@ vdbeRecordCompareDebug(int nKey1, const void *pKey1,	/* Left key */
 	if (szHdr1 > 98307)
 		return SQLITE_CORRUPT;
 	d1 = szHdr1;
-	assert(pKeyInfo->nField + pKeyInfo->nXField >= pPKey2->nField
-	       || CORRUPT_DB);
-	assert(pKeyInfo->aSortOrder != 0);
-	assert(pKeyInfo->nField > 0);
+	assert(key_def->part_count > 0);
 	assert(idx1 <= szHdr1 || CORRUPT_DB);
 	do {
 		u32 serial_type1;
@@ -3609,10 +3603,10 @@ vdbeRecordCompareDebug(int nKey1, const void *pKey1,	/* Left key */
 		/* Do the comparison
 		 */
 		rc = sqlite3MemCompare(&mem1, &pPKey2->aMem[i],
-				       pKeyInfo->aColl[i]);
+				       key_def->parts[i].coll);
 		if (rc != 0) {
 			assert(mem1.szMalloc == 0);	/* See comment below */
-			if (pKeyInfo->aSortOrder[i]) {
+			if (key_def->parts[i].sort_order != SORT_ORDER_ASC) {
 				rc = -rc;	/* Invert the result for DESC sort order. */
 			}
 			goto debugCompareEnd;
@@ -3641,7 +3635,7 @@ vdbeRecordCompareDebug(int nKey1, const void *pKey1,	/* Left key */
 		return 1;
 	if (CORRUPT_DB)
 		return 1;
-	if (pKeyInfo->db->mallocFailed)
+	if (db->mallocFailed)
 		return 1;
 	return 0;
 }
@@ -3917,12 +3911,13 @@ vdbeRecordDecodeInt(u32 serial_type, const u8 * aKey)
  * If database corruption is discovered, set pPKey2->errCode to
  * SQLITE_CORRUPT and return 0. If an OOM error is encountered,
  * pPKey2->errCode is set to SQLITE_NOMEM and, if it is not NULL, the
- * malloc-failed flag set on database handle (pPKey2->pKeyInfo->db).
+ * malloc-failed flag set on database handle (db).
  */
 int
-sqlite3VdbeRecordCompareWithSkip(int nKey1, const void *pKey1,	/* Left key */
+sqlite3VdbeRecordCompareWithSkip(struct sqlite3 *db,
+				 int nKey1, const void *pKey1,	/* Left key */
 				 UnpackedRecord * pPKey2,	/* Right key */
-				 int bSkip)			/* If true, skip the first field */
+				 bool bSkip)			/* If true, skip the first field */
 {
 	u32 d1;			/* Offset into aKey[] of next data element */
 	int i;			/* Index of next field to compare */
@@ -3930,7 +3925,7 @@ sqlite3VdbeRecordCompareWithSkip(int nKey1, const void *pKey1,	/* Left key */
 	u32 idx1;		/* Offset of first type in header */
 	int rc = 0;		/* Return value */
 	Mem *pRhs = pPKey2->aMem;	/* Next field of pPKey2 to compare */
-	KeyInfo *pKeyInfo = pPKey2->pKeyInfo;
+	struct key_def *key_def = pPKey2->key_def;
 	const unsigned char *aKey1 = (const unsigned char *)pKey1;
 	Mem mem1;
 
@@ -3957,10 +3952,7 @@ sqlite3VdbeRecordCompareWithSkip(int nKey1, const void *pKey1,	/* Left key */
 
 	VVA_ONLY(mem1.szMalloc = 0;
 	    )			/* Only needed by assert() statements */
-	    assert(pPKey2->pKeyInfo->nField + pPKey2->pKeyInfo->nXField >=
-		   pPKey2->nField || CORRUPT_DB);
-	assert(pPKey2->pKeyInfo->aSortOrder != 0);
-	assert(pPKey2->pKeyInfo->nField > 0);
+	assert(pPKey2->key_def->part_count > 0);
 	assert(idx1 <= szHdr1 || CORRUPT_DB);
 	do {
 		u32 serial_type;
@@ -4035,13 +4027,14 @@ sqlite3VdbeRecordCompareWithSkip(int nKey1, const void *pKey1,	/* Left key */
 					pPKey2->errCode =
 					    (u8) SQLITE_CORRUPT_BKPT;
 					return 0;	/* Corruption */
-				} else if (pKeyInfo->aColl[i]) {
-					mem1.db = pKeyInfo->db;
+				} else if (key_def->parts[i].coll !=NULL) {
+					mem1.db = db;
 					mem1.flags = MEM_Str;
 					mem1.z = (char *)&aKey1[d1];
+					struct coll* coll;
+					coll = key_def->parts[i].coll;
 					rc = vdbeCompareMemString(&mem1, pRhs,
-								  pKeyInfo->
-								  aColl[i],
+								  coll,
 								  &pPKey2->
 								  errCode);
 				} else {
@@ -4091,11 +4084,10 @@ sqlite3VdbeRecordCompareWithSkip(int nKey1, const void *pKey1,	/* Left key */
 		}
 
 		if (rc != 0) {
-			if (pKeyInfo->aSortOrder[i]) {
+			if (key_def->parts[i].sort_order != SORT_ORDER_ASC)
 				rc = -rc;
-			}
 			assert(vdbeRecordCompareDebug
-			       (nKey1, pKey1, pPKey2, rc));
+			       (db, nKey1, pKey1, pPKey2, rc));
 			assert(mem1.szMalloc == 0);	/* See comment below */
 			return rc;
 		}
@@ -4118,18 +4110,19 @@ sqlite3VdbeRecordCompareWithSkip(int nKey1, const void *pKey1,	/* Left key */
 	 * value.
 	 */
 	assert(CORRUPT_DB
-	       || vdbeRecordCompareDebug(nKey1, pKey1, pPKey2,
+	       || vdbeRecordCompareDebug(db, nKey1, pKey1, pPKey2,
 					 pPKey2->default_rc)
-	       || pKeyInfo->db->mallocFailed);
+	       || db->mallocFailed);
 	pPKey2->eqSeen = 1;
 	return pPKey2->default_rc;
 }
 
 int
-sqlite3VdbeRecordCompare(int nKey1, const void *pKey1,	/* Left key */
-			 UnpackedRecord * pPKey2)	/* Right key */
+sqlite3VdbeRecordCompare(struct sqlite3 *db,
+			 int key_count, const void *key1,	/* Left key */
+			 UnpackedRecord *key2)	/* Right key */
 {
-	return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, pPKey2, 0);
+	return sqlite3VdbeRecordCompareWithSkip(db, key_count, key1, key2, false);
 }
 
 /*
@@ -4164,7 +4157,7 @@ sqlite3VdbeIdxKeyCompare(sqlite3 * db,			/* Database connection */
 	assert(sqlite3CursorIsValid(pCur));
 	if (pCur->curFlags & BTCF_TaCursor ||
 	    pCur->curFlags & BTCF_TEphemCursor) {
-		return tarantoolSqlite3IdxKeyCompare(pCur, pUnpacked, res);
+		return tarantoolSqlite3IdxKeyCompare(db, pCur, pUnpacked, res);
 	}
 	unreachable();
 	return SQLITE_OK;
@@ -4286,68 +4279,6 @@ vdbeFreeUnpacked(sqlite3 * db, UnpackedRecord * p)
 }
 #endif				/* SQLITE_ENABLE_PREUPDATE_HOOK */
 
-#ifdef SQLITE_ENABLE_PREUPDATE_HOOK
-/*
- * Invoke the pre-update hook. If this is an UPDATE or DELETE pre-update call,
- * then cursor passed as the second argument should point to the row about
- * to be update or deleted. If the application calls sqlite3_preupdate_old(),
- * the required value will be read from the row the cursor points to.
- */
-void
-sqlite3VdbePreUpdateHook(Vdbe * v,		/* Vdbe pre-update hook is invoked by */
-			 VdbeCursor * pCsr,	/* Cursor to grab old.* values from */
-			 int op,		/* SQLITE_INSERT, UPDATE or DELETE */
-			 Table * pTab,		/* Modified table */
-			 i64 iKey1,		/* Initial key value */
-			 int iReg)		/* Register for new.* record */
-{
-	sqlite3 *db = v->db;
-	i64 iKey2;
-	PreUpdate preupdate;
-	const char *zTbl = pTab->zName;
-	static const u8 fakeSortOrder = 0;
-
-	assert(db->pPreUpdate == 0);
-	memset(&preupdate, 0, sizeof(PreUpdate));
-	if (op == SQLITE_UPDATE) {
-		iKey2 = v->aMem[iReg].u.i;
-	} else {
-		iKey2 = iKey1;
-	}
-
-	assert(pCsr->nField == pTab->nCol
-	       || (pCsr->nField == pTab->nCol + 1 && op == SQLITE_DELETE
-		   && iReg == -1)
-	    );
-
-	preupdate.v = v;
-	preupdate.pCsr = pCsr;
-	preupdate.op = op;
-	preupdate.iNewReg = iReg;
-	preupdate.keyinfo.db = db;
-	preupdate.keyinfo.nField = pTab->nCol;
-	preupdate.keyinfo.aSortOrder = (u8 *) & fakeSortOrder;
-	preupdate.iKey1 = iKey1;
-	preupdate.iKey2 = iKey2;
-	preupdate.pTab = pTab;
-
-	db->pPreUpdate = &preupdate;
-	db->xPreUpdateCallback(db->pPreUpdateArg, db, op, zTbl, iKey1,
-			       iKey2);
-	db->pPreUpdate = 0;
-	sqlite3DbFree(db, preupdate.aRecord);
-	vdbeFreeUnpacked(db, preupdate.pUnpacked);
-	vdbeFreeUnpacked(db, preupdate.pNewUnpacked);
-	if (preupdate.aNew) {
-		int i;
-		for (i = 0; i < pCsr->nField; i++) {
-			sqlite3VdbeMemRelease(&preupdate.aNew[i]);
-		}
-		sqlite3DbFree(db, preupdate.aNew);
-	}
-}
-#endif				/* SQLITE_ENABLE_PREUPDATE_HOOK */
-
 i64
 sqlite3VdbeMsgpackRecordLen(Mem * pRec, u32 n)
 {
@@ -4420,7 +4351,7 @@ sqlite3VdbeMsgpackRecordPut(u8 * pBuf, Mem * pRec, u32 n)
 }
 
 int
-sqlite3VdbeCompareMsgpack(const char **pKey1,
+sqlite3VdbeCompareMsgpack(struct sqlite3 *db, const char **pKey1,
 			  UnpackedRecord * pUnpacked, int iKey2)
 {
 	const char *aKey1 = *pKey1;
@@ -4493,16 +4424,17 @@ sqlite3VdbeCompareMsgpack(const char **pKey1,
 		}
 	case MP_STR:{
 			if (pKey2->flags & MEM_Str) {
-				KeyInfo *pKeyInfo = pUnpacked->pKeyInfo;
+				struct key_def *key_def = pUnpacked->key_def;
 				mem1.n = mp_decode_strl(&aKey1);
 				mem1.z = (char *)aKey1;
 				aKey1 += mem1.n;
-				if (pKeyInfo->aColl[iKey2]) {
-					mem1.db = pKeyInfo->db;
+				struct coll *coll;
+				coll = key_def->parts[iKey2].coll;
+				if (coll != NULL) {
+					mem1.db = db;
 					mem1.flags = MEM_Str;
 					rc = vdbeCompareMemString(&mem1, pKey2,
-								  pKeyInfo->
-								  aColl[iKey2],
+								  coll,
 								  &pUnpacked->
 								  errCode);
 				} else {
@@ -4553,29 +4485,30 @@ sqlite3VdbeCompareMsgpack(const char **pKey1,
 }
 
 int
-sqlite3VdbeRecordCompareMsgpack(int nKey1, const void *pKey1,	/* Left key */
-				UnpackedRecord * pPKey2)	/* Right key */
+sqlite3VdbeRecordCompareMsgpack(struct sqlite3 *db,
+				int key_count, const void *key1,	/* Left key */
+				UnpackedRecord * key2)	/* Right key */
 {
-	(void)nKey1;		/* assume valid data */
+	(void)key_count;		/* assume valid data */
 
 	int rc = 0;		/* Return value */
-	const char *aKey1 = (const char *)pKey1;
-	u32 i, n = mp_decode_array(&aKey1);
+	u32 i, n = mp_decode_array((const char**)&key1);
 
-	n = MIN(n, pPKey2->nField);
+	n = MIN(n, key2->nField);
 
 	for (i = 0; i != n; i++) {
-		rc = sqlite3VdbeCompareMsgpack(&aKey1, pPKey2, i);
+		rc = sqlite3VdbeCompareMsgpack(db, (const char**)&key1, key2, i);
 		if (rc != 0) {
-			if (pPKey2->pKeyInfo->aSortOrder[i]) {
+			if (key2->key_def->parts[i].sort_order !=
+			    SORT_ORDER_ASC) {
 				rc = -rc;
 			}
 			return rc;
 		}
 	}
 
-	pPKey2->eqSeen = 1;
-	return pPKey2->default_rc;
+	key2->eqSeen = 1;
+	return key2->default_rc;
 }
 
 u32
@@ -4654,7 +4587,8 @@ sqlite3VdbeMsgpackGet(const unsigned char *buf,	/* Buffer to deserialize from */
 }
 
 void
-sqlite3VdbeRecordUnpackMsgpack(KeyInfo * pKeyInfo,	/* Information about the record format */
+sqlite3VdbeRecordUnpackMsgpack(struct sqlite3 *db,
+			       struct key_def *key_def,	/* Information about the record format */
 			       int nKey,		/* Size of the binary record */
 			       const void *pKey,	/* The binary record */
 			       UnpackedRecord * p)	/* Populate this structure before returning. */
@@ -4664,10 +4598,11 @@ sqlite3VdbeRecordUnpackMsgpack(KeyInfo * pKeyInfo,	/* Information about the reco
 	(void)nKey;
 	Mem *pMem = p->aMem;
 	n = mp_decode_array(&zParse);
-	n = p->nField = MIN(n, pKeyInfo->nField);
+	n = p->nField = MIN(n, key_def->part_count);
 	p->default_rc = 0;
+	p->key_def = key_def;
 	while (n--) {
-		pMem->db = pKeyInfo->db;
+		pMem->db = db;
 		/* pMem->flags = 0; // sqlite3VdbeSerialGet() will set this for us */
 		pMem->szMalloc = 0;
 		pMem->z = 0;
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 9dd254f..93f451d 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1089,14 +1089,13 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
 			int i;	/* Counter variable */
 			int nCol = index_column_count(pIdx);
 
-			nByte =
-			    sizeof(Mem) * nCol + ROUND8(sizeof(UnpackedRecord));
+			nByte = sizeof(Mem) * nCol +
+				ROUND8(sizeof(UnpackedRecord));
 			pRec =
 			    (UnpackedRecord *) sqlite3DbMallocZero(db, nByte);
-			if (pRec) {
-				pRec->pKeyInfo =
-				    sqlite3KeyInfoOfIndex(p->pParse, db, pIdx);
-				if (pRec->pKeyInfo) {
+			if (pRec != NULL) {
+				pRec->key_def = sql_index_key_def(pIdx);
+				if (pRec->key_def != NULL) {
 					pRec->aMem =
 					    (Mem *) ((u8 *) pRec +
 						     ROUND8(sizeof
@@ -1655,13 +1654,12 @@ sqlite3Stat4ProbeFree(UnpackedRecord * pRec)
 {
 	if (pRec) {
 		int i;
-		int nCol = pRec->pKeyInfo->nField;
+		int nCol = pRec->key_def->part_count;
 		Mem *aMem = pRec->aMem;
 		sqlite3 *db = aMem[0].db;
 		for (i = 0; i < nCol; i++) {
 			sqlite3VdbeMemRelease(&aMem[i]);
 		}
-		sqlite3KeyInfoUnref(pRec->pKeyInfo);
 		sqlite3DbFree(db, pRec);
 	}
 }
diff --git a/src/box/sql/vdbesort.c b/src/box/sql/vdbesort.c
index be3cc4c..9651fae 100644
--- a/src/box/sql/vdbesort.c
+++ b/src/box/sql/vdbesort.c
@@ -343,7 +343,7 @@ struct VdbeSorter {
 	PmaReader *pReader;	/* Readr data from here after Rewind() */
 	MergeEngine *pMerger;	/* Or here, if bUseThreads==0 */
 	sqlite3 *db;		/* Database connection */
-	KeyInfo *pKeyInfo;	/* How to compare records */
+	struct key_def *key_def;
 	UnpackedRecord *pUnpacked;	/* Used by VdbeSorterCompare() */
 	SorterList list;	/* List of in-memory records */
 	int iMemory;		/* Offset of free space in list.aMemory */
@@ -796,7 +796,7 @@ vdbePmaReaderInit(SortSubtask * pTask,	/* Task context */
 
 /*
  * Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
- * size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
+ * size nKey2 bytes). Use (pTask->pSorter->key_def) for the collation sequences
  * used by the comparison. Return the result of the comparison.
  *
  * If IN/OUT parameter *pbKey2Cached is true when this function is called,
@@ -808,7 +808,7 @@ vdbePmaReaderInit(SortSubtask * pTask,	/* Task context */
  * to SQLITE_NOMEM.
  */
 static int
-vdbeSorterCompare(SortSubtask * pTask,	/* Subtask context (for pKeyInfo) */
+vdbeSorterCompare(SortSubtask * pTask,	/* Subtask context (for key_def) */
 		  int *pbKey2Cached,	/* True if pTask->pUnpacked is pKey2 */
 		  const void *pKey1, int nKey1,	/* Left side of comparison */
 		  const void *pKey2, int nKey2	/* Right side of comparison */
@@ -816,17 +816,19 @@ vdbeSorterCompare(SortSubtask * pTask,	/* Subtask context (for pKeyInfo) */
 {
 	UnpackedRecord *r2 = pTask->pUnpacked;
 	if (!*pbKey2Cached) {
-		sqlite3VdbeRecordUnpackMsgpack(pTask->pSorter->pKeyInfo, nKey2,
-					       pKey2, r2);
+		sqlite3VdbeRecordUnpackMsgpack(pTask->pSorter->db,
+					       pTask->pSorter->key_def,
+					       nKey2, pKey2, r2);
 		*pbKey2Cached = 1;
 	}
-	return sqlite3VdbeRecordCompareMsgpack(nKey1, pKey1, r2);
+	return sqlite3VdbeRecordCompareMsgpack(pTask->pSorter->db,
+					       nKey1, pKey1, r2);
 }
 
 /*
  * Initialize the temporary index cursor just opened as a sorter cursor.
  *
- * Usually, the sorter module uses the value of (pCsr->pKeyInfo->nField)
+ * Usually, the sorter module uses the value of (pCsr->key_def->part_count)
  * to determine the number of fields that should be compared from the
  * records being sorted. However, if the value passed as argument nField
  * is non-zero and the sorter is able to guarantee a stable sort, nField
@@ -844,16 +846,12 @@ vdbeSorterCompare(SortSubtask * pTask,	/* Subtask context (for pKeyInfo) */
  */
 int
 sqlite3VdbeSorterInit(sqlite3 * db,	/* Database connection (for malloc()) */
-		      int nField,	/* Number of key fields in each record */
 		      VdbeCursor * pCsr	/* Cursor that holds the new sorter */
     )
 {
 	int pgsz;		/* Page size of main database */
 	int i;			/* Used to iterate through aTask[] */
 	VdbeSorter *pSorter;	/* The new sorter */
-	KeyInfo *pKeyInfo;	/* Copy of pCsr->pKeyInfo with db==0 */
-	int szKeyInfo;		/* Size of pCsr->pKeyInfo in bytes */
-	int sz;			/* Size of pSorter in bytes */
 	int rc = SQLITE_OK;
 #if SQLITE_MAX_WORKER_THREADS==0
 #define nWorker 0
@@ -875,25 +873,15 @@ sqlite3VdbeSorterInit(sqlite3 * db,	/* Database connection (for malloc()) */
 	}
 #endif
 
-	assert(pCsr->pKeyInfo);
+	assert(pCsr->key_def != NULL);
 	assert(pCsr->eCurType == CURTYPE_SORTER);
-	szKeyInfo =
-	    sizeof(KeyInfo) + (pCsr->pKeyInfo->nField - 1) * sizeof(struct coll *);
-	sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
 
-	pSorter = (VdbeSorter *) sqlite3DbMallocZero(db, sz + szKeyInfo);
+	pSorter = (VdbeSorter *) sqlite3DbMallocZero(db, sizeof(VdbeSorter));
 	pCsr->uc.pSorter = pSorter;
 	if (pSorter == 0) {
 		rc = SQLITE_NOMEM_BKPT;
 	} else {
-		pSorter->pKeyInfo = pKeyInfo =
-		    (KeyInfo *) ((u8 *) pSorter + sz);
-		memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
-		pKeyInfo->db = 0;
-		if (nField && nWorker == 0) {
-			pKeyInfo->nXField += (pKeyInfo->nField - nField);
-			pKeyInfo->nField = nField;
-		}
+		pSorter->key_def = pCsr->key_def;
 		pSorter->pgsz = pgsz = 1024;
 		pSorter->nTask = nWorker + 1;
 		pSorter->iPrev = (u8) (nWorker - 1);
@@ -929,8 +917,8 @@ sqlite3VdbeSorterInit(sqlite3 * db,	/* Database connection (for malloc()) */
 			}
 		}
 
-		if ((pKeyInfo->nField + pKeyInfo->nXField) < 13
-		    && (pKeyInfo->aColl[0] == NULL)) {
+		if (pCsr->key_def->part_count < 13
+		    && (pCsr->key_def->parts[0].coll == NULL)) {
 			pSorter->typeMask =
 			    SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
 		}
@@ -1280,10 +1268,11 @@ vdbeSortAllocUnpacked(SortSubtask * pTask)
 {
 	if (pTask->pUnpacked == 0) {
 		pTask->pUnpacked =
-		    sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo);
+			sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->db,
+						       pTask->pSorter->key_def);
 		if (pTask->pUnpacked == 0)
 			return SQLITE_NOMEM_BKPT;
-		pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
+		pTask->pUnpacked->nField = pTask->pSorter->key_def->part_count;
 		pTask->pUnpacked->errCode = 0;
 	}
 	return SQLITE_OK;
@@ -2824,7 +2813,6 @@ sqlite3VdbeSorterCompare(const VdbeCursor * pCsr,	/* Sorter cursor */
 {
 	VdbeSorter *pSorter;
 	UnpackedRecord *r2;
-	KeyInfo *pKeyInfo;
 	int i;
 	void *pKey;
 	int nKey;		/* Sorter key to compare pVal with */
@@ -2832,10 +2820,9 @@ sqlite3VdbeSorterCompare(const VdbeCursor * pCsr,	/* Sorter cursor */
 	assert(pCsr->eCurType == CURTYPE_SORTER);
 	pSorter = pCsr->uc.pSorter;
 	r2 = pSorter->pUnpacked;
-	pKeyInfo = pCsr->pKeyInfo;
 	if (r2 == 0) {
 		r2 = pSorter->pUnpacked =
-		    sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
+			sqlite3VdbeAllocUnpackedRecord(pSorter->db,  pCsr->key_def);
 		if (r2 == 0)
 			return SQLITE_NOMEM_BKPT;
 		r2->nField = nKeyCol;
@@ -2843,7 +2830,8 @@ sqlite3VdbeSorterCompare(const VdbeCursor * pCsr,	/* Sorter cursor */
 	assert(r2->nField == nKeyCol);
 
 	pKey = vdbeSorterRowkey(pSorter, &nKey);
-	sqlite3VdbeRecordUnpackMsgpack(pKeyInfo, nKey, pKey, r2);
+	sqlite3VdbeRecordUnpackMsgpack(pSorter->db, pCsr->key_def,
+				       nKey, pKey, r2);
 	for (i = 0; i < nKeyCol; i++) {
 		if (r2->aMem[i].flags & MEM_Null) {
 			*pRes = -1;
@@ -2851,6 +2839,6 @@ sqlite3VdbeSorterCompare(const VdbeCursor * pCsr,	/* Sorter cursor */
 		}
 	}
 
-	*pRes = sqlite3VdbeRecordCompareMsgpack(pVal->n, pVal->z, r2);
+	*pRes = sqlite3VdbeRecordCompareMsgpack(pSorter->db, pVal->n, pVal->z, r2);
 	return SQLITE_OK;
 }
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index bad964a..fb40f76 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -812,7 +812,7 @@ constructAutomaticIndex(Parse * pParse,			/* The parsing context */
 	assert(pLevel->iIdxCur >= 0);
 	pLevel->iIdxCur = pParse->nTab++;
 	sqlite3VdbeAddOp2(v, OP_OpenAutoindex, pLevel->iIdxCur, nKeyCol + 1);
-	sqlite3VdbeSetP4KeyInfo(pParse, pIdx);
+	sql_vdbe_set_p4_key_def(pParse, pIdx);
 	VdbeComment((v, "for %s", pTable->zName));
 
 	/* Fill the automatic index with content */
@@ -971,9 +971,9 @@ whereKeyStats(Parse * pParse,	/* Database connection */
 		}
 
 		pRec->nField = n;
-		res =
-		    sqlite3VdbeRecordCompareMsgpack(aSample[iSamp].n,
-						    aSample[iSamp].p, pRec);
+		res = sqlite3VdbeRecordCompareMsgpack(pParse->db,
+						      aSample[iSamp].n,
+						      aSample[iSamp].p, pRec);
 		if (res < 0) {
 			iLower =
 			    aSample[iSamp].anLt[n - 1] + aSample[iSamp].anEq[n -
@@ -1002,7 +1002,8 @@ whereKeyStats(Parse * pParse,	/* Database connection */
 			assert(iCol == nField - 1);
 			pRec->nField = nField;
 			assert(0 ==
-			       sqlite3VdbeRecordCompareMsgpack(aSample[i].n,
+			       sqlite3VdbeRecordCompareMsgpack(pParse->db,
+							       aSample[i].n,
 							       aSample[i].p,
 							       pRec)
 			       || pParse->db->mallocFailed);
@@ -1014,7 +1015,8 @@ whereKeyStats(Parse * pParse,	/* Database connection */
 			assert(i <= pIdx->nSample && i >= 0);
 			pRec->nField = iCol + 1;
 			assert(i == pIdx->nSample
-			       || sqlite3VdbeRecordCompareMsgpack(aSample[i].n,
+			       || sqlite3VdbeRecordCompareMsgpack(pParse->db,
+								  aSample[i].n,
 								  aSample[i].p,
 								  pRec) > 0
 			       || pParse->db->mallocFailed);
@@ -1027,13 +1029,14 @@ whereKeyStats(Parse * pParse,	/* Database connection */
 			if (iCol > 0) {
 				pRec->nField = iCol;
 				assert(sqlite3VdbeRecordCompareMsgpack
-				       (aSample[i].n, aSample[i].p, pRec) <= 0
+				       (pParse->db, aSample[i].n, aSample[i].p,
+					pRec) <= 0
 				       || pParse->db->mallocFailed);
 			}
 			if (i > 0) {
 				pRec->nField = nField;
 				assert(sqlite3VdbeRecordCompareMsgpack
-				       (aSample[i - 1].n, aSample[i - 1].p,
+				       (pParse->db, aSample[i - 1].n, aSample[i - 1].p,
 					pRec) < 0 || pParse->db->mallocFailed);
 			}
 		}
@@ -4571,7 +4574,7 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
 			assert(iIndexCur >= 0);
 			if (op) {
 				emit_open_cursor(pParse, iIndexCur, pIx->tnum);
-				sqlite3VdbeSetP4KeyInfo(pParse, pIx);
+				sql_vdbe_set_p4_key_def(pParse, pIx);
 				if ((pLoop->wsFlags & WHERE_CONSTRAINT) != 0
 				    && (pLoop->
 					wsFlags & (WHERE_COLUMN_RANGE |
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index 9610f76..a295578 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -1618,7 +1618,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
 			regRowset = pParse->nTab++;
 			sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
 					  regRowset, nPkCol);
-			sqlite3VdbeSetP4KeyInfo(pParse, pPk);
+			sql_vdbe_set_p4_key_def(pParse, pPk);
 			regPk = ++pParse->nMem;
 		}
 		iRetInit = sqlite3VdbeAddOp2(v, OP_Integer, 0, regReturn);
diff --git a/src/box/tuple.c b/src/box/tuple.c
index df2686e..7bcd9f8 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -474,3 +474,12 @@ tuple_str(const struct tuple *tuple)
 		return "<failed to format tuple>";
 	return buf;
 }
+
+const char *
+mp_str(const char *data)
+{
+	char *buf = tt_static_buf();
+	if (mp_snprint(buf, TT_STATIC_BUF_LEN, data) < 0)
+		return "<failed to format message pack>";
+	return buf;
+}
diff --git a/src/box/tuple.h b/src/box/tuple.h
index 97b81cf..9a459d9 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -395,6 +395,15 @@ tuple_snprint(char *buf, int size, const struct tuple *tuple);
 const char *
 tuple_str(const struct tuple *tuple);
 
+/**
+ * Format msgpack into string using a static buffer.
+ * Useful for debugger. Example: [1, 2, "string"]
+ * @param data msgpack to format
+ * @return formatted null-terminated string
+ */
+const char *
+mp_str(const char *data);
+
 /**
  * Get the format of the tuple.
  * @param tuple Tuple.
diff --git a/test/sql-tap/index1.test.lua b/test/sql-tap/index1.test.lua
index 201838d..c0177c8 100755
--- a/test/sql-tap/index1.test.lua
+++ b/test/sql-tap/index1.test.lua
@@ -528,7 +528,9 @@ test:do_execsql_test(
         INSERT INTO t1 VALUES(2, 2,4);
         INSERT INTO t1 VALUES(3, 3,8);
         INSERT INTO t1 VALUES(4, 1,12);
+	pragma vdbe_debug=1;
         SELECT b FROM t1 WHERE a=1 ORDER BY b;
+	pragma vdbe_debug=0;
     ]], {
         -- <index-10.0>
         2, 12
diff --git a/test/sql-tap/index4.test.lua b/test/sql-tap/index4.test.lua
index 22e5066..85d3b3c 100755
--- a/test/sql-tap/index4.test.lua
+++ b/test/sql-tap/index4.test.lua
@@ -22,6 +22,7 @@ testprefix = "index4"
 test:do_execsql_test(
     1.1,
     [[
+    pragma vdbe_debug=1;
           CREATE TABLE t1(x primary key);
         BEGIN;
           INSERT INTO t1 VALUES(randomblob(102));
diff --git a/test/sql-tap/selectA.test.lua b/test/sql-tap/selectA.test.lua
index fc482e9..fa3a025 100755
--- a/test/sql-tap/selectA.test.lua
+++ b/test/sql-tap/selectA.test.lua
@@ -1155,6 +1155,7 @@ test:do_execsql_test(
 test:do_execsql_test(
     "selectA-2.94",
     [[
+    pragma vdbe_debug=1;
         SELECT lower((SELECT c FROM t1 UNION ALL SELECT z FROM t2 ORDER BY 1));
     ]], {
         -- <selectA-2.94>
-- 
2.16.2







More information about the Tarantool-patches mailing list