[tarantool-patches] Re: [PATCH v11] sql: add index_def to struct Index

Ivan Koptelov ivan.koptelov at tarantool.org
Wed Jul 4 18:55:56 MSK 2018


Thank you for the review!
> Firstly, I have found some bugs:
>
> tarantool> create table t1(a,b,c,d, primary key(a,a,a,b,c));
> ---
> ...
>
> It is OK, but when I do this:
>
> tarantool> create index i1 on t1(b,c,a,c)
> ---
> ...
>
> tarantool> create index i1 on t1(b,c,a,c)
> ---
> ...
>
> tarantool> create index i1 on t1(b,c,a,c)
> ---
> ...
> No error is raised and index isn’t created.
>
> After you find and fix this bug, add also test on this case.
  Fixed, added test.
> Then:
>
> CREATE TABLE t11 (s1 INT, a, constraint c1 UNIQUE(s1) on conflict replace, PRIMARY KEY(s1));
>
> In this case creation of unique constraint c1 is omitted, but no errors or warnings are shown.
> It is not a problem now, but when ALTER TABLE DROP CONSTRAINT is implemented,
> it will be possible to drop c1 constraint. Eventually, user would be disappointed if tried to drop
> this constraint but got an error.
It seems to be out of the scope of the patch. Appropriate ticket:
https://github.com/tarantool/tarantool/issues/3498

>> @@ -1022,19 +1019,18 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
>> 		 */
>> 		assert(regKey == (regStat4 + 2));
>> 		Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
>> -		int j, k, regKeyStat;
>> -		int nPkColumn = (int)index_column_count(pPk);
>> -		regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
>> -		for (j = 0; j < nPkColumn; j++) {
>> -			k = pPk->aiColumn[j];
>> -			assert(k >= 0 && k < (int)pTab->def->field_count);
>> -			sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
>> -			VdbeComment((v, "%s",
>> -				pTab->def->fields[pPk->aiColumn[j]].name));
>> +		int pk_part_count = (int) pPk->def->key_def->part_count;
>> +		int regKeyStat = sqlite3GetTempRange(pParse, pk_part_count);
>> +		for (int j = 0; j < pk_part_count; ++j) {
> Use uint32_t instead of int:
>
> -               int pk_part_count = (int) pPk->def->key_def->part_count;
> +               uint32_t pk_part_count = pPk->def->key_def->part_count;
>                  int regKeyStat = sqlite3GetTempRange(pParse, pk_part_count);
> -               for (int j = 0; j < pk_part_count; ++j) {
> -                       int k = pPk->def->key_def->parts[j].fieldno;
> -                       assert(k >= 0 && k < (int) pTab->def->field_count);
> +               for (uint32_t j = 0; j < pk_part_count; ++j) {
> +                       uint32_t k = pPk->def->key_def->parts[j].fieldno;
> +                       assert(k >= 0 && k < pTab->def->field_count);
>
>> +struct Index *
>> +sql_index_alloc(struct sqlite3 *db, uint32_t part_count)
>> {
>> -	Index *p;		/* Allocated index object */
>> -	int nByte;		/* Bytes of space for Index object + arrays */
>> -
>> -	nByte = ROUND8(sizeof(Index)) +		    /* Index structure   */
>> -	    ROUND8(sizeof(struct coll *) * nCol) +  /* Index.coll_array  */
>> -	    ROUND8(sizeof(uint32_t) * nCol) +       /* Index.coll_id_array*/
>> -	    ROUND8(sizeof(LogEst) * (nCol + 1) +    /* Index.aiRowLogEst */
>> -		   sizeof(i16) * nCol +		    /* Index.aiColumn    */
>> -		   sizeof(enum sort_order) * nCol); /* Index.sort_order  */
>> -	p = sqlite3DbMallocZero(db, nByte + nExtra);
>> -	if (p) {
>> -		char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
>> -		p->coll_array = (struct coll **)pExtra;
>> -		pExtra += ROUND8(sizeof(struct coll **) * nCol);
>> -		p->coll_id_array = (uint32_t *) pExtra;
>> -		pExtra += ROUND8(sizeof(uint32_t) * nCol);
>> -		p->aiRowLogEst = (LogEst *) pExtra;
>> -		pExtra += sizeof(LogEst) * (nCol + 1);
>> -		p->aiColumn = (i16 *) pExtra;
>> -		pExtra += sizeof(i16) * nCol;
>> -		p->sort_order = (enum sort_order *) pExtra;
>> -		p->nColumn = nCol;
>> -		*ppExtra = ((char *)p) + nByte;
>> -	}
>> +	/* Size of struct Index and aiRowLogEst. */
>> +	int nByte = ROUND8(sizeof(struct Index)) +
>> +		    ROUND8(sizeof(LogEst) * (part_count + 1));
> Do we really need this alignment?
No. Removed.
>
>> +	struct Index *p = sqlite3DbMallocZero(db, nByte);
>> +	if (p != NULL)
>> +		p->aiRowLogEst = (LogEst *) ((char *)p + ROUND8(sizeof(*p)));
>> 	return p;
>> }
>> @@ -2631,46 +2520,187 @@ addIndexToTable(Index * pIndex, Table * pTab)
>> +/**
>> + * Allocate memory on parser region and copy given string (part of
>> + * the sql statement) into the allocated memory.
>> + * @param parse Parse context.
>> + * @param str String (a part of sql statement) to be copied.
>> + *
>> + * @retval size Appended size.
>> + */
>> +static int
>> +sql_append(struct Parse *parse, const char *str)
> Such strange function...Lets rename it to sql(or str)_copy_to_region() at least.
Ok, renamed.
>
>> +{
>> +	const size_t str_len = strlen(str);
>> +	char *str_part = region_alloc(&parse->region, str_len);
>> +	if (str_part == NULL) {
>> +		diag_set(OutOfMemory, str_len, "region_alloc", "str_part");
>> +		parse->rc = SQL_TARANTOOL_ERROR;
>> +		parse->nErr++;
>> +		return 0;
>> +	}
>> +	memcpy(str_part, str, str_len);
>> +	return str_len;
>> +}
>> +
>> +/**
>> + * Create and set index_def in the given Index.
>> + *
>> + * @param parse Parse context.
>> + * @param index Index for which index_def should be created. It is
>> + *              used only to set index_def at the end of the
>> + *              function.
>> + * @param table Table which is indexed by 'index' param.
>> + * @param iid Index ID.
>> + * @param name Index name.
>> + * @param name_len Index name length.
>> + * @param is_unique Is given 'index' unique or not.
>> + * @param expr_list List of expressions, describe which columns
>> + *                  of 'table' are used in index and also their
>> + *                  collations, orders, etc.
>> + * @param idx_type Index type, one of the following:
>> + *                 SQLITE_IDXTYPE_APPDEF 0 (Index is created with
>> + *                 CREATE INDEX statement)
>> + *                 SQLITE_IDXTYPE_UNIQUE 1 (Index is created
>> + *                 automatically to implement a UNIQUE constraint)
>> + *                 SQLITE_IDXTYPE_PRIMARYKEY 2 (Index is a PRIMARY
>> + *                 KEY).
> You can skip this description - it is almost a copy of the existing one at the definitions of these macros.
Ok, removed detailed description, left a short one.
>
>> + */
>> +static void
>> +set_index_def(struct Parse *parse, struct Index *index, struct Table *table,
> Lets use Tarantool naming convention, sort of: index_fill_def() or index_create_def().
Ok, renamed.
>
>> +	      uint32_t iid, const char *name, uint32_t name_len, bool is_unique,
>> +	      struct ExprList *expr_list, u8 idx_type)
>> +{
>> +	struct space_def *space_def = table->def;
>> +	size_t sql_size = 0;
>> +	struct index_opts opts;
>> +	index_opts_create(&opts);
>> +	index->def = NULL;
>> +	opts.is_unique = is_unique;
>> +	int rc = -1;
> You don’t use this variable and in the end just reassign it:
>
> +	if (index->def == NULL)
> +		goto tnt_error;
> +	rc = 0;
> +cleanup:
> +	if (key_def != NULL)
> +		key_def_delete(key_def);
> +	return rc;
>
>> +
>> +	struct key_def *key_def = key_def_new(expr_list->nExpr);
>> +	if (key_def == NULL)
>> +		goto tnt_error;
>> +
>> +	/* Build initial parts of SQL statement.  */
>> +	if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>> +		sql_size += sql_append(parse, "CREATE INDEX ");
>> +		sql_size += sql_append(parse, name);
>> +		sql_size += sql_append(parse, " ON ");
>> +		sql_size += sql_append(parse, space_def->name);
>> +		sql_size += sql_append(parse, " (");
> Why do you need to construct “CREATE INDEX” statement from scratch?
> This function is only called from sql_create_index() - there you already have
> this string:
>
> /*
>   * Gather the complete text of the CREATE INDEX
>   * statement into the zStmt variable
>   */
> assert(start != NULL);
> int n = (int)(parse->sLastToken.z - token->z) +
>         parse->sLastToken.n;
> if (token->z[n - 1] == ';')
>         n--;
Ok, now use this string.
>
>>   void
>> sql_create_index(struct Parse *parse, struct Token *token,
>> 		 struct SrcList *tbl_name, struct ExprList *col_list,
>> -		 int on_error, struct Token *start, struct Expr *where,
>> -		 enum sort_order sort_order, bool if_not_exist, u8 idx_type)
>> -{
>> -	Table *pTab = 0;	/* Table to be indexed */
>> -	Index *pIndex = 0;	/* The index to be created */
>> -	char *zName = 0;	/* Name of the index */
>> -	int nName;		/* Number of characters in zName */
>> -	int i, j;
>> -	DbFixer sFix;		/* For assigning database names to pTable */
>> -	sqlite3 *db = parse->db;
>> -	struct ExprList_item *col_listItem;	/* For looping over col_list */
>> -	int nExtra = 0;		/* Space allocated for zExtra[] */
>> -	char *zExtra = 0;	/* Extra space after the Index object */
>> +		 enum on_conflict_action on_error, struct Token *start,
>> +		 struct Expr *where, enum sort_order sort_order,
>> +		 bool if_not_exist, u8 idx_type)
>> +{
>> +	/* Table to be indexed.  */
> Extra space after dot.
Removed.
>
>> +	struct Table *table = NULL;
>> +	/* The index to be created.  */
>> +	struct Index *index = NULL;
>> +	/* Name of the index.  */
>> +	char *name = NULL;
>> +	int name_len;
> You don’t need to declare variables so beforehand.
> Its first usage occurs at 130+ lines below.
The variables 'name' and 'index' should be declared at the
very beginning, because they are used in the exit_create_index
section, and the 'goto's to this section start from the very
beginning of the sql_create_index() function.
>
>> +	struct sqlite3 *db = parse->db;
>> 	struct session *user_session = current_session();
>> -	if (db->mallocFailed || parse->nErr > 0) {
>> +	if (db->mallocFailed || parse->nErr > 0)
>> 		goto exit_create_index;
>> -	}
>> -	/* Do not account nested operations: the count of such
>> -	 * operations depends on Tarantool data dictionary internals,
>> -	 * such as data layout in system spaces. Also do not account
>> -	 * PRIMARY KEY and UNIQUE constraint - they had been accounted
>> -	 * in CREATE TABLE already.
>> +	/*
>> +	 * Do not account nested operations: the count of such
>> +	 * operations depends on Tarantool data dictionary
>> +	 * internals, such as data layout in system spaces. Also
>> +	 * do not account PRIMARY KEY and UNIQUE constraint - they
>> +	 * had been accounted in CREATE TABLE already.
>> 	 */
>> 	if (!parse->nested && idx_type == SQLITE_IDXTYPE_APPDEF) {
>> 		Vdbe *v = sqlite3GetVdbe(parse);
>> @@ -2681,39 +2711,43 @@ sql_create_index(struct Parse *parse, struct Token *token,
>> 	assert(db->pSchema != NULL);
>>   	/*
>> -	 * Find the table that is to be indexed.  Return early if not found.
>> +	 * Find the table that is to be indexed.
>> +	 * Return early if not found.
>> 	 */
>> 	if (tbl_name != NULL) {
>> -
>> -		/* Use the two-part index name to determine the database
>> -		 * to search for the table. 'Fix' the table name to this db
>> -		 * before looking up the table.
>> +		/*
>> +		 * Use the two-part index name to determine the
>> +		 * database to search for the table. 'Fix' the
>> +		 * table name to this db before looking up the
>> +		 * table.
>> 		 */
>> 		assert(token && token->z);
>> -
>> -		sqlite3FixInit(&sFix, parse, "index", token);
>> -		if (sqlite3FixSrcList(&sFix, tbl_name)) {
>> -			/* Because the parser constructs tbl_name from a single identifier,
>> +		DbFixer db_fixer;
>> +		sqlite3FixInit(&db_fixer, parse, "index", token);
>> +		if (sqlite3FixSrcList(&db_fixer, tbl_name)) {
> This ‘Fix’ routine seems to be useless now, lets remove it.
Ok, removed.
>
>> +			/*
>> +			 * Because the parser constructs tbl_name
>> +			 * from a single identifier,
>> 			 * sqlite3FixSrcList can never fail.
>> 			 */
>> -			assert(0);
>> +			unreachable();
>> 		}
>> -		pTab = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
>> -		assert(db->mallocFailed == 0 || pTab == 0);
>> -		if (pTab == 0)
>> +		table = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
>> +		assert(db->mallocFailed == 0 || table == NULL);
>> +		if (table == NULL)
>> 			goto exit_create_index;
>> -		sqlite3PrimaryKeyIndex(pTab);
>> +		sqlite3PrimaryKeyIndex(table);
> Why do you call this function? It returns PK, but you don’t use it.
Some strange sqlite legacy. Removed.
>
>> 	} else {
>> 		assert(token == NULL);
>> 		assert(start == NULL);
>> -		pTab = parse->pNewTable;
>> -		if (!pTab)
>> +		table = parse->pNewTable;
>> +		if (table == NULL)
>> 			goto exit_create_index;
>> 	}
> Instead of checking table on NULL in each branch and after that using assert(table != NULL),
> it is better to replace that assert() with check:
>
> -               if (table == NULL)
> -                       goto exit_create_index;
> -               sqlite3PrimaryKeyIndex(table);
>          } else {
>                  assert(token == NULL);
>                  assert(start == NULL);
>                  table = parse->pNewTable;
> -               if (table == NULL)
> -                       goto exit_create_index;
>          }
>   
> -       assert(table != NULL);
> +       if (table == NULL)
> +               goto exit_create_index;
>
Sure, it's better. Fixed.
>> -	assert(pTab != 0);
>> +	assert(table != NULL);
>> 	assert(parse->nErr == 0);
>> -	if (pTab->def->opts.is_view) {
>> +	if (table->def->opts.is_view) {
>> 		sqlite3ErrorMsg(parse, "views may not be indexed");
>> 		goto exit_create_index;
>> 	}
> Lets also prohibit creation of indexes on system spaces.
Ok. Added.
>
>> @@ -2731,42 +2765,38 @@ sql_create_index(struct Parse *parse, struct Token *token,
>> 	 * primary key or UNIQUE constraint.  We have to invent
>> 	 * our own name.
>> 	 */
>> -	if (token) {
>> -		zName = sqlite3NameFromToken(db, token);
>> -		if (zName == 0)
>> +	if (token != NULL) {
>> +		name = sqlite3NameFromToken(db, token);
>> +		if (name == NULL)
>> 			goto exit_create_index;
>> -		assert(token->z != 0);
>> +		assert(token->z != NULL);
>> 		if (!db->init.busy) {
>> -			if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
>> +			if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
>> 			    NULL) {
>> -				sqlite3ErrorMsg(parse,
>> -						"there is already a table named %s",
>> -						zName);
>> +				sqlite3ErrorMsg(parse, "there is already a "\
>> +						"table named %s", name);
>> 				goto exit_create_index;
>> 			}
>> 		}
>> -		if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
>> +		if (sqlite3HashFind(&table->idxHash, name) != NULL) {
>> 			if (!if_not_exist) {
>> 				sqlite3ErrorMsg(parse,
>> 						"index %s.%s already exists",
>> -						pTab->def->name, zName);
>> +						table->def->name, name);
>> 			} else {
>> 				assert(!db->init.busy);
>> 			}
>> 			goto exit_create_index;
>> 		}
>> 	} else {
>> -		int n;
>> -		Index *pLoop;
>> -		for (pLoop = pTab->pIndex, n = 1; pLoop;
>> +		int n = 1;
>> +		for (struct Index *pLoop = table->pIndex; pLoop != NULL;
> Lets use Tarantool naming convention.
Ok, renamed where possible.
>
>> 		     pLoop = pLoop->pNext, n++) {
>> 		}
>> -		zName =
>> -		    sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", pTab->def->name,
>> -				   n);
>> -		if (zName == 0) {
>> +		name = sqlite3MPrintf(db, "sqlite_autoindex_%s_%d",
> Lets remove ’sqlite_’ prefix and use just ’sql_’.
Ok.
>
>> +				      table->def->name, n);
>> +		if (name == NULL)
>> 			goto exit_create_index;
>> -		}
>> 	}
>>   	/*
>> @@ -2776,9 +2806,9 @@ sql_create_index(struct Parse *parse, struct Token *token,
>> 	 * simulate this.
>> 	 */
>> 	if (col_list == NULL) {
>> -		Token prevCol;
>> -		uint32_t last_field = pTab->def->field_count - 1;
>> -		sqlite3TokenInit(&prevCol, pTab->def->fields[last_field].name);
>> +		struct Token prevCol;
> Lets use Tarantool naming convention.
Renamed.
>
>> +		uint32_t last_field = table->def->field_count - 1;
>> +		sqlite3TokenInit(&prevCol, table->def->fields[last_field].name);
>> 		col_list = sql_expr_list_append(parse->db, NULL,
>> 						sqlite3ExprAlloc(db, TK_ID,
>> 								 &prevCol, 0));
>> @@ -2790,108 +2820,93 @@ sql_create_index(struct Parse *parse, struct Token *token,
>> 		sqlite3ExprListCheckLength(parse, col_list, "index");
>> 	}
>> -	/* Figure out how many bytes of space are required to store explicitly
>> -	 * specified collation sequence names.
>> -	 */
>> -	for (i = 0; i < col_list->nExpr; i++) {
>> -		Expr *pExpr = col_list->a[i].pExpr;
>> -		assert(pExpr != 0);
>> -		if (pExpr->op == TK_COLLATE) {
>> -			nExtra += (1 + sqlite3Strlen30(pExpr->u.zToken));
>> -		}
>> -	}
>> +	/* Allocate the index structure.  */
> Extra space after dot.
Removed.
>
>> +	name_len = sqlite3Strlen30(name);
> Lets use traditional strlen() instead of SQLite analogs.
Ok.
>
>> -	/*
>> -	 * Allocate the index structure.
>> -	 */
>> -	nName = sqlite3Strlen30(zName);
>> -	pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
>> -					    nName + nExtra + 1, &zExtra);
>> -	if (db->mallocFailed) {
>> +	if (name_len > BOX_NAME_MAX) {
>> +		sqlite3ErrorMsg(parse, "%s.%s exceeds indexes' names length "\
>> +				"limit", table->def->name, name);
> But sqlite3CheckIndentifier() also provides this check.
Removed duplicate check.
>
>> +	uint32_t max_iid = 0;
>> +	for (struct Index *index = table->pIndex; index != NULL;
>> +	     index = index->pNext) {
>> +		max_iid = max_iid > index->def->iid ?
>> +			  max_iid : index->def->iid + 1;
>> +	}
> Look, you don’t need to find max_iid: if it is parsing stage, then you can
> set it to any meaningful value (since in the end of function it is going to be destroyed);
> if it is executing step, you can use db->init.newTnum.
Ok, fixed.
>
>> +
>> +	bool is_unique = on_error != ON_CONFLICT_ACTION_NONE;
> It seems to be so messy defining uniqueness by ON_CONFLICT_ACTION.
> Lets refactor it somehow.
Not sure about this. It seems that information about uniqueness is
only in on_error parameter.
>> @@ -460,17 +462,19 @@ fkLookupParent(Parse * pParse,	/* Parse context */
>> 			 */
>> 			if (pTab == pFKey->pFrom && nIncr == 1) {
>> 				int iJump =
>> -				    sqlite3VdbeCurrentAddr(v) + nCol + 1;
>> -				for (i = 0; i < nCol; i++) {
>> +					sqlite3VdbeCurrentAddr(v) + nCol + 1;
>> +				struct key_part *part =
>> +					pIdx->def->key_def->parts;
>> +				for (i = 0; i < nCol; ++i, ++part) {
>> 					int iChild = aiCol[i] + 1 + regData;
>> -					int iParent =
>> -					    pIdx->aiColumn[i] + 1 + regData;
>> -					assert(pIdx->aiColumn[i] >= 0);
>> +					int iParent = 1 + regData +
>> +						      (int)part->fieldno;
>> 					assert(aiCol[i] != pTab->iPKey);
>> -					if (pIdx->aiColumn[i] == pTab->iPKey) {
>> +					if ((int)part->fieldno == pTab->iPKey) {
>> 						/* The parent key is a composite key that includes the IPK column */
>> 						iParent = regData;
>> 					}
>> +
> Extra empty line.
Removed.
>
>> @@ -622,7 +625,7 @@ fkScanChildren(Parse * pParse,	/* Parse context */
>> 	Vdbe *v = sqlite3GetVdbe(pParse);
>>   	assert(pIdx == 0 || pIdx->pTable == pTab);
>> -	assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
>> +	assert(pIdx == 0 || (int) pIdx->def->key_def->part_count == pFKey->nCol);
> Lets use == NULL comparison on pointers.
Fixed.
>
>> @@ -1108,19 +1110,19 @@ sqlite3FkOldmask(Parse * pParse,	/* Parse context */
>>   	if (user_session->sql_flags & SQLITE_ForeignKeys) {
>> 		FKey *p;
>> -		int i;
>> 		for (p = pTab->pFKey; p; p = p->pNextFrom) {
>> -			for (i = 0; i < p->nCol; i++)
>> +			for (int i = 0; i < p->nCol; i++)
> Is this change related to patch?
No, just a side refactoring.
>
>> @@ -1390,24 +1389,22 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
>> 		if (uniqueByteCodeNeeded) {
>> 			sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
>> 					     addrUniqueOk, regIdx,
>> -					     index_column_count(pIdx));
>> +					     pIdx->def->key_def->part_count);
>> 		}
>> 		VdbeCoverage(v);
>> +		const uint32_t pk_part_count = pPk->def->key_def->part_count;
> Why do you use here const? In other places AFAIK we/you don’t use
> const modifier (when it comes to simple numeric variables).
Removed.
>
>
>> @@ -1621,15 +1621,12 @@ sql_stat4_column(struct sqlite3 *db, const char *record, uint32_t col_num,
>> void
>> sqlite3Stat4ProbeFree(UnpackedRecord * pRec)
>> {
>> -	if (pRec) {
>> -		int i;
>> -		int nCol = pRec->key_def->part_count;
>> -		Mem *aMem = pRec->aMem;
>> -		sqlite3 *db = aMem[0].db;
>> -		for (i = 0; i < nCol; i++) {
>> +	if (pRec != NULL) {
>> +		int part_count = pRec->key_def->part_count;
>> +		struct Mem *aMem = pRec->aMem;
>> +		for (int i = 0; i < part_count; i++)
>> 			sqlite3VdbeMemRelease(&aMem[i]);
>> -		}
>> -		sqlite3DbFree(db, pRec);
>> +		sqlite3DbFree(aMem[0].db, pRec);
>> 	}
>> }
> Is this refactoring related to patch? I mean, refactoring is always appreciated,
> but don’t mess it with main goal of patch.
It is not related to the patch.
>
>> diff --git a/src/box/sql/where.c b/src/box/sql/where.c
>> index 85143ed20..7ca02095f 100644
>> --- a/src/box/sql/where.c
>> +++ b/src/box/sql/where.c
>> @@ -372,13 +372,19 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
>> 	pScan->is_column_seen = false;
>> 	if (pIdx) {
>> 		int j = iColumn;
>> -		iColumn = pIdx->aiColumn[j];
>> +		iColumn = pIdx->def->key_def->parts[j].fieldno;
>> +		/*
>> +		 * pIdx->tnum == 0 means that pIdx is a fake
>> +		 * integer primary key index.
>> +		 */
>> +		if (pIdx->tnum == 0)
>> +			iColumn = -1;
> We are going to remove tnum from struct Index and struct Table.
> So, if it is possible, use index->def->iid instead (or smth else).
Removed with 'fake_autoindex'
>
>> @@ -2882,7 +2868,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
>> 	Index *pProbe;		/* An index we are evaluating */
>> 	Index sPk;		/* A fake index object for the primary key */
>> 	LogEst aiRowEstPk[2];	/* The aiRowLogEst[] value for the sPk index */
>> -	i16 aiColumnPk = -1;	/* The aColumn[] value for the sPk index */
>> 	SrcList *pTabList;	/* The FROM clause */
>> 	struct SrcList_item *pSrc;	/* The FROM clause btree term to add */
>> 	WhereLoop *pNew;	/* Template WhereLoop object */
>> @@ -2913,11 +2898,32 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
>> 		 */
>> 		Index *pFirst;	/* First of real indices on the table */
>> 		memset(&sPk, 0, sizeof(Index));
>> -		sPk.nColumn = 1;
>> -		sPk.aiColumn = &aiColumnPk;
>> 		sPk.aiRowLogEst = aiRowEstPk;
>> 		sPk.onError = ON_CONFLICT_ACTION_REPLACE;
>> 		sPk.pTable = pTab;
>> +
>> +		struct key_def *key_def = key_def_new(1);
>> +		if (key_def == NULL) {
>> +			pWInfo->pParse->nErr++;
>> +			pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
>> +			return SQL_TARANTOOL_ERROR;
>> +		}
>> +
>> +		key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
>> +				 ON_CONFLICT_ACTION_ABORT,
>> +				 NULL, COLL_NONE, SORT_ORDER_ASC);
>> +
>> +		sPk.def = index_def_new(pTab->def->id, 0, "primary",
> Let's name it like ‘fake_autoindex’ to easily tell it from the rest.
Ok.
>
>> diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation1.test.lua
>> similarity index 100%
>> rename from test/sql-tap/collation.test.lua
>> rename to test/sql-tap/collation1.test.lua
>> diff --git a/test/sql-tap/collation2.test.lua b/test/sql-tap/collation2.test.lua
>> new file mode 100755
>> index 000000000..64296b0be
>> --- /dev/null
>> +++ b/test/sql-tap/collation2.test.lua
>> @@ -0,0 +1,20 @@
>> +#!/usr/bin/env tarantool
>> +test = require("sqltester")
>> +test:plan(3)
>> +
>> +test:do_catchsql_test(
>> +        "collation-2.1",
>> +        'CREATE TABLE test1 (a int, b int, c int, PRIMARY KEY (a, a, a, b, c))',
>> +        nil)
>> +
>> +test:do_catchsql_test(
>> +        "collation-2.2",
>> +        'CREATE TABLE test2 (a int, b int, c int, PRIMARY KEY (a, a, a, b, b, a, c))',
>> +        nil)
>> +
>> +test:do_catchsql_test(
>> +        "collation-2.3",
>> +        'CREATE TABLE test3 (a int, b int, c int, PRIMARY KEY (a, a COLLATE foo, b, c))',
>> +        {1, "Collation 'FOO' does not exist"})
>> +
>> +test:finish_test()
> I wouldn’t create separate test file for these simple tests.
> Lets put them to existing one.
Moved these test cases to the existing tests.

---
Branch:
https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where

Issue: https://github.com/tarantool/tarantool/issues/3369

  src/box/errcode.h                                  |   1 +
  src/box/sql.c                                      |  54 +-
  src/box/sql/analyze.c                              |  85 +--
  src/box/sql/build.c                                | 816 ++++++++++-----------
  src/box/sql/delete.c                               |  10 +-
  src/box/sql/expr.c                                 |  61 +-
  src/box/sql/fkey.c                                 | 216 +++---
  src/box/sql/insert.c                               | 145 ++--
  src/box/sql/pragma.c                               |  30 +-
  src/box/sql/select.c                               |   2 +-
  src/box/sql/sqliteInt.h                            | 116 +--
  src/box/sql/update.c                               |  39 +-
  src/box/sql/vdbeaux.c                              |   2 +-
  src/box/sql/vdbemem.c                              |  21 +-
  src/box/sql/where.c                                | 192 ++---
  src/box/sql/wherecode.c                            | 102 +--
  test/box/misc.result                               |   1 +
  test/sql-tap/analyze6.test.lua                     |   6 +-
  .../{collation.test.lua => collation1.test.lua}    |   7 +-
  test/sql-tap/colname.test.lua                      |   4 +-
  test/sql-tap/gh-2931-savepoints.test.lua           |   2 +-
  test/sql-tap/gh2140-trans.test.lua                 |   2 +-
  test/sql-tap/gh2259-in-stmt-trans.test.lua         |   8 +-
  test/sql-tap/gh2964-abort.test.lua                 |   2 +-
  test/sql-tap/identifier-characters.test.lua        |   2 +-
  test/sql-tap/identifier_case.test.lua              |   4 +-
  test/sql-tap/index1.test.lua                       |  14 +-
  test/sql-tap/index7.test.lua                       |  21 +-
  test/sql-tap/intpkey.test.lua                      |   4 +-
  test/sql-tap/misc1.test.lua                        |   2 +-
  test/sql-tap/unique.test.lua                       |   8 +-
  test/sql-tap/update.test.lua                       |   6 +-
  test/sql/insert-unique.result                      |   3 +-
  test/sql/iproto.result                             |   2 +-
  test/sql/message-func-indexes.result               |   8 +-
  test/sql/on-conflict.result                        |   2 +-
  test/sql/persistency.result                        |   6 +-
  test/sql/transition.result                         |   6 +-
  38 files changed, 965 insertions(+), 1047 deletions(-)
  rename test/sql-tap/{collation.test.lua => collation1.test.lua} (97%)

diff --git a/src/box/errcode.h b/src/box/errcode.h
index c76018cbf..2229c5cbd 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -215,6 +215,7 @@ struct errcode_record {
  	/*160 */_(ER_ACTION_MISMATCH,		"Field %d contains %s on conflict action, but %s in index parts") \
  	/*161 */_(ER_VIEW_MISSING_SQL,		"Space declared as a view must have SQL statement") \
  	/*162 */_(ER_FOREIGN_KEY_CONSTRAINT,	"Can not commit transaction: deferred foreign keys violations are not resolved") \
+	/*163 */_(ER_NO_SUCH_COLLATION,		"Collation '%s' does not exist") \
  
  /*
   * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/sql.c b/src/box/sql.c
index 063743e87..142dcc2da 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1420,8 +1420,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
  
  	/* If table's PK is single column which is INTEGER, then
  	 * treat it as strict type, not affinity.  */
-	if (pk_idx && pk_idx->nColumn == 1) {
-		int pk = pk_idx->aiColumn[0];
+	if (pk_idx != NULL && pk_idx->def->key_def->part_count == 1) {
+		int pk = pk_idx->def->key_def->parts[0].fieldno;
  		if (def->fields[pk].type == FIELD_TYPE_INTEGER)
  			pk_forced_int = pk;
  	}
@@ -1532,20 +1532,19 @@ tarantoolSqlite3MakeTableOpts(Table *pTable, const char *zSql, char *buf)
   */
  int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
  {
-	struct space_def *def = pIndex->pTable->def;
-	assert(def != NULL);
+	struct field_def *fields = pIndex->pTable->def->fields;
+	struct key_def *key_def = pIndex->def->key_def;
  	const struct Enc *enc = get_enc(buf);
-	struct SqliteIndex *primary_index;
-	char *base = buf, *p;
-	int pk_forced_int = -1;
-
-	primary_index = sqlite3PrimaryKeyIndex(pIndex->pTable);
+	char *base = buf;
+	uint32_t pk_forced_int = UINT32_MAX;
+	struct SqliteIndex *primary_index =
+		sqlite3PrimaryKeyIndex(pIndex->pTable);
  
  	/* If table's PK is single column which is INTEGER, then
  	 * treat it as strict type, not affinity.  */
-	if (primary_index->nColumn == 1) {
-		int pk = primary_index->aiColumn[0];
-		if (def->fields[pk].type == FIELD_TYPE_INTEGER)
+	if (primary_index->def->key_def->part_count == 1) {
+		int pk = primary_index->def->key_def->parts[0].fieldno;
+		if (fields[pk].type == FIELD_TYPE_INTEGER)
  			pk_forced_int = pk;
  	}
  
@@ -1555,46 +1554,45 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
  	 * primary key columns. Query planner depends on this particular
  	 * data layout.
  	 */
-	int i, n = pIndex->nColumn;
-
-	p = enc->encode_array(base, n);
-	for (i = 0; i < n; i++) {
-		int col = pIndex->aiColumn[i];
-		assert(def->fields[col].is_nullable ==
-		       action_is_nullable(def->fields[col].nullable_action));
+	struct key_part *part = key_def->parts;
+	char *p = enc->encode_array(base, key_def->part_count);
+	for (uint32_t i = 0; i < key_def->part_count; ++i, ++part) {
+		uint32_t col = part->fieldno;
+		assert(fields[col].is_nullable ==
+		       action_is_nullable(fields[col].nullable_action));
  		const char *t;
  		if (pk_forced_int == col) {
  			t = "integer";
  		} else {
-			enum affinity_type affinity = def->fields[col].affinity;
-			t = convertSqliteAffinity(affinity,
-						  def->fields[col].is_nullable);
+			t = convertSqliteAffinity(fields[col].affinity,
+						  fields[col].is_nullable);
  		}
  		/* do not decode default collation */
-		uint32_t cid = pIndex->coll_id_array[i];
+		uint32_t cid = part->coll_id;
  		p = enc->encode_map(p, cid == COLL_NONE ? 5 : 6);
  		p = enc->encode_str(p, "type", sizeof("type")-1);
  		p = enc->encode_str(p, t, strlen(t));
  		p = enc->encode_str(p, "field", sizeof("field")-1);
  		p = enc->encode_uint(p, col);
  		if (cid != COLL_NONE) {
-			p = enc->encode_str(p, "collation", sizeof("collation")-1);
+			p = enc->encode_str(p, "collation",
+					    sizeof("collation") - 1);
  			p = enc->encode_uint(p, cid);
  		}
  		p = enc->encode_str(p, "is_nullable", 11);
-		p = enc->encode_bool(p, def->fields[col].is_nullable);
+		p = enc->encode_bool(p, fields[col].is_nullable);
  		p = enc->encode_str(p, "nullable_action", 15);
  		const char *action_str =
-			on_conflict_action_strs[def->fields[col].nullable_action];
+			on_conflict_action_strs[fields[col].nullable_action];
  		p = enc->encode_str(p, action_str, strlen(action_str));
  
  		p = enc->encode_str(p, "sort_order", 10);
-		enum sort_order sort_order = pIndex->sort_order[i];
+		enum sort_order sort_order = part->sort_order;
  		assert(sort_order < sort_order_MAX);
  		const char *sort_order_str = sort_order_strs[sort_order];
  		p = enc->encode_str(p, sort_order_str, strlen(sort_order_str));
  	}
-	return (int)(p - base);
+	return p - base;
  }
  
  /*
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index 5f73f026e..cd7a642d2 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -848,8 +848,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  	for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
  		int addrRewind;	/* Address of "OP_Rewind iIdxCur" */
  		int addrNextRow;	/* Address of "next_row:" */
-		const char *zIdxName;	/* Name of the index */
-		int nColTest;	/* Number of columns to test for changes */
+		const char *idx_name;	/* Name of the index */
  
  		if (pOnlyIdx && pOnlyIdx != pIdx)
  			continue;
@@ -857,17 +856,16 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		 * names. Thus, for the sake of clarity, use
  		 * instead more familiar table name.
  		 */
-		if (IsPrimaryKeyIndex(pIdx)) {
-			zIdxName = pTab->def->name;
-		} else {
-			zIdxName = pIdx->zName;
-		}
-		nColTest = index_column_count(pIdx);
+		if (IsPrimaryKeyIndex(pIdx))
+			idx_name = pTab->def->name;
+		else
+			idx_name = pIdx->def->name;
+		int part_count = pIdx->def->key_def->part_count;
  
  		/* Populate the register containing the index name. */
-		sqlite3VdbeLoadString(v, regIdxname, zIdxName);
+		sqlite3VdbeLoadString(v, regIdxname, idx_name);
  		VdbeComment((v, "Analysis for %s.%s", pTab->def->name,
-			zIdxName));
+			    idx_name));
  
  		/*
  		 * Pseudo-code for loop that calls stat_push():
@@ -906,7 +904,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		 * when building a record to insert into the sample column of
  		 * the _sql_stat4 table).
  		 */
-		pParse->nMem = MAX(pParse->nMem, regPrev + nColTest);
+		pParse->nMem = MAX(pParse->nMem, regPrev + part_count);
  
  		/* Open a read-only cursor on the index being analyzed. */
  		struct space *space =
@@ -917,7 +915,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
  				  space_ptr_reg);
  		sql_vdbe_set_p4_key_def(pParse, pIdx);
-		VdbeComment((v, "%s", pIdx->zName));
+		VdbeComment((v, "%s", pIdx->def->name));
  
  		/* Invoke the stat_init() function. The arguments are:
  		 *
@@ -930,8 +928,8 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		 * The third argument is only used for STAT4
  		 */
  		sqlite3VdbeAddOp2(v, OP_Count, iIdxCur, regStat4 + 3);
-		sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 1);
-		sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 2);
+		sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 1);
+		sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 2);
  		sqlite3VdbeAddOp4(v, OP_Function0, 0, regStat4 + 1, regStat4,
  				  (char *)&statInitFuncdef, P4_FUNCDEF);
  		sqlite3VdbeChangeP5(v, 3);
@@ -949,11 +947,11 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		sqlite3VdbeAddOp2(v, OP_Integer, 0, regChng);
  		addrNextRow = sqlite3VdbeCurrentAddr(v);
  
-		if (nColTest > 0) {
+		if (part_count > 0) {
  			int endDistinctTest = sqlite3VdbeMakeLabel(v);
  			int *aGotoChng;	/* Array of jump instruction addresses */
  			aGotoChng =
-			    sqlite3DbMallocRawNN(db, sizeof(int) * nColTest);
+			    sqlite3DbMallocRawNN(db, sizeof(int) * part_count);
  			if (aGotoChng == 0)
  				continue;
  
@@ -969,7 +967,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  			 */
  			sqlite3VdbeAddOp0(v, OP_Goto);
  			addrNextRow = sqlite3VdbeCurrentAddr(v);
-			if (nColTest == 1 && index_is_unique(pIdx)) {
+			if (part_count == 1 && pIdx->def->opts.is_unique) {
  				/* For a single-column UNIQUE index, once we have found a non-NULL
  				 * row, we know that all the rest will be distinct, so skip
  				 * subsequent distinctness tests.
@@ -978,13 +976,12 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  						  endDistinctTest);
  				VdbeCoverage(v);
  			}
-			for (i = 0; i < nColTest; i++) {
-				uint32_t id;
-				struct coll *coll =
-					sql_index_collation(pIdx, i, &id);
+			struct key_part *part = pIdx->def->key_def->parts;
+			for (i = 0; i < part_count; ++i, ++part) {
+				struct coll *coll = part->coll;
  				sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
  				sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-						  pIdx->aiColumn[i], regTemp);
+						  part->fieldno, regTemp);
  				aGotoChng[i] =
  				    sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
  						      regPrev + i, (char *)coll,
@@ -992,7 +989,7 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  				sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
  				VdbeCoverage(v);
  			}
-			sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regChng);
+			sqlite3VdbeAddOp2(v, OP_Integer, part_count, regChng);
  			sqlite3VdbeGoto(v, endDistinctTest);
  
  			/*
@@ -1003,11 +1000,11 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  			 *  ...
  			 */
  			sqlite3VdbeJumpHere(v, addrNextRow - 1);
-			for (i = 0; i < nColTest; i++) {
+			part = pIdx->def->key_def->parts;
+			for (i = 0; i < part_count; ++i, ++part) {
  				sqlite3VdbeJumpHere(v, aGotoChng[i]);
  				sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-						  pIdx->aiColumn[i],
-						  regPrev + i);
+						  part->fieldno, regPrev + i);
  			}
  			sqlite3VdbeResolveLabel(v, endDistinctTest);
  			sqlite3DbFree(db, aGotoChng);
@@ -1022,19 +1019,18 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		 */
  		assert(regKey == (regStat4 + 2));
  		Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-		int j, k, regKeyStat;
-		int nPkColumn = (int)index_column_count(pPk);
-		regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
-		for (j = 0; j < nPkColumn; j++) {
-			k = pPk->aiColumn[j];
-			assert(k >= 0 && k < (int)pTab->def->field_count);
-			sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
-			VdbeComment((v, "%s",
-				pTab->def->fields[pPk->aiColumn[j]].name));
+		uint32_t pk_part_count = pPk->def->key_def->part_count;
+		int regKeyStat = sqlite3GetTempRange(pParse, pk_part_count);
+		for (uint32_t j = 0; j < pk_part_count; ++j) {
+			uint32_t k = pPk->def->key_def->parts[j].fieldno;
+			assert(k < pTab->def->field_count);
+			sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
+					  regKeyStat + j);
+			VdbeComment((v, "%s", pTab->def->fields[k].name));
  		}
  		sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
-				  nPkColumn, regKey);
-		sqlite3ReleaseTempRange(pParse, regKeyStat, nPkColumn);
+				  pk_part_count, regKey);
+		sqlite3ReleaseTempRange(pParse, regKeyStat, pk_part_count);
  
  		assert(regChng == (regStat4 + 1));
  		sqlite3VdbeAddOp4(v, OP_Function0, 1, regStat4, regTemp,
@@ -1057,11 +1053,11 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		int regDLt = regStat1 + 2;
  		int regSample = regStat1 + 3;
  		int regCol = regStat1 + 4;
-		int regSampleKey = regCol + nColTest;
+		int regSampleKey = regCol + part_count;
  		int addrNext;
  		int addrIsNull;
  
-		pParse->nMem = MAX(pParse->nMem, regCol + nColTest);
+		pParse->nMem = MAX(pParse->nMem, regCol + part_count);
  
  		addrNext = sqlite3VdbeCurrentAddr(v);
  		callStatGet(v, regStat4, STAT_GET_KEY, regSampleKey);
@@ -1077,12 +1073,11 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
  		 * be taken
  		 */
  		VdbeCoverageNeverTaken(v);
-		for (i = 0; i < nColTest; i++) {
-			sqlite3ExprCodeLoadIndexColumn(pParse, pIdx,
-									 iTabCur, i,
-									 regCol + i);
+		for (i = 0; i < part_count; i++) {
+			sqlite3ExprCodeLoadIndexColumn(pParse, pIdx, iTabCur, i,
+						       regCol + i);
  		}
-		sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, nColTest,
+		sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, part_count,
  				  regSample);
  		sqlite3VdbeAddOp3(v, OP_MakeRecord, regTabname, 6, regTemp);
  		sqlite3VdbeAddOp2(v, OP_IdxReplace, iStatCur + 1, regTemp);
@@ -1146,7 +1141,7 @@ analyzeTable(Parse * pParse, Table * pTab, Index * pOnlyIdx)
  	iStatCur = pParse->nTab;
  	pParse->nTab += 3;
  	if (pOnlyIdx) {
-		openStatTable(pParse, iStatCur, pOnlyIdx->zName, "idx");
+		openStatTable(pParse, iStatCur, pOnlyIdx->def->name, "idx");
  	} else {
  		openStatTable(pParse, iStatCur, pTab->def->name, "tbl");
  	}
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 0072f842e..d66777f73 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -241,6 +241,8 @@ static void
  freeIndex(sqlite3 * db, Index * p)
  {
  	sql_expr_delete(db, p->pPartIdxWhere, false);
+	if (p->def != NULL)
+		index_def_delete(p->def);
  	sqlite3DbFree(db, p->zColAff);
  	sqlite3DbFree(db, p);
  }
@@ -259,7 +261,8 @@ sqlite3UnlinkAndDeleteIndex(sqlite3 * db, Index * pIndex)
  
  	struct session *user_session = current_session();
  
-	pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash, pIndex->zName, 0);
+	pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash,
+				   pIndex->def->name, 0);
  	if (ALWAYS(pIndex)) {
  		if (pIndex->pTable->pIndex == pIndex) {
  			pIndex->pTable->pIndex = pIndex->pNext;
@@ -364,7 +367,7 @@ deleteTable(sqlite3 * db, Table * pTable)
  		pNext = pIndex->pNext;
  		assert(pIndex->pSchema == pTable->pSchema);
  		if ((db == 0 || db->pnBytesFreed == 0)) {
-			char *zName = pIndex->zName;
+			char *zName = pIndex->def->name;
  			TESTONLY(Index *
  				 pOld =) sqlite3HashInsert(&pTable->idxHash,
  							   zName, 0);
@@ -1029,7 +1032,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
  	Table *p = pParse->pNewTable;
  	if (p == NULL)
  		return;
-	int i = p->def->field_count - 1;
+	uint32_t i = p->def->field_count - 1;
  	sqlite3 *db = pParse->db;
  	char *zColl = sqlite3NameFromToken(db, pToken);
  	if (!zColl)
@@ -1037,22 +1040,21 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
  	uint32_t *id = &p->def->fields[i].coll_id;
  	p->aCol[i].coll = sql_get_coll_seq(pParse, zColl, id);
  	if (p->aCol[i].coll != NULL) {
-		Index *pIdx;
  		/* If the column is declared as "<name> PRIMARY KEY COLLATE <type>",
  		 * then an index may have been created on this column before the
  		 * collation type was added. Correct this if it is the case.
  		 */
-		for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
-			assert(pIdx->nColumn == 1);
-			if (pIdx->aiColumn[0] == i) {
-				id = &pIdx->coll_id_array[0];
-				pIdx->coll_array[0] =
+		for (struct Index *pIdx = p->pIndex; pIdx != NULL;
+		     pIdx = pIdx->pNext) {
+			assert(pIdx->def->key_def->part_count == 1);
+			if (pIdx->def->key_def->parts[0].fieldno == i) {
+				id = &pIdx->def->key_def->parts[0].coll_id;
+				pIdx->def->key_def->parts[0].coll =
  					sql_column_collation(p->def, i, id);
  			}
  		}
-	} else {
-		sqlite3DbFree(db, zColl);
  	}
+	sqlite3DbFree(db, zColl);
  }
  
  struct coll *
@@ -1082,66 +1084,6 @@ sql_column_collation(struct space_def *def, uint32_t column, uint32_t *coll_id)
  	return space->format->fields[column].coll;
  }
  
-struct key_def*
-sql_index_key_def(struct Index *idx)
-{
-	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-	struct space *space = space_by_id(space_id);
-	assert(space != NULL);
-	struct index *index = space_index(space, index_id);
-	assert(index != NULL && index->def != NULL);
-	return index->def->key_def;
-}
-
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *coll_id)
-{
-	assert(idx != NULL);
-	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-	struct space *space = space_by_id(space_id);
-
-	assert(column < idx->nColumn);
-	/*
-	 * If space is still under construction, or it is
-	 * an ephemeral space, then fetch collation from
-	 * SQL internal structure.
-	 */
-	if (space == NULL) {
-		assert(column < idx->nColumn);
-		*coll_id = idx->coll_id_array[column];
-		return idx->coll_array[column];
-	}
-
-	struct key_def *key_def = sql_index_key_def(idx);
-	assert(key_def != NULL && key_def->part_count >= column);
-	*coll_id = key_def->parts[column].coll_id;
-	return key_def->parts[column].coll;
-}
-
-enum sort_order
-sql_index_column_sort_order(Index *idx, uint32_t column)
-{
-	assert(idx != NULL);
-	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-	struct space *space = space_by_id(space_id);
-
-	assert(column < idx->nColumn);
-	/*
-	 * If space is still under construction, or it is
-	 * an ephemeral space, then fetch collation from
-	 * SQL internal structure.
-	 */
-	if (space == NULL) {
-		assert(column < idx->nColumn);
-		return idx->sort_order[column];
-	}
-
-	struct key_def *key_def = sql_index_key_def(idx);
-	assert(key_def != NULL && key_def->part_count >= column);
-	return key_def->parts[column].sort_order;
-}
-
  struct ExprList *
  space_checks_expr_list(uint32_t space_id)
  {
@@ -1325,17 +1267,6 @@ createTableStmt(sqlite3 * db, Table * p)
  	return zStmt;
  }
  
-/* Return true if value x is found any of the first nCol entries of aiCol[]
- */
-static int
-hasColumn(const i16 * aiCol, int nCol, int x)
-{
-	while (nCol-- > 0)
-		if (x == *(aiCol++))
-			return 1;
-	return 0;
-}
-
  /*
   * This routine runs at the end of parsing a CREATE TABLE statement.
   * The job of this routine is to convert both
@@ -1352,13 +1283,12 @@ static void
  convertToWithoutRowidTable(Parse * pParse, Table * pTab)
  {
  	Index *pPk;
-	int i, j;
  	sqlite3 *db = pParse->db;
  
  	/* Mark every PRIMARY KEY column as NOT NULL (except for imposter tables)
  	 */
  	if (!db->init.imposterTable) {
-		for (i = 0; i < (int)pTab->def->field_count; i++) {
+		for (uint32_t i = 0; i < pTab->def->field_count; i++) {
  			if (pTab->aCol[i].is_primkey) {
  				pTab->def->fields[i].nullable_action
  					= ON_CONFLICT_ACTION_ABORT;
@@ -1390,20 +1320,6 @@ convertToWithoutRowidTable(Parse * pParse, Table * pTab)
  		pTab->iPKey = -1;
  	} else {
  		pPk = sqlite3PrimaryKeyIndex(pTab);
-
-		/*
-		 * Remove all redundant columns from the PRIMARY KEY.  For example, change
-		 * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY KEY(a,b,c,d)".  Later
-		 * code assumes the PRIMARY KEY contains no repeated columns.
-		 */
-		for (i = j = 1; i < pPk->nColumn; i++) {
-			if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
-				pPk->nColumn--;
-			} else {
-				pPk->aiColumn[j++] = pPk->aiColumn[i];
-			}
-		}
-		pPk->nColumn = j;
  	}
  	assert(pPk != 0);
  }
@@ -1485,7 +1401,7 @@ createIndex(Parse * pParse, Index * pIndex, int iSpaceId, int iIndexId,
  	}
  	sqlite3VdbeAddOp4(v,
  			  OP_String8, 0, iFirstCol + 2, 0,
-			  sqlite3DbStrDup(pParse->db, pIndex->zName),
+			  sqlite3DbStrDup(pParse->db, pIndex->def->name),
  			  P4_DYNAMIC);
  	sqlite3VdbeAddOp4(v, OP_String8, 0, iFirstCol + 3, 0, "tree",
  			  P4_STATIC);
@@ -1522,7 +1438,7 @@ makeIndexSchemaRecord(Parse * pParse,
  
  	sqlite3VdbeAddOp4(v,
  			  OP_String8, 0, iFirstCol, 0,
-			  sqlite3DbStrDup(pParse->db, pIndex->zName),
+			  sqlite3DbStrDup(pParse->db, pIndex->def->name),
  			  P4_DYNAMIC);
  
  	if (pParse->pNewTable) {
@@ -2452,15 +2368,16 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
  	} else {
  		tnum = pIndex->tnum;
  	}
-	struct key_def *def = key_def_dup(sql_index_key_def(pIndex));
+	struct key_def *def = key_def_dup(pIndex->def->key_def);
  	if (def == NULL) {
  		sqlite3OomFault(db);
  		return;
  	}
  	/* Open the sorter cursor if we are to use one. */
  	iSorter = pParse->nTab++;
-	sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
-			  (char *)def, P4_KEYDEF);
+	sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0,
+			  pIndex->def->key_def->part_count, (char *)def,
+			  P4_KEYDEF);
  
  	/* Open the table. Loop through all rows of the table, inserting index
  	 * records into the sorter.
@@ -2491,7 +2408,8 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
  		sqlite3VdbeGoto(v, j2);
  		addr2 = sqlite3VdbeCurrentAddr(v);
  		sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
-				     regRecord, pIndex->nColumn);
+				     regRecord,
+				     pIndex->def->key_def->part_count);
  		VdbeCoverage(v);
  		parser_emit_unique_constraint(pParse, ON_CONFLICT_ACTION_ABORT,
  					      pIndex);
@@ -2511,44 +2429,14 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
  	sqlite3VdbeAddOp1(v, OP_Close, iSorter);
  }
  
-/*
- * Allocate heap space to hold an Index object with nCol columns.
- *
- * Increase the allocation size to provide an extra nExtra bytes
- * of 8-byte aligned space after the Index object and return a
- * pointer to this extra space in *ppExtra.
- */
-Index *
-sqlite3AllocateIndexObject(sqlite3 * db,	/* Database connection */
-			   i16 nCol,	/* Total number of columns in the index */
-			   int nExtra,	/* Number of bytes of extra space to alloc */
-			   char **ppExtra	/* Pointer to the "extra" space */
-    )
+struct Index *
+sql_index_alloc(struct sqlite3 *db)
  {
-	Index *p;		/* Allocated index object */
-	int nByte;		/* Bytes of space for Index object + arrays */
-
-	nByte = ROUND8(sizeof(Index)) +		    /* Index structure   */
-	    ROUND8(sizeof(struct coll *) * nCol) +  /* Index.coll_array  */
-	    ROUND8(sizeof(uint32_t) * nCol) +       /* Index.coll_id_array*/
-	    ROUND8(sizeof(LogEst) * (nCol + 1) +    /* Index.aiRowLogEst */
-		   sizeof(i16) * nCol +		    /* Index.aiColumn    */
-		   sizeof(enum sort_order) * nCol); /* Index.sort_order  */
-	p = sqlite3DbMallocZero(db, nByte + nExtra);
-	if (p) {
-		char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
-		p->coll_array = (struct coll **)pExtra;
-		pExtra += ROUND8(sizeof(struct coll **) * nCol);
-		p->coll_id_array = (uint32_t *) pExtra;
-		pExtra += ROUND8(sizeof(uint32_t) * nCol);
-		p->aiRowLogEst = (LogEst *) pExtra;
-		pExtra += sizeof(LogEst) * (nCol + 1);
-		p->aiColumn = (i16 *) pExtra;
-		pExtra += sizeof(i16) * nCol;
-		p->sort_order = (enum sort_order *) pExtra;
-		p->nColumn = nCol;
-		*ppExtra = ((char *)p) + nByte;
-	}
+	/* Size of struct Index only; aiRowLogEst gets no space here. */
+	int index_size = ROUND8(sizeof(struct Index));
+	struct Index *p = sqlite3DbMallocZero(db, index_size);
+	if (p != NULL)
+		p->aiRowLogEst = (LogEst *) ((char *)p + ROUND8(sizeof(*p)));
  	return p;
  }
  
@@ -2635,46 +2523,132 @@ addIndexToTable(Index * pIndex, Table * pTab)
  	}
  }
  
-bool
-index_is_unique(Index *idx)
-{
-	assert(idx != NULL);
-	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-	struct space *space = space_by_id(space_id);
-	assert(space != NULL);
-	struct index *tnt_index = space_index(space, index_id);
-	assert(tnt_index != NULL);
+/**
+ * Create and set index_def in the given Index.
+ *
+ * @param parse Parse context.
+ * @param index Index for which index_def should be created. It is
+ *              used only to set index_def at the end of the
+ *              function.
+ * @param table Table which is indexed by 'index' param.
+ * @param iid Index ID.
+ * @param name Index name.
+ * @param name_len Index name length.
+ * @param is_unique Is given 'index' unique or not.
+ * @param expr_list List of expressions describing which columns
+ *                  of 'table' are used in the index, along with
+ *                  their collations, orders, etc.
+ * @param idx_type Index type, one of the following:
+ *                 SQLITE_IDXTYPE_APPDEF     0
+ *                 SQLITE_IDXTYPE_UNIQUE     1
+ *                 SQLITE_IDXTYPE_PRIMARYKEY 2.
+ * @param sql_stmt SQL statement, which creates the index.
+ * @retval 0 Success.
+ * @retval -1 Error.
+ */
+static int
+index_fill_def(struct Parse *parse, struct Index *index,
+	       struct Table *table, uint32_t iid, const char *name,
+	       uint32_t name_len, bool is_unique,
+	       struct ExprList *expr_list, u8 idx_type,
+	       char *sql_stmt)
+{
+	struct space_def *space_def = table->def;
+	struct index_opts opts;
+	index_opts_create(&opts);
+	opts.is_unique = is_unique;
+	opts.sql = sql_stmt;
+	index->def = NULL;
+	int rc = -1;
+
+	struct key_def *key_def = key_def_new(expr_list->nExpr);
+	if (key_def == NULL)
+		goto tnt_error;
+
+	for (int i = 0; i < expr_list->nExpr; i++) {
+		struct Expr *expr = expr_list->a[i].pExpr;
+		sql_resolve_self_reference(parse, table, NC_IdxExpr, expr, 0);
+		if (parse->nErr > 0)
+			goto cleanup;
  
-	return tnt_index->def->opts.is_unique;
+		struct Expr *column_expr = sqlite3ExprSkipCollate(expr);
+		if (column_expr->op != TK_COLUMN) {
+			diag_set(ClientError, ER_UNSUPPORTED, "Tarantool",
+				 "functional indexes");
+			goto tnt_error;
+		}
+
+		uint32_t fieldno = column_expr->iColumn;
+		uint32_t coll_id;
+		struct coll *coll;
+		if (expr->op == TK_COLLATE) {
+			coll = sql_get_coll_seq(parse, expr->u.zToken,
+						&coll_id);
+			if (coll == NULL &&
+			    strcasecmp(expr->u.zToken, "binary") != 0) {
+				diag_set(ClientError, ER_NO_SUCH_COLLATION,
+					 expr->u.zToken);
+				goto tnt_error;
+			}
+		} else {
+			coll = sql_column_collation(space_def, fieldno,
+						    &coll_id);
+		}
+		/*
+		 * Tarantool: DESC indexes are not supported so
+		 * far.
+		 */
+		key_def_set_part(key_def, i, fieldno,
+				 space_def->fields[fieldno].type,
+				 space_def->fields[fieldno].nullable_action,
+				 coll, coll_id, SORT_ORDER_ASC);
+	}
+	if (parse->nErr > 0)
+		goto cleanup;
+
+	struct key_def *pk_key_def;
+	if (idx_type == SQLITE_IDXTYPE_APPDEF)
+		pk_key_def = table->pIndex->def->key_def;
+	else
+		pk_key_def = NULL;
+
+	index->def = index_def_new(space_def->id, iid, name, name_len, TREE,
+				   &opts, key_def, pk_key_def);
+	if (index->def == NULL)
+		goto tnt_error;
+	rc = 0;
+cleanup:
+	if (key_def != NULL)
+		key_def_delete(key_def);
+	return rc;
+tnt_error:
+	parse->rc = SQL_TARANTOOL_ERROR;
+	++parse->nErr;
+	goto cleanup;
  }
  
  void
  sql_create_index(struct Parse *parse, struct Token *token,
  		 struct SrcList *tbl_name, struct ExprList *col_list,
-		 int on_error, struct Token *start, struct Expr *where,
-		 enum sort_order sort_order, bool if_not_exist, u8 idx_type)
-{
-	Table *pTab = 0;	/* Table to be indexed */
-	Index *pIndex = 0;	/* The index to be created */
-	char *zName = 0;	/* Name of the index */
-	int nName;		/* Number of characters in zName */
-	int i, j;
-	DbFixer sFix;		/* For assigning database names to pTable */
-	sqlite3 *db = parse->db;
-	struct ExprList_item *col_listItem;	/* For looping over col_list */
-	int nExtra = 0;		/* Space allocated for zExtra[] */
-	char *zExtra = 0;	/* Extra space after the Index object */
+		 enum on_conflict_action on_error, struct Token *start,
+		 struct Expr *where, enum sort_order sort_order,
+		 bool if_not_exist, u8 idx_type)
+{
+	/* The index to be created. */
+	struct Index *index = NULL;
+	/* Name of the index. */
+	char *name = NULL;
+	struct sqlite3 *db = parse->db;
  	struct session *user_session = current_session();
  
-	if (db->mallocFailed || parse->nErr > 0) {
+	if (db->mallocFailed || parse->nErr > 0)
  		goto exit_create_index;
-	}
-	/* Do not account nested operations: the count of such
-	 * operations depends on Tarantool data dictionary internals,
-	 * such as data layout in system spaces. Also do not account
-	 * PRIMARY KEY and UNIQUE constraint - they had been accounted
-	 * in CREATE TABLE already.
+	/*
+	 * Do not account nested operations: the count of such
+	 * operations depends on Tarantool data dictionary
+	 * internals, such as data layout in system spaces. Also
+	 * do not account PRIMARY KEY and UNIQUE constraint - they
+	 * had been accounted in CREATE TABLE already.
  	 */
  	if (!parse->nested && idx_type == SQLITE_IDXTYPE_APPDEF) {
  		Vdbe *v = sqlite3GetVdbe(parse);
@@ -2685,39 +2659,30 @@ sql_create_index(struct Parse *parse, struct Token *token,
  	assert(db->pSchema != NULL);
  
  	/*
-	 * Find the table that is to be indexed.  Return early if not found.
+	 * Find the table that is to be indexed.
+	 * Return early if not found.
  	 */
+	struct Table *table = NULL;
  	if (tbl_name != NULL) {
-
-		/* Use the two-part index name to determine the database
-		 * to search for the table. 'Fix' the table name to this db
-		 * before looking up the table.
+		/*
+		 * Use the two-part index name to determine the
+		 * database to search for the table. 'Fix' the
+		 * table name to this db before looking up the
+		 * table.
  		 */
-		assert(token && token->z);
-
-		sqlite3FixInit(&sFix, parse, "index", token);
-		if (sqlite3FixSrcList(&sFix, tbl_name)) {
-			/* Because the parser constructs tbl_name from a single identifier,
-			 * sqlite3FixSrcList can never fail.
-			 */
-			assert(0);
-		}
-		pTab = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
-		assert(db->mallocFailed == 0 || pTab == 0);
-		if (pTab == 0)
-			goto exit_create_index;
-		sqlite3PrimaryKeyIndex(pTab);
+		assert(token != NULL && token->z != NULL);
+		table = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
+		assert(db->mallocFailed == 0 || table == NULL);
  	} else {
  		assert(token == NULL);
  		assert(start == NULL);
-		pTab = parse->pNewTable;
-		if (!pTab)
-			goto exit_create_index;
+		table = parse->pNewTable;
  	}
  
-	assert(pTab != 0);
-	assert(parse->nErr == 0);
-	if (pTab->def->opts.is_view) {
+	if (table == NULL || parse->nErr > 0)
+		goto exit_create_index;
+
+	if (table->def->opts.is_view) {
  		sqlite3ErrorMsg(parse, "views may not be indexed");
  		goto exit_create_index;
  	}
@@ -2735,42 +2700,49 @@ sql_create_index(struct Parse *parse, struct Token *token,
  	 * primary key or UNIQUE constraint.  We have to invent
  	 * our own name.
  	 */
-	if (token) {
-		zName = sqlite3NameFromToken(db, token);
-		if (zName == 0)
+	if (token != NULL) {
+		name = sqlite3NameFromToken(db, token);
+		if (name == NULL)
  			goto exit_create_index;
-		assert(token->z != 0);
+		assert(token->z != NULL);
  		if (!db->init.busy) {
-			if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
+			if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
  			    NULL) {
-				sqlite3ErrorMsg(parse,
-						"there is already a table named %s",
-						zName);
+				sqlite3ErrorMsg(parse, "there is already a "\
+						"table named %s", name);
  				goto exit_create_index;
  			}
  		}
-		if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
+		if (sqlite3HashFind(&table->idxHash, name) != NULL) {
  			if (!if_not_exist) {
  				sqlite3ErrorMsg(parse,
  						"index %s.%s already exists",
-						pTab->def->name, zName);
+						table->def->name, name);
  			} else {
  				assert(!db->init.busy);
  			}
  			goto exit_create_index;
  		}
  	} else {
-		int n;
-		Index *pLoop;
-		for (pLoop = pTab->pIndex, n = 1; pLoop;
-		     pLoop = pLoop->pNext, n++) {
+		int n = 1;
+		for (struct Index *idx = table->pIndex; idx != NULL;
+		     idx = idx->pNext, n++) {
  		}
-		zName =
-		    sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", pTab->def->name,
-				   n);
-		if (zName == 0) {
+		name = sqlite3MPrintf(db, "sql_autoindex_%s_%d",
+				      table->def->name, n);
+		if (name == NULL)
  			goto exit_create_index;
-		}
+	}
+
+	bool is_system_space = BOX_SYSTEM_ID_MIN < table->def->id &&
+			       table->def->id < BOX_SYSTEM_ID_MAX;
+	if (is_system_space && idx_type == SQLITE_IDXTYPE_APPDEF) {
+		diag_set(ClientError, ER_MODIFY_INDEX, name,
+			 table->def->name, "creating indexes on system "
+					   "spaces are prohibited");
+		parse->nErr++;
+		parse->rc = SQL_TARANTOOL_ERROR;
+		goto exit_create_index;
  	}
  
  	/*
@@ -2780,12 +2752,12 @@ sql_create_index(struct Parse *parse, struct Token *token,
  	 * simulate this.
  	 */
  	if (col_list == NULL) {
-		Token prevCol;
-		uint32_t last_field = pTab->def->field_count - 1;
-		sqlite3TokenInit(&prevCol, pTab->def->fields[last_field].name);
+		struct Token prev_col;
+		uint32_t last_field = table->def->field_count - 1;
+		sqlite3TokenInit(&prev_col, table->def->fields[last_field].name);
  		col_list = sql_expr_list_append(parse->db, NULL,
  						sqlite3ExprAlloc(db, TK_ID,
-								 &prevCol, 0));
+								 &prev_col, 0));
  		if (col_list == NULL)
  			goto exit_create_index;
  		assert(col_list->nExpr == 1);
@@ -2794,191 +2766,194 @@ sql_create_index(struct Parse *parse, struct Token *token,
  		sqlite3ExprListCheckLength(parse, col_list, "index");
  	}
  
-	/* Figure out how many bytes of space are required to store explicitly
-	 * specified collation sequence names.
-	 */
-	for (i = 0; i < col_list->nExpr; i++) {
-		Expr *pExpr = col_list->a[i].pExpr;
-		assert(pExpr != 0);
-		if (pExpr->op == TK_COLLATE) {
-			nExtra += (1 + sqlite3Strlen30(pExpr->u.zToken));
-		}
-	}
+	if (sqlite3CheckIdentifierName(parse, name) != SQLITE_OK)
+		goto exit_create_index;
  
-	/*
-	 * Allocate the index structure.
-	 */
-	nName = sqlite3Strlen30(zName);
-	pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
-					    nName + nExtra + 1, &zExtra);
-	if (db->mallocFailed) {
+	index = sql_index_alloc(db);
+	if (index == NULL)
  		goto exit_create_index;
-	}
-	assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
-	assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
-	pIndex->zName = zExtra;
-	zExtra += nName + 1;
-	memcpy(pIndex->zName, zName, nName + 1);
-	pIndex->pTable = pTab;
-	pIndex->onError = (u8) on_error;
+
+	assert(EIGHT_BYTE_ALIGNMENT(index->aiRowLogEst));
+	index->pTable = table;
+	index->onError = (u8) on_error;
  	/*
  	 * Don't make difference between UNIQUE indexes made by user
  	 * using CREATE INDEX statement and those created during
  	 * CREATE TABLE processing.
  	 */
  	if (idx_type == SQLITE_IDXTYPE_APPDEF &&
-	    on_error != ON_CONFLICT_ACTION_NONE) {
-		pIndex->idxType = SQLITE_IDXTYPE_UNIQUE;
-	} else {
-		pIndex->idxType = idx_type;
-	}
-	pIndex->pSchema = db->pSchema;
-	pIndex->nColumn = col_list->nExpr;
-	/* Tarantool have access to each column by any index */
-	if (where) {
-		sql_resolve_self_reference(parse, pTab, NC_PartIdx, where,
+	    on_error != ON_CONFLICT_ACTION_NONE)
+		index->idxType = SQLITE_IDXTYPE_UNIQUE;
+	else
+		index->idxType = idx_type;
+	index->pSchema = db->pSchema;
+	/* Tarantool has access to each column by any index. */
+	if (where != NULL) {
+		sql_resolve_self_reference(parse, table, NC_PartIdx, where,
  					   NULL);
-		pIndex->pPartIdxWhere = where;
+		index->pPartIdxWhere = where;
  		where = NULL;
  	}
  
-	/* Analyze the list of expressions that form the terms of the index and
-	 * report any errors.  In the common case where the expression is exactly
-	 * a table column, store that column in aiColumn[].
-	 *
-	 * TODO: Issue a warning if two or more columns of the index are identical.
-	 * TODO: Issue a warning if the table primary key is used as part of the
-	 * index key.
+	/*
+	 * TODO: Issue a warning if two or more columns of the
+	 * index are identical.
+	 * TODO: Issue a warning if the table primary key is used
+	 * as part of the index key.
  	 */
-	for (i = 0, col_listItem = col_list->a; i < col_list->nExpr;
-	     i++, col_listItem++) {
-		Expr *pCExpr;	/* The i-th index expression */
-		sql_resolve_self_reference(parse, pTab, NC_IdxExpr,
-					   col_listItem->pExpr, NULL);
-		if (parse->nErr > 0)
-			goto exit_create_index;
-		pCExpr = sqlite3ExprSkipCollate(col_listItem->pExpr);
-		if (pCExpr->op != TK_COLUMN) {
-			sqlite3ErrorMsg(parse,
-					"functional indexes aren't supported "
-					"in the current version");
+
+	char *sql_stmt = "";
+	if (!db->init.busy && tbl_name != NULL) {
+		int n = (int) (parse->sLastToken.z - token->z) +
+			parse->sLastToken.n;
+		if (token->z[n - 1] == ';')
+			n--;
+		sql_stmt = sqlite3MPrintf(db, "CREATE%s INDEX %.*s",
+				       on_error == ON_CONFLICT_ACTION_NONE ?
+				       "" : " UNIQUE", n, token->z);
+		if (db->mallocFailed || sql_stmt == NULL)
  			goto exit_create_index;
-		} else {
-			j = pCExpr->iColumn;
-			assert(j <= 0x7fff);
-			if (j < 0) {
-				j = pTab->iPKey;
-			}
-			pIndex->aiColumn[i] = (i16) j;
-		}
-		struct coll *coll;
-		uint32_t id;
-		if (col_listItem->pExpr->op == TK_COLLATE) {
-			const char *coll_name = col_listItem->pExpr->u.zToken;
-			coll = sql_get_coll_seq(parse, coll_name, &id);
+	}
  
-			if (coll == NULL &&
-			    sqlite3StrICmp(coll_name, "binary") != 0) {
-				goto exit_create_index;
-			}
-		} else if (j >= 0) {
-			coll = sql_column_collation(pTab->def, j, &id);
-		} else {
-			id = COLL_NONE;
-			coll = NULL;
+	/* If it is parsing stage, then iid may have any value. */
+	uint32_t iid = 1;
+	if (db->init.busy)
+		iid = SQLITE_PAGENO_TO_INDEXID(db->init.newTnum);
+
+	bool is_unique = on_error != ON_CONFLICT_ACTION_NONE;
+	if (index_fill_def(parse, index, table, iid, name, strlen(name),
+			   is_unique, col_list, idx_type, sql_stmt) != 0)
+		goto exit_create_index;
+	/*
+	 * Remove all redundant columns from the index key.
+	 * For example, change "PRIMARY KEY(a,b,a,b,c,b,c,d)" into
+	 * just "PRIMARY KEY(a,b,c,d)". Later code assumes the
+	 * PRIMARY KEY contains no repeated columns.
+	 */
+	struct key_part *parts = index->def->key_def->parts;
+	uint32_t part_count = index->def->key_def->part_count;
+	uint32_t new_part_count = 1;
+	for(uint32_t i = 1; i < part_count; i++) {
+		uint32_t j;
+		for(j = 0; j < new_part_count; j++) {
+			if(parts[i].fieldno == parts[j].fieldno)
+				break;
  		}
-		pIndex->coll_array[i] = coll;
-		pIndex->coll_id_array[i] = id;
  
-		/* Tarantool: DESC indexes are not supported so far.
-		 * See gh-3016.
-		 */
-		pIndex->sort_order[i] = SORT_ORDER_ASC;
+		if (j == new_part_count)
+			parts[new_part_count++] = parts[i];
  	}
-	if (pTab == parse->pNewTable) {
-		/* This routine has been called to create an automatic index as a
-		 * result of a PRIMARY KEY or UNIQUE clause on a column definition, or
-		 * a PRIMARY KEY or UNIQUE clause following the column definitions.
-		 * i.e. one of:
+	index->def->key_def->part_count = new_part_count;
+
+	if (!index_def_is_valid(index->def, table->def->name))
+		goto exit_create_index;
+
+	if (table == parse->pNewTable) {
+		/*
+		 * This routine has been called to create an
+		 * automatic index as a result of a PRIMARY KEY or
+		 * UNIQUE clause on a column definition, or
+		 * a PRIMARY KEY or UNIQUE clause following the
+		 * column definitions. i.e. one of:
  		 *
  		 * CREATE TABLE t(x PRIMARY KEY, y);
  		 * CREATE TABLE t(x, y, UNIQUE(x, y));
  		 *
-		 * Either way, check to see if the table already has such an index. If
-		 * so, don't bother creating this one. This only applies to
-		 * automatically created indices. Users can do as they wish with
+		 * Either way, check to see if the table already
+		 * has such an index. If so, don't bother creating
+		 * this one. This only applies to automatically
+		 * created indices. Users can do as they wish with
  		 * explicit indices.
  		 *
-		 * Two UNIQUE or PRIMARY KEY constraints are considered equivalent
-		 * (and thus suppressing the second one) even if they have different
+		 * Two UNIQUE or PRIMARY KEY constraints are
+		 * considered equivalent (and thus suppressing
+		 * the second one) even if they have different
  		 * sort orders.
  		 *
-		 * If there are different collating sequences or if the columns of
-		 * the constraint occur in different orders, then the constraints are
-		 * considered distinct and both result in separate indices.
+		 * If there are different collating sequences or
+		 * if the columns of the constraint occur in
+		 * different orders, then the constraints are
+		 * considered distinct and both result in separate
+		 * indices.
  		 */
-		Index *pIdx;
-		for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-			int k;
-			assert(IsUniqueIndex(pIdx));
-			assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
-			assert(IsUniqueIndex(pIndex));
-
-			if (pIdx->nColumn != pIndex->nColumn)
+		for (struct Index *idx = table->pIndex; idx != NULL;
+		     idx = idx->pNext) {
+			uint32_t k;
+			assert(IsUniqueIndex(idx));
+			assert(idx->idxType != SQLITE_IDXTYPE_APPDEF);
+			assert(IsUniqueIndex(index));
+
+			if (idx->def->key_def->part_count !=
+			    index->def->key_def->part_count)
  				continue;
-			for (k = 0; k < pIdx->nColumn; k++) {
-				assert(pIdx->aiColumn[k] >= 0);
-				if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
+			for (k = 0; k < idx->def->key_def->part_count; k++) {
+				if (idx->def->key_def->parts[k].fieldno !=
+				    index->def->key_def->parts[k].fieldno)
  					break;
  				struct coll *coll1, *coll2;
-				uint32_t id;
-				coll1 = sql_index_collation(pIdx, k, &id);
-				coll2 = sql_index_collation(pIndex, k, &id);
+				coll1 = idx->def->key_def->parts[k].coll;
+				coll2 = index->def->key_def->parts[k].coll;
  				if (coll1 != coll2)
  					break;
  			}
-			if (k == pIdx->nColumn) {
-				if (pIdx->onError != pIndex->onError) {
-					/* This constraint creates the same index as a previous
-					 * constraint specified somewhere in the CREATE TABLE statement.
-					 * However the ON CONFLICT clauses are different. If both this
-					 * constraint and the previous equivalent constraint have explicit
-					 * ON CONFLICT clauses this is an error. Otherwise, use the
-					 * explicitly specified behavior for the index.
+			if (k == idx->def->key_def->part_count) {
+				if (idx->onError != index->onError) {
+					/*
+					 * This constraint creates
+					 * the same index as a
+					 * previous
+					 * constraint specified
+					 * somewhere in the CREATE
+					 * TABLE statement.
+					 * However the ON CONFLICT
+					 * clauses are different.
+					 * If both this constraint
+					 * and the previous
+					 * equivalent constraint
+					 * have explicit
+					 * ON CONFLICT clauses
+					 * this is an error.
+					 * Otherwise, use the
+					 * explicitly specified
+					 * behavior for the index.
  					 */
-					if (!
-					    (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT
-					     || pIndex->onError ==
-					     ON_CONFLICT_ACTION_DEFAULT)) {
+					if (idx->onError !=
+					    ON_CONFLICT_ACTION_DEFAULT &&
+					    index->onError !=
+					    ON_CONFLICT_ACTION_DEFAULT) {
  						sqlite3ErrorMsg(parse,
-								"conflicting ON CONFLICT clauses specified",
-								0);
-					}
-					if (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT) {
-						pIdx->onError = pIndex->onError;
+								"conflicting "\
+								"ON CONFLICT "\
+								"clauses "\
+								"specified");
  					}
+					if (idx->onError ==
+					    ON_CONFLICT_ACTION_DEFAULT)
+						idx->onError = index->onError;
  				}
  				if (idx_type == SQLITE_IDXTYPE_PRIMARYKEY)
-					pIdx->idxType = idx_type;
+					idx->idxType = idx_type;
  				goto exit_create_index;
  			}
  		}
  	}
  
-	/* Link the new Index structure to its table and to the other
-	 * in-memory database structures.
+	/*
+	 * Link the new Index structure to its table and to the
+	 * other in-memory database structures.
  	 */
  	assert(parse->nErr == 0);
  	if (db->init.busy) {
-		Index *p;
-		p = sqlite3HashInsert(&pTab->idxHash, pIndex->zName, pIndex);
-		if (p) {
-			assert(p == pIndex);	/* Malloc must have failed */
+		struct Index *p = sqlite3HashInsert(&table->idxHash,
+						    index->def->name, index);
+		if (p != NULL) {
+			/* Malloc must have failed. */
+			assert(p == index);
  			sqlite3OomFault(db);
  			goto exit_create_index;
  		}
  		user_session->sql_flags |= SQLITE_InternChanges;
-		pIndex->tnum = db->init.newTnum;
+		index->tnum = db->init.newTnum;
  	}
  
  	/*
@@ -2996,117 +2971,72 @@ sql_create_index(struct Parse *parse, struct Token *token,
  	 * initialization step can be skipped.
  	 */
  	else if (tbl_name != NULL) {
-		Vdbe *v;
-		char *zStmt;
-		int iCursor = parse->nTab++;
+		Vdbe *vdbe;
+		int cursor = parse->nTab++;
  		int index_space_ptr_reg = parse->nTab++;
-		int iSpaceId, iIndexId, iFirstSchemaCol;
+		int space_id, index_id, first_schema_col;
  
-		v = sqlite3GetVdbe(parse);
-		if (v == 0)
+		vdbe = sqlite3GetVdbe(parse);
+		if (vdbe == 0)
  			goto exit_create_index;
  
  		sql_set_multi_write(parse, true);
  
  
-		sqlite3VdbeAddOp2(v, OP_SIDtoPtr, BOX_INDEX_ID,
+		sqlite3VdbeAddOp2(vdbe, OP_SIDtoPtr, BOX_INDEX_ID,
  				  index_space_ptr_reg);
-		sqlite3VdbeAddOp4Int(v, OP_OpenWrite, iCursor, 0,
+		sqlite3VdbeAddOp4Int(vdbe, OP_OpenWrite, cursor, 0,
  				     index_space_ptr_reg, 6);
-		sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);
+		sqlite3VdbeChangeP5(vdbe, OPFLAG_SEEKEQ);
  
  		/*
  		 * Gather the complete text of the CREATE INDEX
-		 * statement into the zStmt variable
+		 * statement into the sql_stmt variable.
  		 */
  		assert(start != NULL);
-		int n = (int)(parse->sLastToken.z - token->z) +
-			parse->sLastToken.n;
-		if (token->z[n - 1] == ';')
-			n--;
-		/* A named index with an explicit CREATE INDEX statement */
-		zStmt = sqlite3MPrintf(db, "CREATE%s INDEX %.*s", on_error ==
-				       ON_CONFLICT_ACTION_NONE ? "" : " UNIQUE",
-				       n, token->z);
-
-		iSpaceId = SQLITE_PAGENO_TO_SPACEID(pTab->tnum);
-		iIndexId = getNewIid(parse, iSpaceId, iCursor);
-		sqlite3VdbeAddOp1(v, OP_Close, iCursor);
-		createIndex(parse, pIndex, iSpaceId, iIndexId, zStmt);
-
-		/* consumes zStmt */
-		iFirstSchemaCol =
-		    makeIndexSchemaRecord(parse, pIndex, iSpaceId, iIndexId,
-					  zStmt);
-
-		/* Reparse the schema. Code an OP_Expire
+		space_id = SQLITE_PAGENO_TO_SPACEID(table->tnum);
+		index_id = getNewIid(parse, space_id, cursor);
+		sqlite3VdbeAddOp1(vdbe, OP_Close, cursor);
+		createIndex(parse, index, space_id, index_id, sql_stmt);
+
+		/* Consumes sql_stmt. */
+		first_schema_col = makeIndexSchemaRecord(parse, index,
+							 space_id, index_id,
+							 sql_stmt);
+
+		/*
+		 * Reparse the schema. Code an OP_Expire
  		 * to invalidate all pre-compiled statements.
  		 */
  		sqlite3ChangeCookie(parse);
-		sqlite3VdbeAddParseSchema2Op(v, iFirstSchemaCol, 4);
-		sqlite3VdbeAddOp0(v, OP_Expire);
+		sqlite3VdbeAddParseSchema2Op(vdbe, first_schema_col, 4);
+		sqlite3VdbeAddOp0(vdbe, OP_Expire);
  	}
  
-	/* When adding an index to the list of indexes for a table, we
-	 * maintain special order of the indexes in the list:
+	/*
+	 * When adding an index to the list of indexes for a table,
+	 * we maintain special order of the indexes in the list:
  	 * 1. PK (go first just for simplicity)
  	 * 2. ON_CONFLICT_ACTION_REPLACE indexes
  	 * 3. ON_CONFLICT_ACTION_IGNORE indexes
  	 * This is necessary for the correct constraint check
-	 * processing (in sqlite3GenerateConstraintChecks()) as part of
-	 * UPDATE and INSERT statements.
+	 * processing (in sqlite3GenerateConstraintChecks()) as
+	 * part of UPDATE and INSERT statements.
  	 */
  
  	if (!db->init.busy && tbl_name != NULL)
  		goto exit_create_index;
-	addIndexToTable(pIndex, pTab);
-	pIndex = NULL;
+	addIndexToTable(index, table);
+	index = NULL;
  
-	/* Clean up before exiting */
+	/* Clean up before exiting. */
   exit_create_index:
-	if (pIndex)
-		freeIndex(db, pIndex);
+	if (index != NULL)
+		freeIndex(db, index);
  	sql_expr_delete(db, where, false);
  	sql_expr_list_delete(db, col_list);
  	sqlite3SrcListDelete(db, tbl_name);
-	sqlite3DbFree(db, zName);
-}
-
-/**
- * Return number of columns in given index.
- * If space is ephemeral, use internal
- * SQL structure to fetch the value.
- */
-uint32_t
-index_column_count(const Index *idx)
-{
-	assert(idx != NULL);
-	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-	struct space *space = space_by_id(space_id);
-	/* It is impossible to find an ephemeral space by id. */
-	if (space == NULL)
-		return idx->nColumn;
-
-	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-	struct index *index = space_index(space, index_id);
-	assert(index != NULL);
-	return index->def->key_def->part_count;
-}
-
-/** Return true if given index is unique and not nullable. */
-bool
-index_is_unique_not_null(const Index *idx)
-{
-	assert(idx != NULL);
-	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-	struct space *space = space_by_id(space_id);
-	assert(space != NULL);
-
-	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-	struct index *index = space_index(space, index_id);
-	assert(index != NULL);
-	return (index->def->opts.is_unique &&
-		!index->def->key_def->is_nullable);
+	sqlite3DbFree(db, name);
  }
  
  void
@@ -3296,7 +3226,7 @@ sqlite3IdListIndex(IdList * pList, const char *zName)
  }
  
  /*
- * Expand the space allocated for the given SrcList object by
+ * Expand the space allocated for the given SrcList object by
   * creating nExtra new slots beginning at iStart.  iStart is zero based.
   * New slots are zeroed.
   *
@@ -3732,9 +3662,9 @@ parser_emit_unique_constraint(struct Parse *parser,
  	const struct space_def *def = index->pTable->def;
  	StrAccum err_accum;
  	sqlite3StrAccumInit(&err_accum, parser->db, 0, 0, 200);
-	for (int j = 0; j < index->nColumn; ++j) {
-		assert(index->aiColumn[j] >= 0);
-		const char *col_name = def->fields[index->aiColumn[j]].name;
+	struct key_part *part = index->def->key_def->parts;
+	for (uint32_t j = 0; j < index->def->key_def->part_count; ++j, ++part) {
+		const char *col_name = def->fields[part->fieldno].name;
  		if (j != 0)
  			sqlite3StrAccumAppend(&err_accum, ", ", 2);
  		sqlite3XPrintf(&err_accum, "%s.%s", def->name, col_name);
@@ -3755,11 +3685,11 @@ static bool
  collationMatch(struct coll *coll, struct Index *index)
  {
  	assert(coll != NULL);
-	for (int i = 0; i < index->nColumn; i++) {
-		uint32_t id;
-		struct coll *idx_coll = sql_index_collation(index, i, &id);
-		assert(idx_coll != 0 || index->aiColumn[i] < 0);
-		if (index->aiColumn[i] >= 0 && coll == idx_coll)
+	struct key_part *part = index->def->key_def->parts;
+	for (uint32_t i = 0; i < index->def->key_def->part_count; i++, part++) {
+		struct coll *idx_coll = part->coll;
+		assert(idx_coll != NULL);
+		if (coll == idx_coll)
  			return true;
  	}
  	return false;
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 5a799714d..5a7cf7652 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -268,11 +268,12 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
  
  		/* Extract the primary key for the current row */
  		if (!is_view) {
-			for (int i = 0; i < pk_len; i++) {
+			struct key_part *part = pk_def->parts;
+			for (int i = 0; i < pk_len; i++, part++) {
  				struct space_def *def = space->def;
  				sqlite3ExprCodeGetColumnOfTable(v, def,
  								tab_cursor,
-								pk_def->parts[i].fieldno,
+								part->fieldno,
  								reg_pk + i);
  			}
  		} else {
@@ -568,13 +569,14 @@ sql_generate_index_key(struct Parse *parse, struct Index *index, int cursor,
  			*part_idx_label = 0;
  		}
  	}
-	int col_cnt = index_column_count(index);
+	int col_cnt = index->def->key_def->part_count;
  	int reg_base = sqlite3GetTempRange(parse, col_cnt);
  	if (prev != NULL && (reg_base != reg_prev ||
  			     prev->pPartIdxWhere != NULL))
  		prev = NULL;
  	for (int j = 0; j < col_cnt; j++) {
-		if (prev != NULL && prev->aiColumn[j] == index->aiColumn[j]) {
+		if (prev != NULL && prev->def->key_def->parts[j].fieldno ==
+				    index->def->key_def->parts[j].fieldno) {
  			/*
  			 * This column was already computed by the
  			 * previous index.
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index 3183e3dc7..6ca76fcd2 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -2405,21 +2405,28 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
  			     pIdx = pIdx->pNext) {
  				Bitmask colUsed; /* Columns of the index used */
  				Bitmask mCol;	/* Mask for the current column */
-				if (pIdx->nColumn < nExpr)
+				uint32_t part_count =
+					pIdx->def->key_def->part_count;
+				struct key_part *parts =
+					pIdx->def->key_def->parts;
+				if ((int)part_count < nExpr)
  					continue;
  				/* Maximum nColumn is BMS-2, not BMS-1, so that we can compute
  				 * BITMASK(nExpr) without overflowing
  				 */
-				testcase(pIdx->nColumn == BMS - 2);
-				testcase(pIdx->nColumn == BMS - 1);
-				if (pIdx->nColumn >= BMS - 1)
+				testcase(part_count == BMS - 2);
+				testcase(part_count == BMS - 1);
+				if (part_count >= BMS - 1)
+					continue;
+				if (mustBeUnique &&
+				    ((int)part_count > nExpr ||
+				     !pIdx->def->opts.is_unique)) {
+					/*
+					 * This index is not
+					 * unique over the IN RHS
+					 * columns.
+					 */
  					continue;
-				if (mustBeUnique) {
-					if (pIdx->nColumn > nExpr
-					    || (pIdx->nColumn > nExpr
-					    && !index_is_unique(pIdx))) {
-							continue;	/* This index is not unique over the IN RHS columns */
-					}
  				}
  
  				colUsed = 0;	/* Columns of index used so far */
@@ -2432,16 +2439,15 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
  					int j;
  
  					for (j = 0; j < nExpr; j++) {
-						if (pIdx->aiColumn[j] !=
-						    pRhs->iColumn) {
+						if ((int) parts[j].fieldno !=
+						    pRhs->iColumn)
  							continue;
-						}
-						struct coll *idx_coll;
-						idx_coll = sql_index_collation(pIdx, j, &id);
+
+						struct coll *idx_coll =
+							     parts[j].coll;
  						if (pReq != NULL &&
-						    pReq != idx_coll) {
+						    pReq != idx_coll)
  							continue;
-						}
  						break;
  					}
  					if (j == nExpr)
@@ -2466,18 +2472,17 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
  							  0, 0, 0,
  							  sqlite3MPrintf(db,
  							  "USING INDEX %s FOR IN-OPERATOR",
-							  pIdx->zName),
+							  pIdx->def->name),
  							  P4_DYNAMIC);
  					struct space *space =
  						space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
  					vdbe_emit_open_cursor(pParse, iTab,
  							      pIdx->tnum, space);
-					VdbeComment((v, "%s", pIdx->zName));
+					VdbeComment((v, "%s", pIdx->def->name));
  					assert(IN_INDEX_INDEX_DESC ==
  					       IN_INDEX_INDEX_ASC + 1);
  					eType = IN_INDEX_INDEX_ASC +
-						sql_index_column_sort_order(pIdx,
-									    0);
+						parts[0].sort_order;
  
  					if (prRhsHasNull) {
  #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
@@ -2499,7 +2504,7 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
  							/* Tarantool: Check for null is performed on first key of the index.  */
  							sqlite3SetHasNullFlag(v,
  									      iTab,
-									      pIdx->aiColumn[0],
+									      parts[0].fieldno,
  									      *prRhsHasNull);
  						}
  					}
@@ -3137,12 +3142,12 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
  		struct Index *pk = sqlite3PrimaryKeyIndex(tab);
  		assert(pk);
  
+		uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
  		enum affinity_type affinity =
-			tab->def->fields[pk->aiColumn[0]].affinity;
-		if (pk->nColumn == 1
-		    && affinity == AFFINITY_INTEGER
-		    && pk->aiColumn[0] < nVector) {
-			int reg_pk = rLhs + pk->aiColumn[0];
+			tab->def->fields[fieldno].affinity;
+		if (pk->def->key_def->part_count == 1 &&
+		    affinity == AFFINITY_INTEGER && (int)fieldno < nVector) {
+			int reg_pk = rLhs + (int)fieldno;
  			sqlite3VdbeAddOp2(v, OP_MustBeInt, reg_pk, destIfFalse);
  		}
  	}
@@ -3474,7 +3479,7 @@ sqlite3ExprCodeLoadIndexColumn(Parse * pParse,	/* The parsing context */
  			       int regOut	/* Store the index column value in this register */
      )
  {
-	i16 iTabCol = pIdx->aiColumn[iIdxCol];
+	i16 iTabCol = pIdx->def->key_def->parts[iIdxCol].fieldno;
  	sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
  					iTabCur, iTabCol, regOut);
  }
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 6c75c4772..c63419810 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -213,7 +213,6 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
  		     int **paiCol	/* OUT: Map of index columns in pFKey */
      )
  {
-	Index *pIdx = 0;	/* Value to return via *ppIdx */
  	int *aiCol = 0;		/* Value to return via *paiCol */
  	int nCol = pFKey->nCol;	/* Number of columns in parent key */
  	char *zKey = pFKey->aCol[0].zCol;	/* Name of left-most parent key column */
@@ -255,83 +254,86 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
  		*paiCol = aiCol;
  	}
  
-	for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
-		int nIdxCol = index_column_count(pIdx);
-		if (nIdxCol == nCol && index_is_unique(pIdx)
-		    && pIdx->pPartIdxWhere == 0) {
-			/* pIdx is a UNIQUE index (or a PRIMARY KEY) and has the right number
-			 * of columns. If each indexed column corresponds to a foreign key
-			 * column of pFKey, then this index is a winner.
+	struct Index *index = NULL;
+	for (index = pParent->pIndex; index != NULL; index = index->pNext) {
+		int part_count = index->def->key_def->part_count;
+		if (part_count != nCol || !index->def->opts.is_unique ||
+		    index->pPartIdxWhere != NULL)
+			continue;
+		/*
+		 * Index is a UNIQUE index (or a PRIMARY KEY) and
+		 * has the right number of columns. If each
+		 * indexed column corresponds to a foreign key
+		 * column of pFKey, then this index is a winner.
+		 */
+		if (zKey == NULL) {
+			/*
+			 * If zKey is NULL, then this foreign key
+			 * is implicitly mapped to the PRIMARY KEY
+			 * of table pParent. The PRIMARY KEY index
+			 * may be identified by the test.
  			 */
-
-			if (zKey == 0) {
-				/* If zKey is NULL, then this foreign key is implicitly mapped to
-				 * the PRIMARY KEY of table pParent. The PRIMARY KEY index may be
-				 * identified by the test.
-				 */
-				if (IsPrimaryKeyIndex(pIdx)) {
-					if (aiCol) {
-						int i;
-						for (i = 0; i < nCol; i++)
-							aiCol[i] =
-							    pFKey->aCol[i].
-							    iFrom;
-					}
-					break;
+			if (IsPrimaryKeyIndex(index)) {
+				if (aiCol != NULL) {
+					for (int i = 0; i < nCol; i++)
+						aiCol[i] = pFKey->aCol[i].iFrom;
  				}
-			} else {
-				/* If zKey is non-NULL, then this foreign key was declared to
-				 * map to an explicit list of columns in table pParent. Check if this
-				 * index matches those columns. Also, check that the index uses
-				 * the default collation sequences for each column.
+				break;
+			}
+		} else {
+			/*
+			 * If zKey is non-NULL, then this foreign
+			 * key was declared to map to an explicit
+			 * list of columns in table pParent. Check
+			 * if this index matches those columns.
+			 * Also, check that the index uses the
+			 * default collation sequences for each
+			 * column.
+			 */
+			int i, j;
+			struct key_part *part = index->def->key_def->parts;
+			for (i = 0; i < nCol; i++, part++) {
+				/*
+				 * Index of column in parent
+				 * table.
  				 */
-				int i, j;
-				for (i = 0; i < nCol; i++) {
-					i16 iCol = pIdx->aiColumn[i];	/* Index of column in parent tbl */
-					char *zIdxCol;	/* Name of indexed column */
-
-					if (iCol < 0)
-						break;	/* No foreign keys against expression indexes */
-
-					/* If the index uses a collation sequence that is different from
-					 * the default collation sequence for the column, this index is
-					 * unusable. Bail out early in this case.
-					 */
-					struct coll *def_coll;
-					uint32_t id;
-					def_coll = sql_column_collation(pParent->def,
-									iCol,
-									&id);
-					struct coll *coll =
-						sql_index_collation(pIdx, i,
-								    &id);
-					if (def_coll != coll)
-						break;
-
-					zIdxCol =
-						pParent->def->fields[iCol].name;
-					for (j = 0; j < nCol; j++) {
-						if (strcmp
-						    (pFKey->aCol[j].zCol,
-						     zIdxCol) == 0) {
-							if (aiCol)
-								aiCol[i] =
-								    pFKey->
-								    aCol[j].
-								    iFrom;
-							break;
-						}
-					}
-					if (j == nCol)
-						break;
+				i16 iCol = (int) part->fieldno;
+				/*
+				 * If the index uses a collation
+				 * sequence that is different from
+				 * the default collation sequence
+				 * for the column, this index is
+				 * unusable. Bail out early in
+				 * this case.
+				 */
+				uint32_t id;
+				struct coll *def_coll =
+					sql_column_collation(pParent->def,
+							     iCol, &id);
+				struct coll *coll = part->coll;
+				if (def_coll != coll)
+					break;
+
+				char *zIdxCol = pParent->def->fields[iCol].name;
+				for (j = 0; j < nCol; j++) {
+					if (strcmp(pFKey->aCol[j].zCol,
+						   zIdxCol) != 0)
+						continue;
+					if (aiCol)
+						aiCol[i] = pFKey->aCol[j].iFrom;
+					break;
  				}
-				if (i == nCol)
-					break;	/* pIdx is usable */
+				if (j == nCol)
+					break;
+			}
+			if (i == nCol) {
+				/* Index is usable. */
+				break;
  			}
  		}
  	}
  
-	if (!pIdx) {
+	if (index == NULL) {
  		if (!pParse->disableTriggers) {
  			sqlite3ErrorMsg(pParse,
  					"foreign key mismatch - \"%w\" referencing \"%w\"",
@@ -341,7 +343,7 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
  		return 1;
  	}
  
-	*ppIdx = pIdx;
+	*ppIdx = index;
  	return 0;
  }
  
@@ -460,14 +462,15 @@ fkLookupParent(Parse * pParse,	/* Parse context */
  			 */
  			if (pTab == pFKey->pFrom && nIncr == 1) {
  				int iJump =
-				    sqlite3VdbeCurrentAddr(v) + nCol + 1;
-				for (i = 0; i < nCol; i++) {
+					sqlite3VdbeCurrentAddr(v) + nCol + 1;
+				struct key_part *part =
+					pIdx->def->key_def->parts;
+				for (i = 0; i < nCol; ++i, ++part) {
  					int iChild = aiCol[i] + 1 + regData;
-					int iParent =
-					    pIdx->aiColumn[i] + 1 + regData;
-					assert(pIdx->aiColumn[i] >= 0);
+					int iParent = 1 + regData +
+						      (int)part->fieldno;
  					assert(aiCol[i] != pTab->iPKey);
-					if (pIdx->aiColumn[i] == pTab->iPKey) {
+					if ((int)part->fieldno == pTab->iPKey) {
  						/* The parent key is a composite key that includes the IPK column */
  						iParent = regData;
  					}
@@ -614,16 +617,15 @@ fkScanChildren(Parse * pParse,	/* Parse context */
      )
  {
  	sqlite3 *db = pParse->db;	/* Database handle */
-	int i;			/* Iterator variable */
  	Expr *pWhere = 0;	/* WHERE clause to scan with */
  	NameContext sNameContext;	/* Context used to resolve WHERE clause */
  	WhereInfo *pWInfo;	/* Context used by sqlite3WhereXXX() */
  	int iFkIfZero = 0;	/* Address of OP_FkIfZero */
  	Vdbe *v = sqlite3GetVdbe(pParse);
  
-	assert(pIdx == 0 || pIdx->pTable == pTab);
-	assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
-	assert(pIdx != 0);
+	assert(pIdx == NULL || pIdx->pTable == pTab);
+	assert(pIdx == NULL || (int) pIdx->def->key_def->part_count == pFKey->nCol);
+	assert(pIdx != NULL);
  
  	if (nIncr < 0) {
  		iFkIfZero =
@@ -639,19 +641,20 @@ fkScanChildren(Parse * pParse,	/* Parse context */
  	 * the parent key columns. The affinity of the parent key column should
  	 * be applied to each child key value before the comparison takes place.
  	 */
-	for (i = 0; i < pFKey->nCol; i++) {
+	for (int i = 0; i < pFKey->nCol; i++) {
  		Expr *pLeft;	/* Value from parent table row */
  		Expr *pRight;	/* Column ref to child table */
  		Expr *pEq;	/* Expression (pLeft = pRight) */
  		i16 iCol;	/* Index of column in child table */
-		const char *zCol;	/* Name of column in child table */
+		const char *column_name;
  
-		iCol = pIdx ? pIdx->aiColumn[i] : -1;
+		iCol = pIdx != NULL ?
+		       (int) pIdx->def->key_def->parts[i].fieldno : -1;
  		pLeft = exprTableRegister(pParse, pTab, regData, iCol);
  		iCol = aiCol ? aiCol[i] : pFKey->aCol[0].iFrom;
  		assert(iCol >= 0);
-		zCol = pFKey->pFrom->def->fields[iCol].name;
-		pRight = sqlite3Expr(db, TK_ID, zCol);
+		column_name = pFKey->pFrom->def->fields[iCol].name;
+		pRight = sqlite3Expr(db, TK_ID, column_name);
  		pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
  		pWhere = sqlite3ExprAnd(db, pWhere, pEq);
  	}
@@ -670,15 +673,14 @@ fkScanChildren(Parse * pParse,	/* Parse context */
  
  		Expr *pEq, *pAll = 0;
  		Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-		assert(pIdx != 0);
-		int col_count = index_column_count(pPk);
-		for (i = 0; i < col_count; i++) {
-			i16 iCol = pIdx->aiColumn[i];
-			assert(iCol >= 0);
-			pLeft = exprTableRegister(pParse, pTab, regData, iCol);
-			pRight =
-				exprTableColumn(db, pTab->def,
-						pSrc->a[0].iCursor, iCol);
+		assert(pIdx != NULL);
+		uint32_t part_count = pPk->def->key_def->part_count;
+		for (uint32_t i = 0; i < part_count; i++) {
+			uint32_t fieldno = pIdx->def->key_def->parts[i].fieldno;
+			pLeft = exprTableRegister(pParse, pTab, regData,
+						  fieldno);
+			pRight = exprTableColumn(db, pTab->def,
+						 pSrc->a[0].iCursor, fieldno);
  			pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
  			pAll = sqlite3ExprAnd(db, pAll, pEq);
  		}
@@ -983,7 +985,6 @@ sqlite3FkCheck(Parse * pParse,	/* Parse context */
  			if (aiCol[i] == pTab->iPKey) {
  				aiCol[i] = -1;
  			}
-			assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
  		}
  
  		pParse->nTab++;
@@ -1108,19 +1109,19 @@ sqlite3FkOldmask(Parse * pParse,	/* Parse context */
  
  	if (user_session->sql_flags & SQLITE_ForeignKeys) {
  		FKey *p;
-		int i;
  		for (p = pTab->pFKey; p; p = p->pNextFrom) {
-			for (i = 0; i < p->nCol; i++)
+			for (int i = 0; i < p->nCol; i++)
  				mask |= COLUMN_MASK(p->aCol[i].iFrom);
  		}
  		for (p = sqlite3FkReferences(pTab); p; p = p->pNextTo) {
  			Index *pIdx = 0;
  			sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
-			if (pIdx) {
-				int nIdxCol = index_column_count(pIdx);
-				for (i = 0; i < nIdxCol; i++) {
-					assert(pIdx->aiColumn[i] >= 0);
-					mask |= COLUMN_MASK(pIdx->aiColumn[i]);
+			if (pIdx != NULL) {
+				uint32_t part_count =
+					pIdx->def->key_def->part_count;
+				for (uint32_t i = 0; i < part_count; i++) {
+					mask |= COLUMN_MASK(pIdx->def->
+						key_def->parts[i].fieldno);
  				}
  			}
  		}
@@ -1264,11 +1265,12 @@ fkActionTrigger(struct Parse *pParse, struct Table *pTab, struct FKey *pFKey,
  			       || (pTab->iPKey >= 0
  				   && pTab->iPKey <
  				      (int)pTab->def->field_count));
-			assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
+
+			uint32_t fieldno = pIdx != NULL ?
+					   pIdx->def->key_def->parts[i].fieldno :
+					   (uint32_t)pTab->iPKey;
  			sqlite3TokenInit(&tToCol,
-					 pTab->def->fields[pIdx ? pIdx->
-						    aiColumn[i] : pTab->iPKey].
-					 name);
+					 pTab->def->fields[fieldno].name);
  			sqlite3TokenInit(&tFromCol,
  					 pFKey->pFrom->def->fields[
  						iFromCol].name);
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index c12043bde..50c2e8634 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -89,14 +89,14 @@ sqlite3IndexAffinityStr(sqlite3 *db, Index *index)
  	 * sqliteDeleteIndex() when the Index structure itself is
  	 * cleaned up.
  	 */
-	int column_count = index_column_count(index);
+	int column_count = index->def->key_def->part_count;
  	index->zColAff = (char *) sqlite3DbMallocRaw(0, column_count + 1);
  	if (index->zColAff == NULL) {
  		sqlite3OomFault(db);
  		return NULL;
  	}
  	for (int n = 0; n < column_count; n++) {
-		uint16_t x = index->aiColumn[n];
+		uint16_t x = index->def->key_def->parts[n].fieldno;
  		index->zColAff[n] = index->pTable->def->fields[x].affinity;
  	}
  	index->zColAff[column_count] = 0;
@@ -647,7 +647,7 @@ sqlite3Insert(Parse * pParse,	/* Parser context */
  		     pIdx = pIdx->pNext, i++) {
  			assert(pIdx);
  			aRegIdx[i] = ++pParse->nMem;
-			pParse->nMem += index_column_count(pIdx);
+			pParse->nMem += pIdx->def->key_def->part_count;
  		}
  	}
  
@@ -1069,12 +1069,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  	Index *pIdx;		/* Pointer to one of the indices */
  	Index *pPk = 0;		/* The PRIMARY KEY index */
  	sqlite3 *db;		/* Database connection */
-	int i;			/* loop counter */
-	int ix;			/* Index loop counter */
-	int nCol;		/* Number of columns */
  	int addr1;		/* Address of jump instruction */
  	int seenReplace = 0;	/* True if REPLACE is used to resolve INT PK conflict */
-	int nPkField;		/* Number of fields in PRIMARY KEY. */
  	u8 isUpdate;		/* True if this is an UPDATE operation */
  	u8 bAffinityDone = 0;	/* True if the OP_Affinity operation has been run */
  	struct session *user_session = current_session();
@@ -1086,10 +1082,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  	struct space_def *def = pTab->def;
  	/* This table is not a VIEW */
  	assert(!def->opts.is_view);
-	nCol = def->field_count;
  
  	pPk = sqlite3PrimaryKeyIndex(pTab);
-	nPkField = index_column_count(pPk);
  
  	/* Record that this module has started */
  	VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
@@ -1099,17 +1093,16 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  	enum on_conflict_action on_error;
  	/* Test all NOT NULL constraints.
  	 */
-	for (i = 0; i < nCol; i++) {
-		if (i == pTab->iPKey) {
+	for (uint32_t i = 0; i < def->field_count; i++) {
+		if ((int) i == pTab->iPKey)
  			continue;
-		}
  		if (aiChng && aiChng[i] < 0) {
  			/* Don't bother checking for NOT NULL on columns that do not change */
  			continue;
  		}
  		if (def->fields[i].is_nullable ||
  		    (pTab->tabFlags & TF_Autoincrement &&
-		     pTab->iAutoIncPKey == i))
+		     pTab->iAutoIncPKey == (int) i))
  			continue;	/* This column is allowed to be NULL */
  
  		on_error = table_column_nullable_action(pTab, i);
@@ -1179,7 +1172,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  		else
  			on_error = ON_CONFLICT_ACTION_ABORT;
  
-		for (i = 0; i < checks->nExpr; i++) {
+		for (int i = 0; i < checks->nExpr; i++) {
  			int allOk;
  			Expr *pExpr = checks->a[i].pExpr;
  			if (aiChng
@@ -1206,13 +1199,16 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  		}
  	}
  
-	/* Test all UNIQUE constraints by creating entries for each UNIQUE
-	 * index and making sure that duplicate entries do not already exist.
-	 * Compute the revised record entries for indices as we go.
+	/*
+	 * Test all UNIQUE constraints by creating entries for
+	 * each UNIQUE index and making sure that duplicate entries
+	 * do not already exist. Compute the revised record entries
+	 * for indices as we go.
  	 *
  	 * This loop also handles the case of the PRIMARY KEY index.
  	 */
-	for (ix = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, ix++) {
+	pIdx = pTab->pIndex;
+	for (int ix = 0; pIdx != NULL; pIdx = pIdx->pNext, ix++) {
  		int regIdx;	/* Range of registers hold conent for pIdx */
  		int regR;	/* Range of registers holding conflicting PK */
  		int iThisCur;	/* Cursor for this UNIQUE index */
@@ -1253,10 +1249,11 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  		 * the insert or update.  Store that record in the aRegIdx[ix] register
  		 */
  		regIdx = aRegIdx[ix] + 1;
-		int nIdxCol = (int) index_column_count(pIdx);
+		uint32_t part_count = pIdx->def->key_def->part_count;
  		if (uniqueByteCodeNeeded) {
-			for (i = 0; i < nIdxCol; ++i) {
-				int fieldno = pIdx->aiColumn[i];
+			for (uint32_t i = 0; i < part_count; ++i) {
+				uint32_t fieldno =
+					pIdx->def->key_def->parts[i].fieldno;
  				int reg;
  				/*
  				 * OP_SCopy copies value in
@@ -1267,11 +1264,10 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  				 * needed for proper UNIQUE
  				 * constraint handling.
  				 */
-				if (fieldno == pTab->iPKey)
+				if ((int) fieldno == pTab->iPKey)
  					reg = regNewData;
  				else
  					reg = fieldno + regNewData + 1;
-				assert(fieldno >= 0);
  				sqlite3VdbeAddOp2(v, OP_SCopy, reg, regIdx + i);
  				VdbeComment((v, "%s",
  					    def->fields[fieldno].name));
@@ -1283,9 +1279,12 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  		if (IsPrimaryKeyIndex(pIdx)) {
  			/* If PK is marked as INTEGER, use it as strict type,
  			 * not as affinity. Emit code for type checking */
-			if (nIdxCol == 1) {
-				reg_pk = regNewData + 1 + pIdx->aiColumn[0];
-				if (pTab->zColAff[pIdx->aiColumn[0]] ==
+			if (part_count == 1) {
+				uint32_t fieldno =
+					pIdx->def->key_def->parts[0].fieldno;
+				reg_pk = regNewData + 1 + fieldno;
+
+				if (pTab->zColAff[fieldno] ==
  				    AFFINITY_INTEGER) {
  					int skip_if_null = sqlite3VdbeMakeLabel(v);
  					if ((pTab->tabFlags & TF_Autoincrement) != 0) {
@@ -1303,7 +1302,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  
  			sqlite3VdbeAddOp3(v, OP_MakeRecord, regNewData + 1,
  					  def->field_count, aRegIdx[ix]);
-			VdbeComment((v, "for %s", pIdx->zName));
+			VdbeComment((v, "for %s", pIdx->def->name));
  		}
  
  		/* In an UPDATE operation, if this index is the PRIMARY KEY
@@ -1390,24 +1389,22 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  		if (uniqueByteCodeNeeded) {
  			sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
  					     addrUniqueOk, regIdx,
-					     index_column_count(pIdx));
+					     pIdx->def->key_def->part_count);
  		}
  		VdbeCoverage(v);
  
+		uint32_t pk_part_count = pPk->def->key_def->part_count;
  		/* Generate code to handle collisions */
-		regR =
-		    (pIdx == pPk) ? regIdx : sqlite3GetTempRange(pParse,
-								 nPkField);
+		regR = pIdx == pPk ? regIdx :
+		       sqlite3GetTempRange(pParse, pk_part_count);
  		if (isUpdate || on_error == ON_CONFLICT_ACTION_REPLACE) {
  			int x;
-			int nPkCol = index_column_count(pPk);
  			/* Extract the PRIMARY KEY from the end of the index entry and
  			 * store it in registers regR..regR+nPk-1
  			 */
  			if (pIdx != pPk) {
-				for (i = 0; i < nPkCol; i++) {
-					assert(pPk->aiColumn[i] >= 0);
-					x = pPk->aiColumn[i];
+				for (uint32_t i = 0; i < pk_part_count; i++) {
+					x = pPk->def->key_def->parts[i].fieldno;
  					sqlite3VdbeAddOp3(v, OP_Column,
  							  iThisCur, x, regR + i);
  					VdbeComment((v, "%s.%s", def->name,
@@ -1422,22 +1419,25 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  				 * of the matched index row are different from the original PRIMARY
  				 * KEY values of this row before the update.
  				 */
-				int addrJump =
-					sqlite3VdbeCurrentAddr(v) + nPkCol;
+				int addrJump = sqlite3VdbeCurrentAddr(v) +
+					       pk_part_count;
  				int op = OP_Ne;
-				int regCmp = (IsPrimaryKeyIndex(pIdx) ?
-					      regIdx : regR);
-
-				for (i = 0; i < nPkCol; i++) {
-					uint32_t id;
-					char *p4 = (char *)sql_index_collation(pPk, i, &id);
-					x = pPk->aiColumn[i];
-					assert(x >= 0);
-					if (i == (nPkCol - 1)) {
+				int regCmp = IsPrimaryKeyIndex(pIdx) ?
+					     regIdx : regR;
+				struct key_part *part =
+					pPk->def->key_def->parts;
+				for (uint32_t i = 0; i < pk_part_count;
+				     ++i, ++part) {
+					char *p4 = (char *) part->coll;
+					x = part->fieldno;
+					if (pPk->tnum==0)
+						x = -1;
+					if (i == (pk_part_count - 1)) {
  						addrJump = addrUniqueOk;
  						op = OP_Eq;
  					}
-					sqlite3VdbeAddOp4(v, op, regOldData + 1 + x,
+					sqlite3VdbeAddOp4(v, op,
+							  regOldData + 1 + x,
  							  addrJump, regCmp + i,
  							  p4, P4_COLLSEQ);
  					sqlite3VdbeChangeP5(v, SQLITE_NOTNULL);
@@ -1480,7 +1480,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  							      NULL, NULL);
  			}
  			sql_generate_row_delete(pParse, pTab, trigger,
-						iDataCur, regR, nPkField, false,
+						iDataCur, regR, pk_part_count,
+						false,
  						ON_CONFLICT_ACTION_REPLACE,
  						pIdx == pPk ? ONEPASS_SINGLE :
  						ONEPASS_OFF, -1);
@@ -1490,7 +1491,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
  		}
  		sqlite3VdbeResolveLabel(v, addrUniqueOk);
  		if (regR != regIdx)
-			sqlite3ReleaseTempRange(pParse, regR, nPkField);
+			sqlite3ReleaseTempRange(pParse, regR, pk_part_count);
  	}
  
  	*pbMayReplace = seenReplace;
@@ -1608,8 +1609,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,	/* Parsing context */
  		    IsPrimaryKeyIndex(pIdx) ||		/* Condition 2 */
  		    sqlite3FkReferences(pTab) ||	/* Condition 3 */
  		    /* Condition 4 */
-		    (index_is_unique(pIdx) && pIdx->onError !=
-		     ON_CONFLICT_ACTION_DEFAULT &&
+		    (pIdx->def->opts.is_unique &&
+		     pIdx->onError != ON_CONFLICT_ACTION_DEFAULT &&
  		     /* Condition 4.1 */
  		     pIdx->onError != ON_CONFLICT_ACTION_ABORT) ||
  		     /* Condition 4.2 */
@@ -1627,7 +1628,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,	/* Parsing context */
  						  space_ptr_reg);
  				sql_vdbe_set_p4_key_def(pParse, pIdx);
  				sqlite3VdbeChangeP5(v, p5);
-				VdbeComment((v, "%s", pIdx->zName));
+				VdbeComment((v, "%s", pIdx->def->name));
  			}
  		}
  	}
@@ -1661,30 +1662,25 @@ int sqlite3_xferopt_count;
  static int
  xferCompatibleIndex(Index * pDest, Index * pSrc)
  {
-	uint32_t i;
  	assert(pDest && pSrc);
  	assert(pDest->pTable != pSrc->pTable);
-	uint32_t nDestCol = index_column_count(pDest);
-	uint32_t nSrcCol = index_column_count(pSrc);
-	if (nDestCol != nSrcCol) {
-		return 0;	/* Different number of columns */
-	}
+	uint32_t dest_idx_part_count = pDest->def->key_def->part_count;
+	uint32_t src_idx_part_count = pSrc->def->key_def->part_count;
+	if (dest_idx_part_count != src_idx_part_count)
+		return 0;
  	if (pDest->onError != pSrc->onError) {
  		return 0;	/* Different conflict resolution strategies */
  	}
-	for (i = 0; i < nSrcCol; i++) {
-		if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
+	struct key_part *src_part = pSrc->def->key_def->parts;
+	struct key_part *dest_part = pDest->def->key_def->parts;
+	for (uint32_t i = 0; i < src_idx_part_count;
+	     ++i, ++src_part, ++dest_part) {
+		if (src_part->fieldno != dest_part->fieldno)
  			return 0;	/* Different columns indexed */
-		}
-		if (sql_index_column_sort_order(pSrc, i) !=
-		    sql_index_column_sort_order(pDest, i)) {
+		if (src_part->sort_order != dest_part->sort_order)
  			return 0;	/* Different sort orders */
-		}
-		uint32_t id;
-		if (sql_index_collation(pSrc, i, &id) !=
-		    sql_index_collation(pDest, i, &id)) {
+		if (src_part->coll != dest_part->coll)
  			return 0;	/* Different collating sequences */
-		}
  	}
  	if (sqlite3ExprCompare(pSrc->pPartIdxWhere, pDest->pPartIdxWhere, -1)) {
  		return 0;	/* Different WHERE clauses */
@@ -1856,16 +1852,15 @@ xferOptimization(Parse * pParse,	/* Parser context */
  		}
  	}
  	for (pDestIdx = pDest->pIndex; pDestIdx; pDestIdx = pDestIdx->pNext) {
-		if (index_is_unique(pDestIdx)) {
+		if (pDestIdx->def->opts.is_unique)
  			destHasUniqueIdx = 1;
-		}
  		for (pSrcIdx = pSrc->pIndex; pSrcIdx; pSrcIdx = pSrcIdx->pNext) {
  			if (xferCompatibleIndex(pDestIdx, pSrcIdx))
  				break;
  		}
-		if (pSrcIdx == 0) {
-			return 0;	/* pDestIdx has no corresponding index in pSrc */
-		}
+		/* pDestIdx has no corresponding index in pSrc. */
+		if (pSrcIdx == NULL)
+			return 0;
  	}
  	/* Get server checks. */
  	ExprList *pCheck_src = space_checks_expr_list(
@@ -1941,11 +1936,11 @@ xferOptimization(Parse * pParse,	/* Parser context */
  		struct space *src_space =
  			space_by_id(SQLITE_PAGENO_TO_SPACEID(pSrcIdx->tnum));
  		vdbe_emit_open_cursor(pParse, iSrc, pSrcIdx->tnum, src_space);
-		VdbeComment((v, "%s", pSrcIdx->zName));
+		VdbeComment((v, "%s", pSrcIdx->def->name));
  		struct space *dest_space =
  			space_by_id(SQLITE_PAGENO_TO_SPACEID(pDestIdx->tnum));
  		vdbe_emit_open_cursor(pParse, iDest, pDestIdx->tnum, dest_space);
-		VdbeComment((v, "%s", pDestIdx->zName));
+		VdbeComment((v, "%s", pDestIdx->def->name));
  		addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
  		VdbeCoverage(v);
  		sqlite3VdbeAddOp2(v, OP_RowData, iSrc, regData);
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index 31581b17f..90a728065 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -411,9 +411,8 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
  			} else if (pk == NULL) {
  				k = 1;
  			} else {
-				for (k = 1; k <= def->field_count &&
-				     pk->aiColumn[k - 1] != (int) i; ++k) {
-				}
+				struct key_def *kdef = pk->def->key_def;
+				k = key_def_find(kdef, i) - kdef->parts + 1;
  			}
  			bool is_nullable = def->fields[i].is_nullable;
  			char *expr_str = def->fields[i].default_value;
@@ -456,7 +455,7 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
  					size_t avg_tuple_size_idx =
  						sql_index_tuple_size(space, idx);
  					sqlite3VdbeMultiLoad(v, 2, "sii",
-							     pIdx->zName,
+							     pIdx->def->name,
  							     avg_tuple_size_idx,
  							     index_field_tuple_est(pIdx, 0));
  					sqlite3VdbeAddOp2(v, OP_ResultRow, 1,
@@ -485,11 +484,13 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
  						 */
  						pParse->nMem = 3;
  					}
-					mx = index_column_count(pIdx);
+					mx = pIdx->def->key_def->part_count;
  					assert(pParse->nMem <=
  					       pPragma->nPragCName);
-					for (i = 0; i < mx; i++) {
-						i16 cnum = pIdx->aiColumn[i];
+					struct key_part *part =
+						pIdx->def->key_def->parts;
+					for (i = 0; i < mx; i++, part++) {
+						i16 cnum = (int) part->fieldno;
  						assert(pIdx->pTable);
  						sqlite3VdbeMultiLoad(v, 1,
  								     "iis", i,
@@ -503,19 +504,18 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
  								     name);
  						if (pPragma->iArg) {
  							const char *c_n;
-							uint32_t id;
+							uint32_t id =
+								part->coll_id;
  							struct coll *coll =
-								sql_index_collation(pIdx, i, &id);
+								part->coll;
  							if (coll != NULL)
  								c_n = coll_by_id(id)->name;
  							else
  								c_n = "BINARY";
-							enum sort_order sort_order;
-							sort_order = sql_index_column_sort_order(pIdx,
-												 i);
  							sqlite3VdbeMultiLoad(v,
  									     4,
  									     "isi",
+									     part->
  									     sort_order,
  									     c_n,
  									     i <
@@ -545,10 +545,8 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
  						    { "c", "u", "pk" };
  						sqlite3VdbeMultiLoad(v, 1,
  								     "isisi", i,
-								     pIdx->
-								     zName,
-								     index_is_unique
-								     (pIdx),
+								     pIdx->def->name,
+								     pIdx->def->opts.is_unique,
  								     azOrigin
  								     [pIdx->
  								      idxType],
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 52b3fdd07..093346a63 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4399,7 +4399,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct SrcList_item *pFrom)
  		char *zIndexedBy = pFrom->u1.zIndexedBy;
  		Index *pIdx;
  		for (pIdx = pTab->pIndex;
-		     pIdx && strcmp(pIdx->zName, zIndexedBy);
+		     pIdx && strcmp(pIdx->def->name, zIndexedBy);
  		     pIdx = pIdx->pNext) ;
  		if (!pIdx) {
  			sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 8b75ae888..ae31dfae5 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2069,28 +2069,6 @@ struct UnpackedRecord {
   * Each SQL index is represented in memory by an
   * instance of the following structure.
   *
- * The columns of the table that are to be indexed are described
- * by the aiColumn[] field of this structure.  For example, suppose
- * we have the following table and index:
- *
- *     CREATE TABLE Ex1(c1 int, c2 int, c3 text);
- *     CREATE INDEX Ex2 ON Ex1(c3,c1);
- *
- * In the Table structure describing Ex1, nCol==3 because there are
- * three columns in the table.  In the Index structure describing
- * Ex2, nColumn==2 since 2 of the 3 columns of Ex1 are indexed.
- * The value of aiColumn is {2, 0}.  aiColumn[0]==2 because the
- * first column to be indexed (c3) has an index of 2 in Ex1.aCol[].
- * The second column to be indexed (c1) has an index of 0 in
- * Ex1.aCol[], hence Ex2.aiColumn[1]==0.
- *
- * The Index.onError field determines whether or not the indexed columns
- * must be unique and what to do if they are not.  When Index.onError=
- * ON_CONFLICT_ACTION_NONE, it means this is not a unique index.
- * Otherwise it is a unique index and the value of Index.onError indicate
- * the which conflict resolution algorithm to employ whenever an attempt
- * is made to insert a non-unique element.
- *
   * While parsing a CREATE TABLE or CREATE INDEX statement in order to
   * generate VDBE code (as opposed to reading from Tarantool's _space
   * space as part of parsing an existing database schema), transient instances
@@ -2100,26 +2078,30 @@ struct UnpackedRecord {
   * program is executed). See convertToWithoutRowidTable() for details.
   */
  struct Index {
-	char *zName;		/* Name of this index */
-	i16 *aiColumn;		/* Which columns are used by this index.  1st is 0 */
-	LogEst *aiRowLogEst;	/* From ANALYZE: Est. rows selected by each column */
-	Table *pTable;		/* The SQL table being indexed */
-	char *zColAff;		/* String defining the affinity of each column */
-	Index *pNext;		/* The next index associated with the same table */
-	Schema *pSchema;	/* Schema containing this index */
-	/** Sorting order for each column. */
-	enum sort_order *sort_order;
-	/** Array of collation sequences for index. */
-	struct coll **coll_array;
-	/** Array of collation identifiers. */
-	uint32_t *coll_id_array;
-	Expr *pPartIdxWhere;	/* WHERE clause for partial indices */
-	int tnum;		/* DB Page containing root of this index */
-	u16 nColumn;		/* Number of columns stored in the index */
-	u8 onError;		/* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
-				 * or _NONE
-				 */
-	unsigned idxType:2;	/* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX */
+	/** From ANALYZE: Est. rows selected by each column. */
+	LogEst *aiRowLogEst;
+	/** The SQL table being indexed. */
+	Table *pTable;
+	/** String defining the affinity of each column. */
+	char *zColAff;
+	/** The next index associated with the same table. */
+	Index *pNext;
+	/** Schema containing this index. */
+	Schema *pSchema;
+	/** WHERE clause for partial indices. */
+	Expr *pPartIdxWhere;
+	/** DB Page containing root of this index. */
+	int tnum;
+	/**
+	 * Conflict resolution algorithm to employ whenever an
+	 * attempt is made to insert a non-unique element into
+	 * a unique index.
+	 */
+	u8 onError;
+	/** 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX. */
+	unsigned idxType:2;
+	/** Index definition. */
+	struct index_def *def;
  };
  
  /**
@@ -3554,34 +3536,6 @@ void sqlite3AddCollateType(Parse *, Token *);
   */
  struct coll *
  sql_column_collation(struct space_def *def, uint32_t column, uint32_t *coll_id);
-/**
- * Return name of given column collation from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @param[out] coll_id Collation identifier.
- * @retval Pointer to collation.
- */
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *id);
-
-/**
- * Return key_def of provided struct Index.
- * @param idx Pointer to `struct Index` object.
- * @retval Pointer to `struct key_def`.
- */
-struct key_def*
-sql_index_key_def(struct Index *idx);
-
-/**
- * Return sort order of given column from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @retval Sort order of requested column.
- */
-enum sort_order
-sql_index_column_sort_order(Index *idx, uint32_t column);
  
  void sqlite3EndTable(Parse *, Token *, Token *, Select *);
  
@@ -3668,9 +3622,16 @@ void sqlite3SrcListShiftJoinType(SrcList *);
  void sqlite3SrcListAssignCursors(Parse *, SrcList *);
  void sqlite3IdListDelete(sqlite3 *, IdList *);
  void sqlite3SrcListDelete(sqlite3 *, SrcList *);
-Index *sqlite3AllocateIndexObject(sqlite3 *, i16, int, char **);
-bool
-index_is_unique(Index *);
+/**
+ * Allocate an SQL index object for the given part count.
+ * @param db SQLite environment.
+ * @param part_count Number of key parts in the index.
+ *
+ * @retval NULL Memory error.
+ * @retval not NULL Index object.
+ */
+struct Index *
+sql_index_alloc(struct sqlite3 *db, uint32_t part_count);
  
  /**
   * Create a new index for an SQL table.  name is the name of the
@@ -3700,8 +3661,9 @@ index_is_unique(Index *);
  void
  sql_create_index(struct Parse *parse, struct Token *token,
  		 struct SrcList *tbl_name, struct ExprList *col_list,
-		 int on_error, struct Token *start, struct Expr *pi_where,
-		 enum sort_order sort_order, bool if_not_exist, u8 idx_type);
+		 enum on_conflict_action on_error, struct Token *start,
+		 struct Expr *pi_where, enum sort_order sort_order,
+		 bool if_not_exist, u8 idx_type);
  
  /**
   * This routine will drop an existing named index.  This routine
@@ -4553,10 +4515,6 @@ int sqlite3InvokeBusyHandler(BusyHandler *);
  int
  sql_analysis_load(struct sqlite3 *db);
  
-uint32_t
-index_column_count(const Index *);
-bool
-index_is_unique_not_null(const Index *);
  void sqlite3RegisterLikeFunctions(sqlite3 *, int);
  int sqlite3IsLikeFunction(sqlite3 *, Expr *, int *, char *);
  void sqlite3SchemaClear(sqlite3 *);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 212adbcb3..113e3ba0e 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -239,17 +239,18 @@ sqlite3Update(Parse * pParse,		/* The parser context */
  	 */
  	for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
  		int reg;
-		int nIdxCol = index_column_count(pIdx);
+		uint32_t part_count = pIdx->def->key_def->part_count;
  		if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
  			reg = ++pParse->nMem;
-			pParse->nMem += nIdxCol;
+			pParse->nMem += part_count;
  		} else {
  			reg = 0;
-			for (i = 0; i < nIdxCol; i++) {
-				i16 iIdxCol = pIdx->aiColumn[i];
-				if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
+			for (uint32_t i = 0; i < part_count; i++) {
+				uint32_t fieldno =
+					pIdx->def->key_def->parts[i].fieldno;
+				if (aXRef[fieldno] >= 0) {
  					reg = ++pParse->nMem;
-					pParse->nMem += nIdxCol;
+					pParse->nMem += part_count;
  					break;
  				}
  			}
@@ -299,17 +300,18 @@ sqlite3Update(Parse * pParse,		/* The parser context */
  	 * In this case we have to manually load columns in order to make tuple.
  	 */
  	int iPk;	/* First of nPk memory cells holding PRIMARY KEY value */
-	i16 nPk;	/* Number of components of the PRIMARY KEY */
+	/* Number of components of the PRIMARY KEY.  */
+	uint32_t pk_part_count;
  	int addrOpen;	/* Address of the OpenEphemeral instruction */
  
  	if (is_view) {
-		nPk = nKey;
+		pk_part_count = nKey;
  	} else {
  		assert(pPk != 0);
-		nPk = index_column_count(pPk);
+		pk_part_count = pPk->def->key_def->part_count;
  	}
  	iPk = pParse->nMem + 1;
-	pParse->nMem += nPk;
+	pParse->nMem += pk_part_count;
  	regKey = ++pParse->nMem;
  	iEph = pParse->nTab++;
  	sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
@@ -318,7 +320,8 @@ sqlite3Update(Parse * pParse,		/* The parser context */
  		addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
  					     nKey);
  	} else {
-		addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
+		addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
+					     pk_part_count);
  		sql_vdbe_set_p4_key_def(pParse, pPk);
  	}
  
@@ -328,27 +331,27 @@ sqlite3Update(Parse * pParse,		/* The parser context */
  		goto update_cleanup;
  	okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
  	if (is_view) {
-		for (i = 0; i < nPk; i++) {
+		for (i = 0; i < (int) pk_part_count; i++) {
  			sqlite3VdbeAddOp3(v, OP_Column, iDataCur, i, iPk + i);
  		}
  	} else {
-		for (i = 0; i < nPk; i++) {
-			assert(pPk->aiColumn[i] >= 0);
+		for (i = 0; i < (int) pk_part_count; i++) {
  			sqlite3ExprCodeGetColumnOfTable(v, def, iDataCur,
-							pPk->aiColumn[i],
+							pPk->def->key_def->
+								parts[i].fieldno,
  							iPk + i);
  		}
  	}
  
  	if (okOnePass) {
  		sqlite3VdbeChangeToNoop(v, addrOpen);
-		nKey = nPk;
+		nKey = pk_part_count;
  		regKey = iPk;
  	} else {
  		const char *zAff = is_view ? 0 :
  				   sqlite3IndexAffinityStr(pParse->db, pPk);
-		sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
-					  zAff, nPk);
+		sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, pk_part_count,
+				  regKey, zAff, pk_part_count);
  		sqlite3VdbeAddOp2(v, OP_IdxInsert, iEph, regKey);
  		/* Set flag to save memory allocating one by malloc. */
  		sqlite3VdbeChangeP5(v, 1);
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index cec0ed647..c96157a55 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -1150,7 +1150,7 @@ sql_vdbe_set_p4_key_def(struct Parse *parse, struct Index *idx)
  	struct Vdbe *v = parse->pVdbe;
  	assert(v != NULL);
  	assert(idx != NULL);
-	struct key_def *def = key_def_dup(sql_index_key_def(idx));
+	struct key_def *def = key_def_dup(idx->def->key_def);
  	if (def == NULL)
  		sqlite3OomFault(parse->db);
  	else
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 2ce90747d..d0e16bafb 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1087,15 +1087,15 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
  			Index *pIdx = p->pIdx;	/* Index being probed */
  			int nByte;	/* Bytes of space to allocate */
  			int i;	/* Counter variable */
-			int nCol = index_column_count(pIdx);
+			int part_count = pIdx->def->key_def->part_count;
  
-			nByte = sizeof(Mem) * nCol +
+			nByte = sizeof(Mem) * part_count +
  				ROUND8(sizeof(UnpackedRecord));
  			pRec =
  			    (UnpackedRecord *) sqlite3DbMallocZero(db, nByte);
  			if (pRec == NULL)
  				return NULL;
-			pRec->key_def = key_def_dup(sql_index_key_def(pIdx));
+			pRec->key_def = key_def_dup(pIdx->def->key_def);
  			if (pRec->key_def == NULL) {
  				sqlite3DbFree(db, pRec);
  				sqlite3OomFault(db);
@@ -1103,7 +1103,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
  			}
  			pRec->aMem = (Mem *)((char *) pRec +
  					     ROUND8(sizeof(UnpackedRecord)));
-			for (i = 0; i < nCol; i++) {
+			for (i = 0; i < (int) part_count; i++) {
  				pRec->aMem[i].flags = MEM_Null;
  				pRec->aMem[i].db = db;
  			}
@@ -1621,15 +1621,12 @@ sql_stat4_column(struct sqlite3 *db, const char *record, uint32_t col_num,
  void
  sqlite3Stat4ProbeFree(UnpackedRecord * pRec)
  {
-	if (pRec) {
-		int i;
-		int nCol = pRec->key_def->part_count;
-		Mem *aMem = pRec->aMem;
-		sqlite3 *db = aMem[0].db;
-		for (i = 0; i < nCol; i++) {
+	if (pRec != NULL) {
+		int part_count = pRec->key_def->part_count;
+		struct Mem *aMem = pRec->aMem;
+		for (int i = 0; i < part_count; i++)
  			sqlite3VdbeMemRelease(&aMem[i]);
-		}
-		sqlite3DbFree(db, pRec);
+		sqlite3DbFree(aMem[0].db, pRec);
  	}
  }
  
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index 85143ed20..092869ee8 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -370,16 +370,20 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
  	pScan->idxaff = 0;
  	pScan->coll = NULL;
  	pScan->is_column_seen = false;
-	if (pIdx) {
+	if (pIdx != NULL) {
  		int j = iColumn;
-		iColumn = pIdx->aiColumn[j];
-		if (iColumn >= 0) {
-			char affinity =
+		/*
+		 * pIdx->def->name == "fake_autoindex" means that
+		 * pIdx is a fake integer primary key index.
+		 */
+		if (strcmp(pIdx->def->name, "fake_autoindex") != 0) {
+			iColumn = pIdx->def->key_def->parts[iColumn].fieldno;
+			pScan->idxaff =
  				pIdx->pTable->def->fields[iColumn].affinity;
-			pScan->idxaff = affinity;
-			uint32_t id;
-			pScan->coll = sql_index_collation(pIdx, j, &id);
+			pScan->coll = pIdx->def->key_def->parts[j].coll;
  			pScan->is_column_seen = true;
+		} else {
+			iColumn = -1;
  		}
  	}
  	pScan->opMask = opMask;
@@ -541,47 +545,24 @@ findIndexCol(Parse * pParse,	/* Parse context */
  	     Index * pIdx,	/* Index to match column of */
  	     int iCol)		/* Column of index to match */
  {
+	struct key_part *part_to_match = &pIdx->def->key_def->parts[iCol];
  	for (int i = 0; i < pList->nExpr; i++) {
  		Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
-		if (p->op == TK_COLUMN &&
-		    p->iColumn == pIdx->aiColumn[iCol] &&
-		    p->iTable == iBase) {
+		if (p->op == TK_COLUMN && p->iTable == iBase &&
+		    p->iColumn == (int) part_to_match->fieldno) {
  			bool is_found;
  			uint32_t id;
  			struct coll *coll = sql_expr_coll(pParse,
  							  pList->a[i].pExpr,
  							  &is_found, &id);
-			if (is_found &&
-			    coll == sql_index_collation(pIdx, iCol, &id)) {
+			if (is_found && coll == part_to_match->coll)
  				return i;
-			}
  		}
  	}
  
  	return -1;
  }
  
-/*
- * Return TRUE if the iCol-th column of index pIdx is NOT NULL
- */
-static int
-indexColumnNotNull(Index * pIdx, int iCol)
-{
-	int j;
-	assert(pIdx != 0);
-	assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
-	j = pIdx->aiColumn[iCol];
-	if (j >= 0) {
-		return !pIdx->pTable->def->fields[j].is_nullable;
-	} else if (j == (-1)) {
-		return 1;
-	} else {
-		assert(j == (-2));
-		return 0;	/* Assume an indexed expression can always yield a NULL */
-
-	}
-}
-
  /*
   * Return true if the DISTINCT expression-list passed as the third argument
   * is redundant.
@@ -633,9 +614,9 @@ isDistinctRedundant(Parse * pParse,		/* Parsing context */
  	 *      contain a "col=X" term are subject to a NOT NULL constraint.
  	 */
  	for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-		if (!index_is_unique(pIdx))
+		if (!pIdx->def->opts.is_unique)
  			continue;
-		int col_count = index_column_count(pIdx);
+		int col_count = pIdx->def->key_def->part_count;
  		for (i = 0; i < col_count; i++) {
  			if (0 ==
  			    sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
@@ -643,11 +624,12 @@ isDistinctRedundant(Parse * pParse,		/* Parsing context */
  				if (findIndexCol
  				    (pParse, pDistinct, iBase, pIdx, i) < 0)
  					break;
-				if (indexColumnNotNull(pIdx, i) == 0)
+				uint32_t j = pIdx->def->key_def->parts[i].fieldno;
+				if (pIdx->pTable->def->fields[j].is_nullable)
  					break;
  			}
  		}
-		if (i == (int)index_column_count(pIdx)) {
+		if (i == (int) pIdx->def->key_def->part_count) {
  			/* This index implies that the DISTINCT qualifier is redundant. */
  			return 1;
  		}
@@ -835,8 +817,7 @@ constructAutomaticIndex(Parse * pParse,			/* The parsing context */
  	}
  
  	/* Construct the Index object to describe this index */
-	pIdx =
-	    sqlite3AllocateIndexObject(pParse->db, nKeyCol + 1, 0, &zNotUsed);
+	pIdx = sql_index_alloc(pParse->db, nKeyCol + 1);
  	if (pIdx == 0)
  		goto end_auto_index_create;
  	pLoop->pIndex = pIdx;
@@ -1184,7 +1165,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
  char
  sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
  {
-	assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
+	assert(iCol >= 0 && iCol < (int) pIdx->def->key_def->part_count);
  	if (!pIdx->zColAff) {
  		if (sqlite3IndexAffinityStr(db, pIdx) == 0)
  			return AFFINITY_BLOB;
@@ -1246,13 +1227,12 @@ whereRangeSkipScanEst(Parse * pParse,		/* Parsing & code generating context */
  	int nUpper = index->def->opts.stat->sample_count + 1;
  	int rc = SQLITE_OK;
  	u8 aff = sqlite3IndexColumnAffinity(db, p, nEq);
-	uint32_t id;
  
  	sqlite3_value *p1 = 0;	/* Value extracted from pLower */
  	sqlite3_value *p2 = 0;	/* Value extracted from pUpper */
  	sqlite3_value *pVal = 0;	/* Value extracted from record */
  
-	struct coll *pColl = sql_index_collation(p, nEq, &id);
+	struct coll *coll = p->def->key_def->parts[nEq].coll;
  	if (pLower) {
  		rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
  					       aff, &p1);
@@ -1273,12 +1253,12 @@ whereRangeSkipScanEst(Parse * pParse,		/* Parsing & code generating context */
  			rc = sql_stat4_column(db, samples[i].sample_key, nEq,
  					      &pVal);
  			if (rc == SQLITE_OK && p1) {
-				int res = sqlite3MemCompare(p1, pVal, pColl);
+				int res = sqlite3MemCompare(p1, pVal, coll);
  				if (res >= 0)
  					nLower++;
  			}
  			if (rc == SQLITE_OK && p2) {
-				int res = sqlite3MemCompare(p2, pVal, pColl);
+				int res = sqlite3MemCompare(p2, pVal, coll);
  				if (res >= 0)
  					nUpper++;
  			}
@@ -1448,7 +1428,7 @@ whereRangeScanEst(Parse * pParse,	/* Parsing & code generating context */
  			       || (pLower->eOperator & (WO_GT | WO_GE)) != 0);
  			assert(pUpper == 0
  			       || (pUpper->eOperator & (WO_LT | WO_LE)) != 0);
-			if (sql_index_column_sort_order(p, nEq) !=
+			if (p->def->key_def->parts[nEq].sort_order !=
  			    SORT_ORDER_ASC) {
  				/* The roles of pLower and pUpper are swapped for a DESC index */
  				SWAP(pLower, pUpper);
@@ -1598,7 +1578,7 @@ whereEqualScanEst(Parse * pParse,	/* Parsing & code generating context */
  	int bOk;
  
  	assert(nEq >= 1);
-	assert(nEq <= (int)index_column_count(p));
+	assert(nEq <= (int) p->def->key_def->part_count);
  	assert(pBuilder->nRecValid < nEq);
  
  	/* If values are not available for all fields of the index to the left
@@ -1619,7 +1599,7 @@ whereEqualScanEst(Parse * pParse,	/* Parsing & code generating context */
  
  	whereKeyStats(pParse, p, pRec, 0, a);
  	WHERETRACE(0x10, ("equality scan regions %s(%d): %d\n",
-			  p->zName, nEq - 1, (int)a[1]));
+			  p->def->name, nEq - 1, (int)a[1]));
  	*pnRow = a[1];
  
  	return rc;
@@ -1751,8 +1731,8 @@ whereLoopPrint(WhereLoop * p, WhereClause * pWC)
  			   pItem->zAlias ? pItem->zAlias : pTab->def->name);
  #endif
  	const char *zName;
-	if (p->pIndex && (zName = p->pIndex->zName) != 0) {
-		if (strncmp(zName, "sqlite_autoindex_", 17) == 0) {
+	if (p->pIndex != NULL && (zName = p->pIndex->def->name) != NULL) {
+		if (strncmp(zName, "sql_autoindex_", 14) == 0) {
  			int i = sqlite3Strlen30(zName) - 1;
  			while (zName[i] != '_')
  				i--;
@@ -2314,7 +2294,7 @@ whereRangeVectorLen(Parse * pParse,	/* Parsing context */
  	int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
  	int i;
  
-	nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
+	nCmp = MIN(nCmp, (int)(pIdx->def->key_def->part_count - nEq));
  	for (i = 1; i < nCmp; i++) {
  		/* Test if comparison i of pTerm is compatible with column (i+nEq)
  		 * of the index. If not, exit the loop.
@@ -2335,13 +2315,11 @@ whereRangeVectorLen(Parse * pParse,	/* Parsing context */
  		 * order of the index column is the same as the sort order of the
  		 * leftmost index column.
  		 */
-		if (pLhs->op != TK_COLUMN
-		    || pLhs->iTable != iCur
-		    || pLhs->iColumn != pIdx->aiColumn[i + nEq]
-		    || sql_index_column_sort_order(pIdx, i + nEq) !=
-		       sql_index_column_sort_order(pIdx, nEq)) {
+		struct key_part *parts = pIdx->def->key_def->parts;
+		if (pLhs->op != TK_COLUMN || pLhs->iTable != iCur ||
+		    pLhs->iColumn != (int)parts[i + nEq].fieldno ||
+		    parts[i + nEq].sort_order != parts[nEq].sort_order)
  			break;
-		}
  
  		aff = sqlite3CompareAffinity(pRhs, sqlite3ExprAffinity(pLhs));
  		idxaff =
@@ -2353,7 +2331,7 @@ whereRangeVectorLen(Parse * pParse,	/* Parsing context */
  		pColl = sql_binary_compare_coll_seq(pParse, pLhs, pRhs, &id);
  		if (pColl == 0)
  			break;
-	        if (sql_index_collation(pIdx, i + nEq, &id) != pColl)
+		if (pIdx->def->key_def->parts[i + nEq].coll != pColl)
  			break;
  	}
  	return i;
@@ -2396,13 +2374,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  	LogEst rSize;		/* Number of rows in the table */
  	LogEst rLogSize;	/* Logarithm of table size */
  	WhereTerm *pTop = 0, *pBtm = 0;	/* Top and bottom range constraints */
-	uint32_t nProbeCol = index_column_count(pProbe);
+	uint32_t probe_part_count = pProbe->def->key_def->part_count;
  
  	pNew = pBuilder->pNew;
  	if (db->mallocFailed)
  		return SQLITE_NOMEM_BKPT;
  	WHERETRACE(0x800, ("BEGIN addBtreeIdx(%s), nEq=%d\n",
-			   pProbe->zName, pNew->nEq));
+			   pProbe->def->name, pNew->nEq));
  
  	assert((pNew->wsFlags & WHERE_TOP_LIMIT) == 0);
  	if (pNew->wsFlags & WHERE_BTM_LIMIT) {
@@ -2431,7 +2409,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  		stat = &surrogate_stat;
  	if (stat->is_unordered)
  		opMask &= ~(WO_GT | WO_GE | WO_LT | WO_LE);
-	assert(pNew->nEq < nProbeCol);
+	assert(pNew->nEq < probe_part_count);
  
  	saved_nEq = pNew->nEq;
  	saved_nBtm = pNew->nBtm;
@@ -2452,10 +2430,14 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  		LogEst nOutUnadjusted;	/* nOut before IN() and WHERE adjustments */
  		int nIn = 0;
  		int nRecValid = pBuilder->nRecValid;
-		if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0)
-		    && indexColumnNotNull(pProbe, saved_nEq)
-		    ) {
-			continue;	/* ignore IS [NOT] NULL constraints on NOT NULL columns */
+		uint32_t j = pProbe->def->key_def->parts[saved_nEq].fieldno;
+		if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0) &&
+		    !pProbe->pTable->def->fields[j].is_nullable) {
+			/*
+			 * Ignore IS [NOT] NULL constraints on NOT
+			 * NULL columns.
+			 */
+			continue;
  		}
  		if (pTerm->prereqRight & pNew->maskSelf)
  			continue;
@@ -2523,14 +2505,16 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  							 */
  			}
  		} else if (eOp & WO_EQ) {
-			int iCol = pProbe->aiColumn[saved_nEq];
+			int iCol = pProbe->def->key_def->parts[saved_nEq].fieldno;
  			pNew->wsFlags |= WHERE_COLUMN_EQ;
  			assert(saved_nEq == pNew->nEq);
-			if ((iCol > 0 && nInMul == 0
-				&& saved_nEq == nProbeCol - 1)
-			    ) {
-				if (iCol >= 0 &&
-				    !index_is_unique_not_null(pProbe)) {
+			if (iCol > 0 && nInMul == 0 &&
+			    saved_nEq == probe_part_count - 1) {
+				bool index_is_unique_not_null =
+					!pProbe->def->key_def->is_nullable &&
+					pProbe->def->opts.is_unique;
+				if (pProbe->tnum != 0 &&
+				    !index_is_unique_not_null) {
  					pNew->wsFlags |= WHERE_UNQ_WANTED;
  				} else {
  					pNew->wsFlags |= WHERE_ONEROW;
@@ -2592,8 +2576,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  			assert(eOp & (WO_ISNULL | WO_EQ | WO_IN));
  
  			assert(pNew->nOut == saved_nOut);
-			if (pTerm->truthProb <= 0
-			    && pProbe->aiColumn[saved_nEq] >= 0) {
+			if (pTerm->truthProb <= 0 && pProbe->tnum != 0 ) {
  				assert((eOp & WO_IN) || nIn == 0);
  				testcase(eOp & WO_IN);
  				pNew->nOut += pTerm->truthProb;
@@ -2695,8 +2678,8 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  			pNew->nOut = nOutUnadjusted;
  		}
  
-		if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0
-		    && pNew->nEq < nProbeCol) {
+		if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0 &&
+		    pNew->nEq < probe_part_count) {
  			whereLoopAddBtreeIndex(pBuilder, pSrc, pProbe,
  					       nInMul + nIn);
  		}
@@ -2724,7 +2707,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  	 * more expensive.
  	 */
  	assert(42 == sqlite3LogEst(18));
-	if (saved_nEq == saved_nSkip && saved_nEq + 1U < nProbeCol &&
+	if (saved_nEq == saved_nSkip && saved_nEq + 1U < probe_part_count &&
  	    stat->skip_scan_enabled == true &&
  	    /* TUNING: Minimum for skip-scan */
  	    index_field_tuple_est(pProbe, saved_nEq + 1) >= 42 &&
@@ -2749,7 +2732,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,	/* The WhereLoop factory */
  	}
  
  	WHERETRACE(0x800, ("END addBtreeIdx(%s), nEq=%d, rc=%d\n",
-			   pProbe->zName, saved_nEq, rc));
+			   pProbe->def->name, saved_nEq, rc));
  	return rc;
  }
  
@@ -2792,7 +2775,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
  {
  	ExprList *pOB;
  	int ii, jj;
-	int nIdxCol = index_column_count(pIndex);
+	int part_count = pIndex->def->key_def->part_count;
  	if (index_is_unordered(pIndex))
  		return 0;
  	if ((pOB = pBuilder->pWInfo->pOrderBy) == 0)
@@ -2802,8 +2785,9 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
  		if (pExpr->op == TK_COLUMN && pExpr->iTable == iCursor) {
  			if (pExpr->iColumn < 0)
  				return 1;
-			for (jj = 0; jj < nIdxCol; jj++) {
-				if (pExpr->iColumn == pIndex->aiColumn[jj])
+			for (jj = 0; jj < part_count; jj++) {
+				if (pExpr->iColumn == (int)
+				    pIndex->def->key_def->parts[jj].fieldno)
  					return 1;
  			}
  		}
@@ -2882,7 +2866,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
  	Index *pProbe;		/* An index we are evaluating */
  	Index sPk;		/* A fake index object for the primary key */
  	LogEst aiRowEstPk[2];	/* The aiRowLogEst[] value for the sPk index */
-	i16 aiColumnPk = -1;	/* The aColumn[] value for the sPk index */
  	SrcList *pTabList;	/* The FROM clause */
  	struct SrcList_item *pSrc;	/* The FROM clause btree term to add */
  	WhereLoop *pNew;	/* Template WhereLoop object */
@@ -2903,8 +2886,10 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
  	if (pSrc->pIBIndex) {
  		/* An INDEXED BY clause specifies a particular index to use */
  		pProbe = pSrc->pIBIndex;
+		sPk.def = NULL;
  	} else if (pTab->pIndex) {
  		pProbe = pTab->pIndex;
+		sPk.def = NULL;
  	} else {
  		/* There is no INDEXED BY clause.  Create a fake Index object in local
  		 * variable sPk to represent the primary key index.  Make this
@@ -2913,11 +2898,32 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
  		 */
  		Index *pFirst;	/* First of real indices on the table */
  		memset(&sPk, 0, sizeof(Index));
-		sPk.nColumn = 1;
-		sPk.aiColumn = &aiColumnPk;
  		sPk.aiRowLogEst = aiRowEstPk;
  		sPk.onError = ON_CONFLICT_ACTION_REPLACE;
  		sPk.pTable = pTab;
+
+		struct key_def *key_def = key_def_new(1);
+		if (key_def == NULL) {
+			pWInfo->pParse->nErr++;
+			pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+			return SQL_TARANTOOL_ERROR;
+		}
+
+		key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
+				 ON_CONFLICT_ACTION_ABORT,
+				 NULL, COLL_NONE, SORT_ORDER_ASC);
+
+		sPk.def = index_def_new(pTab->def->id, 0, "fake_autoindex",
+					sizeof("fake_autoindex") - 1, TREE,
+					&index_opts_default, key_def, NULL);
+		key_def_delete(key_def);
+
+		if (sPk.def == NULL) {
+			pWInfo->pParse->nErr++;
+			pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+			return SQL_TARANTOOL_ERROR;
+		}
+
  		aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
  		aiRowEstPk[1] = 0;
  		pFirst = pSrc->pTab->pIndex;
@@ -3058,6 +3064,8 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
  		if (pSrc->pIBIndex)
  			break;
  	}
+	if (sPk.def != NULL)
+		index_def_delete(sPk.def);
  	return rc;
  }
  
@@ -3392,8 +3400,8 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
  				   index_is_unordered(pIndex)) {
  				return 0;
  			} else {
-				nColumn = index_column_count(pIndex);
-				isOrderDistinct = index_is_unique(pIndex);
+				nColumn = pIndex->def->key_def->part_count;
+				isOrderDistinct = pIndex->def->opts.is_unique;
  			}
  
  			/* Loop through all columns of the index and deal with the ones
@@ -3454,9 +3462,10 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
  				 * (revIdx) for the j-th column of the index.
  				 */
  				if (pIndex != NULL) {
-					iColumn = pIndex->aiColumn[j];
-					revIdx = sql_index_column_sort_order(pIndex,
-									     j);
+					struct key_def *def =
+						pIndex->def->key_def;
+					iColumn = def->parts[j].fieldno;
+					revIdx = def->parts[j].sort_order;
  					if (iColumn == pIndex->pTable->iPKey)
  						iColumn = -1;
  				} else {
@@ -3506,8 +3515,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
  								      pOrderBy->a[i].pExpr,
  								      &is_found, &id);
  						struct coll *idx_coll =
-							sql_index_collation(pIndex,
-									    j, &id);
+							pIndex->def->key_def->parts[j].coll;
  						if (is_found &&
  						    coll != idx_coll)
  							continue;
@@ -4777,7 +4785,7 @@ sqlite3WhereBegin(Parse * pParse,	/* The parser context */
  					sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);	/* Hint to COMDB2 */
  				}
  				if (pIx != NULL)
-					VdbeComment((v, "%s", pIx->zName));
+					VdbeComment((v, "%s", pIx->def->name));
  				else
  					VdbeComment((v, "%s", idx_def->name));
  #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
@@ -4910,7 +4918,7 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
  		if (pLevel->addrSkip) {
  			sqlite3VdbeGoto(v, pLevel->addrSkip);
  			VdbeComment((v, "next skip-scan on %s",
-				     pLoop->pIndex->zName));
+				     pLoop->pIndex->def->name));
  			sqlite3VdbeJumpHere(v, pLevel->addrSkip);
  			sqlite3VdbeJumpHere(v, pLevel->addrSkip - 2);
  		}
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index c35c25ac4..1976583fa 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -48,7 +48,7 @@
  static const char *
  explainIndexColumnName(Index * pIdx, int i)
  {
-	i = pIdx->aiColumn[i];
+	i = pIdx->def->key_def->parts[i].fieldno;
  	return pIdx->pTable->def->fields[i].name;
  }
  
@@ -243,7 +243,7 @@ sqlite3WhereExplainOneScan(Parse * pParse,	/* Parse context */
  			if (zFmt) {
  				sqlite3StrAccumAppend(&str, " USING ", 7);
  				if (pIdx != NULL)
-					sqlite3XPrintf(&str, zFmt, pIdx->zName);
+					sqlite3XPrintf(&str, zFmt, pIdx->def->name);
  				else if (idx_def != NULL)
  					sqlite3XPrintf(&str, zFmt, idx_def->name);
  				else
@@ -488,7 +488,7 @@ codeEqualityTerm(Parse * pParse,	/* The parsing context */
  		int *aiMap = 0;
  
  		if (pLoop->pIndex != 0 &&
-		    sql_index_column_sort_order(pLoop->pIndex, iEq)) {
+		    pLoop->pIndex->def->key_def->parts[iEq].sort_order) {
  			testcase(iEq == 0);
  			testcase(bRev);
  			bRev = !bRev;
@@ -736,7 +736,7 @@ codeAllEqualityTerms(Parse * pParse,	/* Parsing context */
  		sqlite3VdbeAddOp1(v, (bRev ? OP_Last : OP_Rewind), iIdxCur);
  		VdbeCoverageIf(v, bRev == 0);
  		VdbeCoverageIf(v, bRev != 0);
-		VdbeComment((v, "begin skip-scan on %s", pIdx->zName));
+		VdbeComment((v, "begin skip-scan on %s", pIdx->def->name));
  		j = sqlite3VdbeAddOp0(v, OP_Goto);
  		pLevel->addrSkip =
  		    sqlite3VdbeAddOp4Int(v, (bRev ? OP_SeekLT : OP_SeekGT),
@@ -746,7 +746,8 @@ codeAllEqualityTerms(Parse * pParse,	/* Parsing context */
  		sqlite3VdbeJumpHere(v, j);
  		for (j = 0; j < nSkip; j++) {
  			sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-					  pIdx->aiColumn[j], regBase + j);
+					  pIdx->def->key_def->parts[j].fieldno,
+					  regBase + j);
  			VdbeComment((v, "%s", explainIndexColumnName(pIdx, j)));
  		}
  	}
@@ -1037,14 +1038,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  		assert(pWInfo->pOrderBy == 0
  		       || pWInfo->pOrderBy->nExpr == 1
  		       || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
-		int nIdxCol;
+		uint32_t part_count;
  		if (pIdx != NULL)
-			nIdxCol = index_column_count(pIdx);
+			part_count = pIdx->def->key_def->part_count;
  		else
-			nIdxCol = idx_def->key_def->part_count;
-		if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
-		    && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
-			j = pIdx->aiColumn[nEq];
+			part_count = idx_def->key_def->part_count;
+		if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0 &&
+		    pWInfo->nOBSat > 0 && part_count > nEq) {
+			j = pIdx->def->key_def->parts[nEq].fieldno;
  			/* Allow seek for column with `NOT NULL` == false attribute.
  			 * If a column may contain NULL-s, the comparator installed
  			 * by Tarantool is prepared to seek using a NULL value.
@@ -1055,8 +1056,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  			 * FYI: entries in an index are ordered as follows:
  			 *      NULL, ... NULL, min_value, ...
  			 */
-			if (j >= 0 &&
-			    pIdx->pTable->def->fields[j].is_nullable) {
+			if (pIdx->pTable->def->fields[j].is_nullable) {
  				assert(pLoop->nSkip == 0);
  				bSeekPastNull = 1;
  				nExtraReg = 1;
@@ -1093,16 +1093,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  				testcase(pIdx->aSortOrder[nEq] ==
  					 SORT_ORDER_DESC);
  				assert((bRev & ~1) == 0);
+				struct key_def *def = pIdx->def->key_def;
  				pLevel->iLikeRepCntr <<= 1;
  				pLevel->iLikeRepCntr |=
-					bRev ^ (sql_index_column_sort_order(pIdx, nEq) ==
+					bRev ^ (def->parts[nEq].sort_order ==
  						SORT_ORDER_DESC);
  			}
  #endif
  			if (pRangeStart == 0) {
-				j = pIdx->aiColumn[nEq];
-				if (j >= 0 &&
-				    pIdx->pTable->def->fields[j].is_nullable)
+				j = pIdx->def->key_def->parts[nEq].fieldno;
+				if (pIdx->pTable->def->fields[j].is_nullable)
  					bSeekPastNull = 1;
  			}
  		}
@@ -1113,10 +1113,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  		 * a forward order scan on a descending index, interchange the
  		 * start and end terms (pRangeStart and pRangeEnd).
  		 */
-		if ((nEq < nIdxCol &&
-		     bRev == (sql_index_column_sort_order(pIdx, nEq) ==
-			      SORT_ORDER_ASC)) ||
-		    (bRev && nIdxCol == nEq)) {
+		if ((nEq < part_count &&
+		     bRev == (pIdx->def->key_def->parts[nEq].sort_order ==
+			      SORT_ORDER_ASC)) || (bRev && part_count == nEq)) {
  			SWAP(pRangeEnd, pRangeStart);
  			SWAP(bSeekPastNull, bStopAtNull);
  			SWAP(nBtm, nTop);
@@ -1196,16 +1195,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  			}
  		} else {
  			pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-			affinity =
-				pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
+			uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
+			affinity = pIdx->pTable->def->fields[fieldno].affinity;
  		}
  
-		int nPkCol;
+		uint32_t pk_part_count;
  		if (pk != NULL)
-			nPkCol = index_column_count(pk);
+			pk_part_count = pk->def->key_def->part_count;
  		else
-			nPkCol = idx_pk->key_def->part_count;
-		if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
+			pk_part_count = idx_pk->key_def->part_count;
+		if (pk_part_count == 1 && affinity == AFFINITY_INTEGER) {
  			/* Right now INTEGER PRIMARY KEY is the only option to
  			 * get Tarantool's INTEGER column type. Need special handling
  			 * here: try to loosely convert FLOAT to INT. If RHS type
@@ -1213,8 +1212,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  			 */
  			int limit = pRangeStart == NULL ? nEq : nEq + 1;
  			for (int i = 0; i < limit; i++) {
-				if ((pIdx != NULL && pIdx->aiColumn[i] ==
-				     pk->aiColumn[0]) ||
+				if ((pIdx != NULL &&
+				     pIdx->def->key_def->parts[i].fieldno ==
+				     pk->def->key_def->parts[0].fieldno) ||
  				    (idx_pk != NULL &&
  				     idx_def->key_def->parts[i].fieldno ==
  				     idx_pk->key_def->parts[0].fieldno)) {
@@ -1326,17 +1326,17 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  			/* pIdx is a covering index.  No need to access the main table. */
  		}  else if (iCur != iIdxCur) {
  			Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-			int nPkCol = index_column_count(pPk);
-			int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
-			for (j = 0; j < nPkCol; j++) {
-				k = pPk->aiColumn[j];
+			int pk_part_count = pPk->def->key_def->part_count;
+			int iKeyReg = sqlite3GetTempRange(pParse, pk_part_count);
+			for (j = 0; j < pk_part_count; j++) {
+				k = pPk->def->key_def->parts[j].fieldno;
  				sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
  						  iKeyReg + j);
  			}
  			sqlite3VdbeAddOp4Int(v, OP_NotFound, iCur, addrCont,
-					     iKeyReg, nPkCol);
+					     iKeyReg, pk_part_count);
  			VdbeCoverage(v);
-			sqlite3ReleaseTempRange(pParse, iKeyReg, nPkCol);
+			sqlite3ReleaseTempRange(pParse, iKeyReg, pk_part_count);
  		}
  
  		/* Record the instruction used to terminate the loop. */
@@ -1434,10 +1434,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  		 */
  		if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
  			Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-			int nPkCol = index_column_count(pPk);
+			int pk_part_count = pPk->def->key_def->part_count;
  			regRowset = pParse->nTab++;
  			sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
-					  regRowset, nPkCol);
+					  regRowset, pk_part_count);
  			sql_vdbe_set_p4_key_def(pParse, pPk);
  			regPk = ++pParse->nMem;
  		}
@@ -1538,16 +1538,23 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  						int iSet =
  						    ((ii == pOrWc->nTerm - 1) ? -1 : ii);
  						Index *pPk = sqlite3PrimaryKeyIndex (pTab);
-						int nPk = index_column_count(pPk);
-						int iPk;
+						struct key_def *def =
+							pPk->def->key_def;
  
  						/* Read the PK into an array of temp registers. */
-						r = sqlite3GetTempRange(pParse, nPk);
-						for (iPk = 0; iPk < nPk; iPk++) {
-							int iCol = pPk->aiColumn[iPk];
+						r = sqlite3GetTempRange(pParse,
+									def->part_count);
+						for (uint32_t iPk = 0;
+						     iPk < def->part_count;
+						     iPk++) {
+							uint32_t fieldno =
+								def->parts[iPk].
+								fieldno;
  							sqlite3ExprCodeGetColumnToReg
-								(pParse, pTab->def,
-								 iCol, iCur,
+								(pParse,
+								 pTab->def,
+								 fieldno,
+								 iCur,
  								 r + iPk);
  						}
  
@@ -1567,20 +1574,21 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,	/* Complete information about t
  							jmp1 = sqlite3VdbeAddOp4Int
  								(v, OP_Found,
  								 regRowset, 0,
-								 r, nPk);
+								 r,
+								 def->part_count);
  							VdbeCoverage(v);
  						}
  						if (iSet >= 0) {
  							sqlite3VdbeAddOp3
  								(v, OP_MakeRecord,
-								 r, nPk, regPk);
+								 r, def->part_count, regPk);
  							sqlite3VdbeAddOp2
  								(v, OP_IdxInsert,
  								 regRowset, regPk);
  						}
  
  						/* Release the array of temp registers */
-						sqlite3ReleaseTempRange(pParse, r, nPk);
+						sqlite3ReleaseTempRange(pParse, r, def->part_count);
  					}
  
  					/* Invoke the main loop body as a subroutine */
diff --git a/test/box/misc.result b/test/box/misc.result
index a00d03365..a0b35ecc2 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -487,6 +487,7 @@ t;
    160: box.error.ACTION_MISMATCH
    161: box.error.VIEW_MISSING_SQL
    162: box.error.FOREIGN_KEY_CONSTRAINT
+  163: box.error.NO_SUCH_COLLATION
  ...
  test_run:cmd("setopt delimiter ''");
  ---
diff --git a/test/sql-tap/analyze6.test.lua b/test/sql-tap/analyze6.test.lua
index 7f4ce1e3e..cb1710a50 100755
--- a/test/sql-tap/analyze6.test.lua
+++ b/test/sql-tap/analyze6.test.lua
@@ -116,7 +116,7 @@ test:do_eqp_test(
      [[SELECT * FROM t201 WHERE y=5]],
      {
          -- <analyze6-2.2>
-        {0, 0, 0, "SEARCH TABLE T201 USING COVERING INDEX sqlite_autoindex_T201_1 (Y=?)"}
+        {0, 0, 0, "SEARCH TABLE T201 USING COVERING INDEX sql_autoindex_T201_1 (Y=?)"}
          -- </analyze6-2.2>
  })
  
@@ -148,7 +148,7 @@ test:do_eqp_test(
      [[SELECT * FROM t201 WHERE y=5]],
      {
          -- <analyze6-2.5>
-        {0, 0, 0, "SEARCH TABLE T201 USING COVERING INDEX sqlite_autoindex_T201_1 (Y=?)"}
+        {0, 0, 0, "SEARCH TABLE T201 USING COVERING INDEX sql_autoindex_T201_1 (Y=?)"}
          -- </analyze6-2.5>
  })
  
@@ -183,7 +183,7 @@ test:do_eqp_test(
      [[SELECT * FROM t201 WHERE y=5]],
      {
          -- <analyze6-2.8>
-        {0, 0, 0, "SEARCH TABLE T201 USING COVERING INDEX sqlite_autoindex_T201_1 (Y=?)"}
+        {0, 0, 0, "SEARCH TABLE T201 USING COVERING INDEX sql_autoindex_T201_1 (Y=?)"}
          -- </analyze6-2.8>
  })
  
diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation1.test.lua
similarity index 97%
rename from test/sql-tap/collation.test.lua
rename to test/sql-tap/collation1.test.lua
index 8a98de987..4d33d0c9d 100755
--- a/test/sql-tap/collation.test.lua
+++ b/test/sql-tap/collation1.test.lua
@@ -1,6 +1,6 @@
  #!/usr/bin/env tarantool
  test = require("sqltester")
-test:plan(173)
+test:plan(174)
  
  local prefix = "collation-"
  
@@ -249,4 +249,9 @@ local like_testcases =
  
  test:do_catchsql_set_test(like_testcases, prefix)
  
+test:do_catchsql_test(
+        "collation-2.5.0",
+        'CREATE TABLE test3 (a int, b int, c int, PRIMARY KEY (a, a COLLATE foo, b, c))',
+        {1, "Collation 'FOO' does not exist"})
+
  test:finish_test()
diff --git a/test/sql-tap/colname.test.lua b/test/sql-tap/colname.test.lua
index c53a1e885..ddc06eea7 100755
--- a/test/sql-tap/colname.test.lua
+++ b/test/sql-tap/colname.test.lua
@@ -643,13 +643,13 @@ test:do_catchsql_test(
      "colname-11.2",
      [[CREATE TABLE t1(a, b, c, d, e,
        PRIMARY KEY(a), UNIQUE('b' COLLATE "unicode_ci" DESC));]],
-    {1, "/functional indexes aren't supported in the current version/"})
+    {1, "/Tarantool does not support functional indexes/"})
  
  test:execsql("create table table1(a primary key, b, c)")
  
  test:do_catchsql_test(
      "colname-11.3",
      [[ CREATE INDEX t1c ON table1('c'); ]],
-    {1, "/functional indexes aren't supported in the current version/"})
+    {1, "/Tarantool does not support functional indexes/"})
  
  test:finish_test()
diff --git a/test/sql-tap/gh-2931-savepoints.test.lua b/test/sql-tap/gh-2931-savepoints.test.lua
index 9668d8fde..3861bb209 100755
--- a/test/sql-tap/gh-2931-savepoints.test.lua
+++ b/test/sql-tap/gh-2931-savepoints.test.lua
@@ -80,7 +80,7 @@ local testcases = {
  		{0,{1,2,10,11,1,2,4,10,11}}},
  	{"14",
  		[[insert into t1 values(4);]],
-		{1,"Duplicate key exists in unique index 'sqlite_autoindex_T2_1' in space 'T2'"}},
+		{1,"Duplicate key exists in unique index 'sql_autoindex_T2_1' in space 'T2'"}},
  	{"15",
  		[[select * from t1 union all select * from t2;]],
  		{0,{1,2,10,11,1,2,4,10,11}}},
diff --git a/test/sql-tap/gh2140-trans.test.lua b/test/sql-tap/gh2140-trans.test.lua
index 5539ed261..fe7af5f8e 100755
--- a/test/sql-tap/gh2140-trans.test.lua
+++ b/test/sql-tap/gh2140-trans.test.lua
@@ -32,7 +32,7 @@ for _, verb in ipairs({'ROLLBACK', 'ABORT'}) do
  	if verb == 'ROLLBACK' then
  		answer = 'UNIQUE constraint failed: T1.S1'
  	else
-		answer = "Duplicate key exists in unique index 'sqlite_autoindex_T1_1' in space 'T1'"
+		answer = "Duplicate key exists in unique index 'sql_autoindex_T1_1' in space 'T1'"
  	end
          test:do_catchsql_test('insert1_'..verb,
                                [[BEGIN;
diff --git a/test/sql-tap/gh2259-in-stmt-trans.test.lua b/test/sql-tap/gh2259-in-stmt-trans.test.lua
index 41d52feb6..e2ae1694b 100755
--- a/test/sql-tap/gh2259-in-stmt-trans.test.lua
+++ b/test/sql-tap/gh2259-in-stmt-trans.test.lua
@@ -18,7 +18,7 @@ for _, prefix in pairs({"BEFORE", "AFTER"}) do
  
      test:do_catchsql_test(prefix..'_insert1',
                            'INSERT INTO t1 VALUES(1, 2)',
-                          {1,"Duplicate key exists in unique index 'sqlite_autoindex_T2_1' in space 'T2'"})
+                          {1,"Duplicate key exists in unique index 'sql_autoindex_T2_1' in space 'T2'"})
  
      test:do_execsql_test(prefix..'_insert1_check1',
                           'SELECT *  FROM t1',
@@ -34,7 +34,7 @@ for _, prefix in pairs({"BEFORE", "AFTER"}) do
  
      test:do_catchsql_test(prefix..'_update1',
                            'UPDATE t1 SET s1=1',
-                          {1,"Duplicate key exists in unique index 'sqlite_autoindex_T2_1' in space 'T2'"})
+                          {1,"Duplicate key exists in unique index 'sql_autoindex_T2_1' in space 'T2'"})
  
      test:do_execsql_test(prefix..'_update1_check1',
                           'SELECT *  FROM t1',
@@ -52,7 +52,7 @@ for _, prefix in pairs({"BEFORE", "AFTER"}) do
  
      test:do_catchsql_test(prefix..'delete1',
                            'DELETE FROM t1;',
-                          {1, "Duplicate key exists in unique index 'sqlite_autoindex_T2_1' in space 'T2'"})
+                          {1, "Duplicate key exists in unique index 'sql_autoindex_T2_1' in space 'T2'"})
  
      -- Nothing should be inserted due to abort
      test:do_execsql_test('delete1_check1',
@@ -69,7 +69,7 @@ end
  -- Check multi-insert
  test:do_catchsql_test('insert2',
                        'INSERT INTO t1 VALUES (5, 6), (6, 7)',
-                      {1, "Duplicate key exists in unique index 'sqlite_autoindex_T2_1' in space 'T2'"})
+                      {1, "Duplicate key exists in unique index 'sql_autoindex_T2_1' in space 'T2'"})
  test:do_execsql_test('insert2_check',
                       'SELECT * FROM t1;',
                       {3, 3})
diff --git a/test/sql-tap/gh2964-abort.test.lua b/test/sql-tap/gh2964-abort.test.lua
index 193aabb25..a06b4fd15 100755
--- a/test/sql-tap/gh2964-abort.test.lua
+++ b/test/sql-tap/gh2964-abort.test.lua
@@ -13,7 +13,7 @@ test:do_catchsql_test(
      "CREATE TABLE t2 (a int primary key);")
  
  local insert_err = {1, "UNIQUE constraint failed: T2.A"}
-local insert_err_PK = {1, "Duplicate key exists in unique index 'sqlite_autoindex_T2_1' in space 'T2'"}
+local insert_err_PK = {1, "Duplicate key exists in unique index 'sql_autoindex_T2_1' in space 'T2'"}
  local data = {
  --id|TRIG TYPE|INSERT TYPE|insert error|commit error| result
   {1, "AFTER", "or abort",   insert_err_PK, {0},          {1,1,2}},
diff --git a/test/sql-tap/identifier-characters.test.lua b/test/sql-tap/identifier-characters.test.lua
index 31b45c6e7..24b5dd075 100755
--- a/test/sql-tap/identifier-characters.test.lua
+++ b/test/sql-tap/identifier-characters.test.lua
@@ -10,7 +10,7 @@ local testcases = {
  	{"table",
  	-- create
  	function (id)
-		-- sql autogenerated index name rules add "sqlite_autoindex_") prefix
+		-- sql autogenerated index name rules add the "sql_autoindex_" prefix
  		if string.len(id) == box.schema.NAME_MAX then
  			id = string.sub(id, string.len(id))
  		end
diff --git a/test/sql-tap/identifier_case.test.lua b/test/sql-tap/identifier_case.test.lua
index 5e7573ac4..ed9553c6b 100755
--- a/test/sql-tap/identifier_case.test.lua
+++ b/test/sql-tap/identifier_case.test.lua
@@ -206,8 +206,8 @@ data = {
      { 3,  [["binary"]], {0}},
      { 4,  [["bInaRy"]], {0}},
      { 5,  [["unicode"]], {0}},
-    { 6,  [[ unicode ]], {1,"no such collation sequence: UNICODE"}},
-    { 7,  [["UNICODE"]], {1,"no such collation sequence: UNICODE"}}
+    { 6,  [[ unicode ]], {1,"Collation 'UNICODE' does not exist"}},
+    { 7,  [["UNICODE"]], {1,"Collation 'UNICODE' does not exist"}}
  }
  
  test:do_catchsql_test(
diff --git a/test/sql-tap/index1.test.lua b/test/sql-tap/index1.test.lua
index 4329381e2..a3405a3e7 100755
--- a/test/sql-tap/index1.test.lua
+++ b/test/sql-tap/index1.test.lua
@@ -454,10 +454,10 @@ test:do_execsql_test(
  test:do_execsql_test(
      "index-7.3",
      [[
-        SELECT "name" FROM "_index" WHERE "name"='sqlite_autoindex_TEST1_1'
+        SELECT "name" FROM "_index" WHERE "name"='sql_autoindex_TEST1_1'
      ]], {
          -- <index-7.3>
-        "sqlite_autoindex_TEST1_1"
+        "sql_autoindex_TEST1_1"
          -- </index-7.3>
      })
  
@@ -1015,7 +1015,7 @@ test:do_execsql_test(
      })
  
  -- Test that automatically create indices are named correctly. The current
--- convention is: "sqlite_autoindex_<table name>_<integer>"
+-- convention is: "sql_autoindex_<table name>_<integer>"
  --
  -- Then check that it is an error to try to drop any automtically created
  -- indices.
@@ -1027,18 +1027,18 @@ test:do_execsql_test(
          SELECT "_index"."name" FROM "_index" JOIN "_space" WHERE "_index"."id" = "_space"."id" AND "_space"."name"='T7';
      ]], {
          -- <index-17.1>
-        "sqlite_autoindex_T7_3", "sqlite_autoindex_T7_2", "sqlite_autoindex_T7_1"
+        "sql_autoindex_T7_3", "sql_autoindex_T7_2", "sql_autoindex_T7_1"
          -- </index-17.1>
      })
  
  -- do_test index-17.2 {
  --   catchsql {
---     DROP INDEX sqlite_autoindex_t7_1;
+--     DROP INDEX sql_autoindex_t7_1;
  --   }
  -- } {1 {index associated with UNIQUE or PRIMARY KEY constraint cannot be dropped}}
  -- do_test index-17.3 {
  --   catchsql {
---     DROP INDEX IF EXISTS sqlite_autoindex_t7_1;
+--     DROP INDEX IF EXISTS sql_autoindex_t7_1;
  --   }
  -- } {1 {index associated with UNIQUE or PRIMARY KEY constraint cannot be dropped}}
  test:do_catchsql_test(
@@ -1081,7 +1081,7 @@ test:do_execsql_test(
              INSERT INTO t7 VALUES(1);
          ]], {
              -- <index-19.2>
-            1, "Duplicate key exists in unique index 'sqlite_autoindex_T7_1' in space 'T7'"
+            1, "Duplicate key exists in unique index 'sql_autoindex_T7_1' in space 'T7'"
              -- </index-19.2>
          })
  
diff --git a/test/sql-tap/index7.test.lua b/test/sql-tap/index7.test.lua
index 336f42796..4bd01b8b3 100755
--- a/test/sql-tap/index7.test.lua
+++ b/test/sql-tap/index7.test.lua
@@ -1,6 +1,6 @@
  #!/usr/bin/env tarantool
  test = require("sqltester")
-test:plan(5)
+test:plan(7)
  
  --!./tcltestrunner.lua
  -- 2013-11-04
@@ -48,7 +48,7 @@ end
  -- do_test index7-1.1a {
  --   capture_pragma db out {PRAGMA index_list(t1)}
  --   db eval {SELECT "name", "partial", '|' FROM out ORDER BY "name"}
--- } {sqlite_autoindex_t1_1 0 | t1a 1 | t1b 1 |}
+-- } {sql_autoindex_t1_1 0 | t1a 1 | t1b 1 |}
  -- # Make sure the count(*) optimization works correctly with
  -- # partial indices.  Ticket [a5c8ed66cae16243be6] 2013-10-03.
  -- #
@@ -303,4 +303,21 @@ test:do_catchsql_test(
          1, "keyword \"WHERE\" is reserved"
      })
  
+test:do_catchsql_test(
+        "index7-6.6",
+        'CREATE TABLE test2 (a int, b int, c int, PRIMARY KEY (a, a, a, b, b, a, c))',
+        nil)
+
+test:do_catchsql_test(
+        "index7-6.7",
+        [[
+            CREATE TABLE test4(a,b,c,d, PRIMARY KEY(a,a,a,b,c));
+            CREATE INDEX index1 on test4(b,c,a,c);
+            SELECT "_index"."name" FROM "_index" JOIN "_space" WHERE
+            "_index"."id" = "_space"."id" AND
+            "_space"."name"='TEST4' AND
+            "_index"."name"='INDEX1';
+        ]],
+        {0, {'INDEX1'}})
+
  test:finish_test()
diff --git a/test/sql-tap/intpkey.test.lua b/test/sql-tap/intpkey.test.lua
index ecff3c0ff..b5359b695 100755
--- a/test/sql-tap/intpkey.test.lua
+++ b/test/sql-tap/intpkey.test.lua
@@ -42,7 +42,7 @@ test:do_execsql_test(
          SELECT "_index"."name" FROM "_index" JOIN "_space" WHERE "_index"."id" = "_space"."id" AND "_space"."name"='T1'
      ]], {
          -- <intpkey-1.1>
-        "sqlite_autoindex_T1_1"
+        "sql_autoindex_T1_1"
          -- </intpkey-1.1>
      })
  
@@ -96,7 +96,7 @@ test:do_catchsql_test(
          INSERT INTO t1 VALUES(5,'second','entry');
      ]], {
          -- <intpkey-1.6>
-        1, "Duplicate key exists in unique index 'sqlite_autoindex_T1_1' in space 'T1'"
+        1, "Duplicate key exists in unique index 'sql_autoindex_T1_1' in space 'T1'"
          -- </intpkey-1.6>
      })
  
diff --git a/test/sql-tap/misc1.test.lua b/test/sql-tap/misc1.test.lua
index 5e1dfd1e6..a5078b5f5 100755
--- a/test/sql-tap/misc1.test.lua
+++ b/test/sql-tap/misc1.test.lua
@@ -380,7 +380,7 @@ test:do_catchsql_test(
          INSERT INTO t5 VALUES(1,2,4);
      ]], {
          -- <misc1-7.4>
-        1, "Duplicate key exists in unique index 'sqlite_autoindex_T5_1' in space 'T5'"
+        1, "Duplicate key exists in unique index 'sql_autoindex_T5_1' in space 'T5'"
          -- </misc1-7.4>
      })
  
diff --git a/test/sql-tap/unique.test.lua b/test/sql-tap/unique.test.lua
index 86d119751..63b50657b 100755
--- a/test/sql-tap/unique.test.lua
+++ b/test/sql-tap/unique.test.lua
@@ -70,7 +70,7 @@ test:do_catchsql_test(
          INSERT INTO t1(a,b,c) VALUES(1,3,4)
      ]], {
          -- <unique-1.3>
-        1, "Duplicate key exists in unique index 'sqlite_autoindex_T1_2' in space 'T1'"
+        1, "Duplicate key exists in unique index 'sql_autoindex_T1_2' in space 'T1'"
          -- </unique-1.3>
      })
  
@@ -91,7 +91,7 @@ test:do_catchsql_test(
          INSERT INTO t1(a,b,c) VALUES(3,2,4)
      ]], {
          -- <unique-1.5>
-        1, "Duplicate key exists in unique index 'sqlite_autoindex_T1_1' in space 'T1'"
+        1, "Duplicate key exists in unique index 'sql_autoindex_T1_1' in space 'T1'"
          -- </unique-1.5>
      })
  
@@ -287,7 +287,7 @@ test:do_catchsql_test(
          SELECT a,b,c,d FROM t3 ORDER BY a,b,c,d;
      ]], {
          -- <unique-3.4>
-        1, "Duplicate key exists in unique index 'sqlite_autoindex_T3_2' in space 'T3'"
+        1, "Duplicate key exists in unique index 'sql_autoindex_T3_2' in space 'T3'"
          -- </unique-3.4>
      })
  
@@ -444,7 +444,7 @@ test:do_catchsql_test(
          INSERT INTO t5 VALUES(2, 1,2,3,4,5,6);
      ]], {
          -- <unique-5.2>
-        1, "Duplicate key exists in unique index 'sqlite_autoindex_T5_2' in space 'T5'"
+        1, "Duplicate key exists in unique index 'sql_autoindex_T5_2' in space 'T5'"
          -- </unique-5.2>
      })
  
diff --git a/test/sql-tap/update.test.lua b/test/sql-tap/update.test.lua
index a4efe4015..1ed951da1 100755
--- a/test/sql-tap/update.test.lua
+++ b/test/sql-tap/update.test.lua
@@ -917,7 +917,7 @@ test:do_catchsql_test("update-10.3", [[
    SELECT * FROM t1;
  ]], {
    -- <update-10.3>
-  1, "Duplicate key exists in unique index 'sqlite_autoindex_T1_3' in space 'T1'"
+  1, "Duplicate key exists in unique index 'sql_autoindex_T1_3' in space 'T1'"
    -- </update-10.3>
  })
  
@@ -943,7 +943,7 @@ test:do_catchsql_test("update-10.6", [[
    SELECT * FROM t1;
  ]], {
    -- <update-10.6>
-  1, "Duplicate key exists in unique index 'sqlite_autoindex_T1_1' in space 'T1'"
+  1, "Duplicate key exists in unique index 'sql_autoindex_T1_1' in space 'T1'"
    -- </update-10.6>
  })
  
@@ -969,7 +969,7 @@ test:do_catchsql_test("update-10.9", [[
    SELECT * FROM t1;
  ]], {
    -- <update-10.9>
-  1, "Duplicate key exists in unique index 'sqlite_autoindex_T1_2' in space 'T1'"
+  1, "Duplicate key exists in unique index 'sql_autoindex_T1_2' in space 'T1'"
    -- </update-10.9>
  })
  
diff --git a/test/sql/insert-unique.result b/test/sql/insert-unique.result
index 048d6284c..359ac4346 100644
--- a/test/sql/insert-unique.result
+++ b/test/sql/insert-unique.result
@@ -24,8 +24,7 @@ box.sql.execute("INSERT INTO zoobar VALUES (111, 222, 'c3', 444)")
  -- PK must be unique
  box.sql.execute("INSERT INTO zoobar VALUES (112, 222, 'c3', 444)")
  ---
-- error: Duplicate key exists in unique index 'sqlite_autoindex_ZOOBAR_1' in space
-    'ZOOBAR'
+- error: Duplicate key exists in unique index 'sql_autoindex_ZOOBAR_1' in space 'ZOOBAR'
  ...
  -- Unique index must be respected
  box.sql.execute("INSERT INTO zoobar VALUES (111, 223, 'c3', 444)")
diff --git a/test/sql/iproto.result b/test/sql/iproto.result
index 4cf790169..26ad17b3a 100644
--- a/test/sql/iproto.result
+++ b/test/sql/iproto.result
@@ -553,7 +553,7 @@ future1:wait_result()
  future2:wait_result()
  ---
  - null
-- 'Failed to execute SQL statement: Duplicate key exists in unique index ''sqlite_autoindex_TEST_1''
+- 'Failed to execute SQL statement: Duplicate key exists in unique index ''sql_autoindex_TEST_1''
    in space ''TEST'''
  ...
  future3:wait_result()
diff --git a/test/sql/message-func-indexes.result b/test/sql/message-func-indexes.result
index 37ed4ec6e..5928a8ea8 100644
--- a/test/sql/message-func-indexes.result
+++ b/test/sql/message-func-indexes.result
@@ -18,25 +18,25 @@ box.sql.execute("CREATE TABLE t2(object INTEGER PRIMARY KEY, price INTEGER, coun
  -- should return certain message.
  box.sql.execute("CREATE INDEX i1 ON t1(a+1)")
  ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
  ...
  box.sql.execute("CREATE INDEX i2 ON t1(a)")
  ---
  ...
  box.sql.execute("CREATE INDEX i3 ON t2(price + 100)")
  ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
  ...
  box.sql.execute("CREATE INDEX i4 ON t2(price)")
  ---
  ...
  box.sql.execute("CREATE INDEX i5 ON t2(count + 1)")
  ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
  ...
  box.sql.execute("CREATE INDEX i6 ON t2(count * price)")
  ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
  ...
  -- Cleaning up.
  box.sql.execute("DROP TABLE t1")
diff --git a/test/sql/on-conflict.result b/test/sql/on-conflict.result
index c0d0de08d..4080648aa 100644
--- a/test/sql/on-conflict.result
+++ b/test/sql/on-conflict.result
@@ -23,7 +23,7 @@ box.sql.execute("CREATE TABLE e (id INTEGER PRIMARY KEY ON CONFLICT REPLACE, v I
  -- Insert values and select them
  box.sql.execute("INSERT INTO t values (1, 1), (2, 2), (3, 1)")
  ---
-- error: Duplicate key exists in unique index 'sqlite_autoindex_T_1' in space 'T'
+- error: Duplicate key exists in unique index 'sql_autoindex_T_1' in space 'T'
  ...
  box.sql.execute("SELECT * FROM t")
  ---
diff --git a/test/sql/persistency.result b/test/sql/persistency.result
index e3b4662de..f64e666cf 100644
--- a/test/sql/persistency.result
+++ b/test/sql/persistency.result
@@ -26,8 +26,7 @@ box.sql.execute("INSERT INTO foobar VALUES (1000, 'foobar')")
  ...
  box.sql.execute("INSERT INTO foobar VALUES (1, 'duplicate')")
  ---
-- error: Duplicate key exists in unique index 'sqlite_autoindex_FOOBAR_1' in space
-    'FOOBAR'
+- error: Duplicate key exists in unique index 'sql_autoindex_FOOBAR_1' in space 'FOOBAR'
  ...
  -- simple select
  box.sql.execute("SELECT bar, foo, 42, 'awesome' FROM foobar")
@@ -209,8 +208,7 @@ box.sql.execute("SELECT \"name\", \"opts\" FROM \"_trigger\"")
  -- prove barfoo2 still exists
  box.sql.execute("INSERT INTO barfoo VALUES ('xfoo', 1)")
  ---
-- error: Duplicate key exists in unique index 'sqlite_autoindex_BARFOO_1' in space
-    'BARFOO'
+- error: Duplicate key exists in unique index 'sql_autoindex_BARFOO_1' in space 'BARFOO'
  ...
  box.sql.execute("SELECT * FROM barfoo")
  ---
diff --git a/test/sql/transition.result b/test/sql/transition.result
index 70fb2db40..765b0f0bf 100644
--- a/test/sql/transition.result
+++ b/test/sql/transition.result
@@ -23,8 +23,7 @@ box.sql.execute("INSERT INTO foobar VALUES (1000, 'foobar')")
  ...
  box.sql.execute("INSERT INTO foobar VALUES (1, 'duplicate')")
  ---
-- error: Duplicate key exists in unique index 'sqlite_autoindex_FOOBAR_1' in space
-    'FOOBAR'
+- error: Duplicate key exists in unique index 'sql_autoindex_FOOBAR_1' in space 'FOOBAR'
  ...
  -- simple select
  box.sql.execute("SELECT bar, foo, 42, 'awesome' FROM foobar")
@@ -142,8 +141,7 @@ box.sql.execute("INSERT INTO barfoo VALUES ('foobar', 1000)")
  -- prove barfoo2 was created
  box.sql.execute("INSERT INTO barfoo VALUES ('xfoo', 1)")
  ---
-- error: Duplicate key exists in unique index 'sqlite_autoindex_BARFOO_1' in space
-    'BARFOO'
+- error: Duplicate key exists in unique index 'sql_autoindex_BARFOO_1' in space 'BARFOO'
  ...
  box.sql.execute("SELECT foo, bar FROM barfoo")
  ---
-- 





More information about the Tarantool-patches mailing list