[tarantool-patches] Re: [PATCH] sql: add index_def to struct Index (fixed version #2)

Ivan Koptelov ivan.koptelov at tarantool.org
Thu Jun 7 19:45:50 MSK 2018


>1. The patch is not so big that it cannot fit in a letter. Please send a patch as a file only when
>it is really huge.
Ok.
>2. There is a problem with your GitHub account. Your profile name in commits on GitHub must
>be a link to the profile, but your name is just text. Please fix the problem.
Fixed.
>3. Now you can remove the sql_index_collation() function. It is a useless two-line wrapper, and
>its second out parameter is not needed in some places.
Ok, fixed in all the places.
>4. Same about index_is_unique(). 
Fixed.
>
>5. get_new_iid() is unused. 
Removed.
>
>> +void
>> +append(struct region *r, const char *str, char **sql_arr, int idx, int total_sql_str_size)
>> +{
>> +	sql_arr[idx] = region_alloc(r, strlen(str));
>> +	strcpy(sql_arr[idx], str);
>> +	total_sql_str_size += strlen(str);
>> +}
>
>6. How does it work? total_sql_str_size is not an out parameter here. In the caller's
>context it is still unchanged.
Refactoring mistake. Made total_sql_str_size an output parameter.
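
To illustrate point 6, here is a minimal standalone sketch. The helper names add_len_by_value() and add_len_by_pointer() are hypothetical and are not part of the patch; they only show why a counter passed by value stays unchanged in the caller, while a pointer argument lets the callee update it.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * By-value version (hypothetical): the increment is applied to a
 * local copy, so the caller's counter is not changed.
 */
static void
add_len_by_value(const char *str, size_t total)
{
	total += strlen(str);
	(void) total;
}

/*
 * Out-parameter version (hypothetical): the caller passes the
 * address of its counter, so the update is visible after return.
 */
static void
add_len_by_pointer(const char *str, size_t *total)
{
	assert(total != NULL);
	*total += strlen(str);
}
```

Calling add_len_by_value("CREATE INDEX ", total) leaves total as it was, while add_len_by_pointer("CREATE INDEX ", &total) adds the string length to it.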
>
>> +
>> +char *
>> +create_sql(const char *idx_name, struct space_def *space_def, ExprList *expr_list)
>> +{
>> +	struct region *r = &fiber()->gc;
>> +	/*
>> +	 * CREATE_INDEX_ + INDEX_NAME + _ON_ + TABLE_NAME + _(
>> +	 */
>> +	uint32_t sql_parts_count = 5;
>
>7. You do not need the sql parts array or the part count. Just do region_alloc + memcpy, and
>region_join at the end. It is not required to store each part anywhere.
Ok, fixed.
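
The approach from point 7 can be sketched roughly as follows. This is not the Tarantool region API: struct demo_region, demo_alloc(), demo_append() and demo_join() are hypothetical stand-ins built on a fixed buffer, used only to show the idea of appending pieces one by one and taking a single contiguous string at the end, without keeping an array of parts.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a region: a fixed bump buffer. */
struct demo_region {
	char data[256];
	size_t used;
};

/* Reserve size bytes; return NULL when the buffer is exhausted. */
static char *
demo_alloc(struct demo_region *r, size_t size)
{
	if (size > sizeof(r->data) - r->used)
		return NULL;
	char *p = r->data + r->used;
	r->used += size;
	return p;
}

/* Copy str without its terminator and add its length to *total. */
static int
demo_append(struct demo_region *r, const char *str, size_t *total)
{
	size_t len = strlen(str);
	char *p = demo_alloc(r, len);
	if (p == NULL)
		return -1;
	memcpy(p, str, len);
	*total += len;
	return 0;
}

/* Return the start of the last total bytes (contiguous in this stand-in). */
static char *
demo_join(struct demo_region *r, size_t total)
{
	assert(total <= r->used);
	return r->data + r->used - total;
}

/* Build "CREATE INDEX <idx> ON <space> (<col>)" piece by piece. */
static char *
demo_create_sql(struct demo_region *r, const char *idx,
		const char *space, const char *col)
{
	size_t total = 0;
	const char *parts[] = {
		"CREATE INDEX ", idx, " ON ", space, " (", col, ")"
	};
	for (size_t i = 0; i < sizeof(parts) / sizeof(parts[0]); i++) {
		if (demo_append(r, parts[i], &total) != 0)
			return NULL;
	}
	/* Append the terminating zero before joining. */
	char *end = demo_alloc(r, 1);
	if (end == NULL)
		return NULL;
	*end = '\0';
	total += 1;
	return demo_join(r, total);
}
```

With an empty demo_region, demo_create_sql(&r, "i1", "t1", "a") yields the string "CREATE INDEX i1 ON t1 (a)". Only the running total has to be tracked between appends.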
>> +	Expr *column_expr;
>> +	for (uint32_t i = 0; i < (uint32_t) expr_list->nExpr; i++){
>> +		column_expr = sqlite3ExprSkipCollate(expr_list->a[i].pExpr);
>> +		sql_parts_count += expr_list->a[i].pExpr == column_expr ? 2 : 4;
>> +	}
>> +
>> +	size_t total_sql_str_size = 0;
>> +
>> +	char **sql = region_alloc(r, sizeof(char*) * sql_parts_count);
>> +
>> +	char *CREATE_INDEX = "CREATE INDEX ";
>
>8. Do not store a const string as non-const.
Fixed.
>9. Why do you need this variable? Why can you not just
>inline it one line below?
Made it inline.
>> +	append(r, CREATE_INDEX, sql, 0, total_sql_str_size);
>> +	append(r, idx_name, sql, 1, total_sql_str_size);
>> +
>> +	char *ON = " ON ";
>10. Same. 
Fixed.
>
>> +	append(r, ON, sql, 2, total_sql_str_size);
>> +
>> +	char *name = space_def->name;
>> +	append(r, name, sql, 3, total_sql_str_size);
>> +	append(r, " (", sql, 4, total_sql_str_size);
>> +
>> +	Expr *collation_expr;
>> +	uint32_t sql_idx = 5;
>> +	for (int i = 0; i < expr_list->nExpr; i++){
>> +		column_expr = sqlite3ExprSkipCollate(expr_list->a[i].pExpr);
>> +		collation_expr = expr_list->a[i].pExpr == column_expr ?
>> +				 NULL : expr_list->a[i].pExpr;
>
>11. Please, do like sql_create_index(): use pExpr->op == TK_COLLATE to detect that
>it is a collated column. 
Ok. Fixed.
>12. Why did you not call sqlite3ResolveSelfReference()?
Because it is called in create_index before the set_index_def invocation. Anyway, I got your point and added the call in set_index_def.
>13. Why did your patch not delete Index.coll_array, Index.coll_id_array, Index.aiColumn?
>Your patch must remove them. Index_def initialization must replace code in build.c
>on lines 3158:3202. 
Okay, I removed coll_array, coll_id_array and aiColumn.
>> +	// fix last ", " with ")\0"
>> +	strcpy(sql[sql_idx - 1], ")\0");
>> +	char *res = region_join(r, total_sql_str_size);
>> +	return res;
>
>14. Do not use // comments. 
Ok, fixed.
>15. Why do you need separate create_sql() function? Can you generate an expression
>during parts initialization? 
create_sql() seems to be a separate piece of string-building logic. The SQL expression
could be generated during parts initialization, but IMHO it would clutter the code.
>> +void
>> +set_index_def(Parse *parse, Index *index, Table *table,
>> +				   const char *name,
>> +				   uint32_t name_len, int on_error,
>> +				   ExprList *expr_list, u8 idx_type)
>
>16. Big problems with indentation. 
Fixed.
>> +{
>> +	struct space_def *space_def = table->def;
>> +	uint32_t iid = SQLITE_PAGENO_TO_INDEXID(index->tnum);
>
>17. You do not know iid here. The parser just parses and does not
>generate identifiers. I checked this code in a debugger, and in this example
>
>box.sql.execute("CREATE TABLE t1(x integer primary key, a integer unique);")
>
>It creates two struct Index with the same def->iid == 0. It works only thanks
>to that Index.def->iid is not used anywhere now. 
Ok, fixed. Now iid is retrieved after looping over the metadata of all existing indexes.
>
>> +	struct index_opts opts;
>> +	index_opts_create(&opts);
>> +	opts.stat = NULL;
>
>18. Already done by index_opts_create. 
Removed.
>> +	opts.is_unique = on_error != ON_CONFLICT_ACTION_NONE;
>> +
>> +	struct key_def *key_def = key_def_new(expr_list->nExpr);
>
>19. Please, check each function result on error. Key_def_new can
>fail. 
Added check.
>> +
>> +	if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>> +		opts.sql = create_sql(name, table->def, expr_list);
>> +	}
>
>20. Please, do not put 'if' body in {} when it consists of a single line. 
Fixed.
>
>> +
>> +	int i = 0;
>
>21. Why can not you put 'int i = 0' into 'for (...'? 
Fixed.
>
>
>> +	Expr *column_expr;
>> +	Expr *collation_expr;
>
>22. Same. These variables are unused out of 'for'. And can be declared +
>initialized inside. 
Fixed.
>
>
>> +	for (i = 0; i < expr_list->nExpr; i++) {
>> +		column_expr = sqlite3ExprSkipCollate(expr_list->a[i].pExpr);
>> +		collation_expr = expr_list->a[i].pExpr == column_expr ?
>> +				 NULL : expr_list->a[i].pExpr;
>
>23. Same as 11. 
Fixed.
>> +
>> +		uint32_t fieldno = column_expr->iColumn;
>> +
>> +		uint32_t coll_id;
>> +		struct coll *coll;
>> +		if (collation_expr != NULL)
>> +			coll = sql_get_coll_seq(parse, collation_expr->u.zToken,&coll_id);
>> +		else
>> +			coll = sql_column_collation(space_def, fieldno, &coll_id);
>> +
>> +		key_def_set_part(key_def, i, fieldno,
>> +				space_def->fields[fieldno].type,
>> +				space_def->fields[fieldno].nullable_action,
>> +				coll, coll_id, SORT_ORDER_ASC);
>> +	}
>> +
>> +	struct key_def *pk_key_def;
>> +	if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>> +		pk_key_def = table->pIndex->def->key_def;
>> +	} else {
>> +		pk_key_def = NULL;
>> +	}
>> +
>> +	index->def = index_def_new(space_def->id, iid, name, name_len,
>> +			     TREE, &opts, key_def, pk_key_def);
>
>24. Same as 16. 
Fixed. There was a problem with indents in my IDE.
>
>> @@ -3068,6 +3200,11 @@ sql_create_index(struct Parse *parse, struct Token *token,
>>  		 */
>>  		pIndex->sort_order[i] = SORT_ORDER_ASC;
>>  	}
>> +
>> +	set_index_def(parse, pIndex, pTab, zName,
>> +		      nName, on_error, col_list,
>> +		      idx_type);
>> +
>
>25. It fits in 2 lines. 
Fitted in 2 lines.
>
>
>> @@ -3133,6 +3270,7 @@ sql_create_index(struct Parse *parse, struct Token *token,
>>  				}
>>  				if (idx_type == SQLITE_IDXTYPE_PRIMARYKEY)
>>  					pIdx->idxType = idx_type;
>> +
>>  				goto exit_create_index;
>>  			}
>>  		}
>
>26. Garbage diff. 
Removed.
>
>
>> @@ -3141,6 +3279,7 @@ sql_create_index(struct Parse *parse, struct Token *token,
>>  	/* Link the new Index structure to its table and to the other
>>  	 * in-memory database structures.
>>  	 */
>> +
>>  	assert(parse->nErr == 0);
>>  	if (db->init.busy) {
>>  		Index *p;
>
>27. Same. 
Removed.
>28. index_column_count() is useless now. 
Removed.
>
>
>29. index_is_unique_not_null() - same. 
Removed.
>
>
>> diff --git a/src/box/sql/select.c b/src/box/sql/select.c
>> index 5fbcbaffc..34ac846cf 100644
>> --- a/src/box/sql/select.c
>> +++ b/src/box/sql/select.c
>> @@ -4298,7 +4298,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct SrcList_item *pFrom)
>>                 char *zIndexedBy = pFrom->u1.zIndexedBy;
>>                 Index *pIdx;
>>                 for (pIdx = pTab->pIndex;
>> -                    pIdx && strcmp(pIdx->zName, zIndexedBy);
>> +                    pIdx && strcmp(pIdx->def->name, zIndexedBy);
>>                      pIdx = pIdx->pNext) ;
>>                 if (!pIdx) {
>>                         sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
>
>30. If you use def->name instead of zName, then remove zName from struct Index. 
Removed.
>
>
>> diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
>> index 950409d93..fe61dacfe 100644
>> --- a/src/box/sql/sqliteInt.h
>> +++ b/src/box/sql/sqliteInt.h
>> @@ -2124,6 +2124,7 @@ struct Index {
>>  				 * or _NONE
>>  				 */
>>  	unsigned idxType:2;	/* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX */
>> +    	struct index_def *def;
>>  };
>
>31. 4 spaces + 1 tab is bad indentation. Here 1 tab is enough.
Fixed.
>
>
>> diff --git a/src/box/sql/where.c b/src/box/sql/where.c
>> index 496b41725..6d95185b3 100644
>> --- a/src/box/sql/where.c
>> +++ b/src/box/sql/where.c
>> @@ -470,7 +470,7 @@ findIndexCol(Parse * pParse,	/* Parse context */
>>  	for (int i = 0; i < pList->nExpr; i++) {
>>  		Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
>>  		if (p->op == TK_COLUMN &&
>> -		    p->iColumn == pIdx->aiColumn[iCol] &&
>> +		    p->iColumn == (int) pIdx->def->key_def->parts[iCol].fieldno &&
>>  		    p->iTable == iBase) {
>>  			bool is_found;
>>  			uint32_t id;
>
>32. Out of 80. 
Fixed.
>
>
>> @@ -2859,6 +2859,19 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,	/* WHERE clause information */
>>  		sPk.aiRowLogEst = aiRowEstPk;
>>  		sPk.onError = ON_CONFLICT_ACTION_REPLACE;
>>  		sPk.pTable = pTab;
>> +
>> +		struct key_def *key_def = key_def_new(1);
>
>33. Check on error. 
Added check.
>
>
>> +		key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
>> +				 ON_CONFLICT_ACTION_ABORT,
>> +				 NULL, COLL_NONE, SORT_ORDER_ASC);
>> +
>> +		struct index_opts index_opts = index_opts_default;
>> +
>> +		sPk.def =
>> +			index_def_new(pTab->def->id, 0, "primary",
>> +				      sizeof("primary") - 1, TREE,
>> +				      &index_opts, key_def, NULL);
>
>34. Same as 33. And it fits in 3 lines. 
Fixed.
>
>
>> diff --git a/test/box/suite.ini b/test/box/suite.ini
>> index ddc71326b..94bc39c57 100644
>> --- a/test/box/suite.ini
>> +++ b/test/box/suite.ini
>> @@ -2,7 +2,7 @@
>>  core = tarantool
>>  description = Database tests
>>  script = box.lua
>> -disabled = rtree_errinj.test.lua tuple_bench.test.lua
>> +disabled = rtree_errinj.test.lua tuple_bench.test.lua errinj.test.lua
>>  release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua
>>  lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua
>>  use_unix_sockets = True
>
>35. Why? 
It is not needed in the current master version. Removed.

Patch:

From 9fe52af99c305aaf381de3b5414ab5fb03f65817 Mon Sep 17 00:00:00 2001
From: Ivan Koptelov <ivan.koptelov at tarantool.org>
Date: Thu, 7 Jun 2018 19:42:16 +0300
Subject: [PATCH] sql: add index_def to Index

Now every sqlite struct Index is created with a tnt struct
index_def inside. This allows us to use the tnt index_def
when working with sqlite indexes in the same manner as
with tnt indexes, and is a step towards replacing sqlite
Index with tnt index.
Fields coll_array, coll_id_array, aiColumn, sort_order
and zName are removed from Index. All usages of these
fields are changed to use the corresponding index_def
fields.
index_is_unique(), sql_index_collation() and
index_column_count() are removed and their calls are
replaced with the corresponding index_def fields.

Closes: #3369
Github branch: https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where
---
 src/box/sql.c           |  18 +--
 src/box/sql/analyze.c   |  24 +--
 src/box/sql/build.c     | 398 ++++++++++++++++++++++++------------------------
 src/box/sql/delete.c    |  16 +-
 src/box/sql/expr.c      |  59 ++++---
 src/box/sql/fkey.c      |  41 +++--
 src/box/sql/insert.c    | 134 +++++++---------
 src/box/sql/pragma.c    |  19 ++-
 src/box/sql/select.c    |   2 +-
 src/box/sql/sqliteInt.h |  25 +--
 src/box/sql/trigger.c   |   2 -
 src/box/sql/update.c    |  10 +-
 src/box/sql/vdbemem.c   |   2 +-
 src/box/sql/where.c     | 140 ++++++++---------
 src/box/sql/wherecode.c |  43 +++---
 src/box/sql/whereexpr.c |  15 --
 16 files changed, 433 insertions(+), 515 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index 7379cb418..c5ff8dcbc 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1442,8 +1442,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
 
   /* If table's PK is single column which is INTEGER, then
    * treat it as strict type, not affinity.  */
-  if (pk_idx && pk_idx->nColumn == 1) {
-     int pk = pk_idx->aiColumn[0];
+  if (pk_idx && pk_idx->def->key_def->part_count == 1) {
+     int pk = pk_idx->def->key_def->parts[0].fieldno;
      if (def->fields[pk].type == FIELD_TYPE_INTEGER)
         pk_forced_int = pk;
   }
@@ -1563,8 +1563,8 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
 
   /* If table's PK is single column which is INTEGER, then
    * treat it as strict type, not affinity.  */
-  if (primary_index->nColumn == 1) {
-     int pk = primary_index->aiColumn[0];
+  if (primary_index->def->key_def->part_count == 1) {
+     int pk = primary_index->def->key_def->parts[0].fieldno;
      if (def->fields[pk].type == FIELD_TYPE_INTEGER)
         pk_forced_int = pk;
   }
@@ -1575,13 +1575,13 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
    * primary key columns. Query planner depends on this particular
    * data layout.
    */
-  int i, n = pIndex->nColumn;
+  int i, n = pIndex->def->key_def->part_count;
 
   p = enc->encode_array(base, n);
   for (i = 0; i < n; i++) {
-     int col = pIndex->aiColumn[i];
+     int col = pIndex->def->key_def->parts[i].fieldno;
      assert(def->fields[col].is_nullable ==
-            action_is_nullable(def->fields[col].nullable_action));
+        action_is_nullable(def->fields[col].nullable_action));
      const char *t;
      if (pk_forced_int == col) {
         t = "integer";
@@ -1591,7 +1591,7 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
                    def->fields[col].is_nullable);
      }
      /* do not decode default collation */
-     uint32_t cid = pIndex->coll_id_array[i];
+     uint32_t cid = pIndex->def->key_def->parts[i].coll_id;
      p = enc->encode_map(p, cid == COLL_NONE ? 5 : 6);
      p = enc->encode_str(p, "type", sizeof("type")-1);
      p = enc->encode_str(p, t, strlen(t));
@@ -1609,7 +1609,7 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
      p = enc->encode_str(p, action_str, strlen(action_str));
 
      p = enc->encode_str(p, "sort_order", 10);
-     enum sort_order sort_order = pIndex->sort_order[i];
+     enum sort_order sort_order = pIndex->def->key_def->parts[i].sort_order;
      assert(sort_order < sort_order_MAX);
      const char *sort_order_str = sort_order_strs[sort_order];
      p = enc->encode_str(p, sort_order_str, strlen(sort_order_str));
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index afc824a1a..a90a89d2b 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -860,9 +860,9 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
      if (IsPrimaryKeyIndex(pIdx)) {
         zIdxName = pTab->def->name;
      } else {
-        zIdxName = pIdx->zName;
+        zIdxName = pIdx->def->name;
      }
-     nColTest = index_column_count(pIdx);
+     nColTest = pIdx->def->key_def->part_count;
 
      /* Populate the register containing the index name. */
      sqlite3VdbeLoadString(v, regIdxname, zIdxName);
@@ -917,7 +917,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
      sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
              space_ptr_reg);
      sql_vdbe_set_p4_key_def(pParse, pIdx);
-     VdbeComment((v, "%s", pIdx->zName));
+     VdbeComment((v, "%s", pIdx->def->name));
 
      /* Invoke the stat_init() function. The arguments are:
       *
@@ -969,7 +969,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
          */
         sqlite3VdbeAddOp0(v, OP_Goto);
         addrNextRow = sqlite3VdbeCurrentAddr(v);
-        if (nColTest == 1 && index_is_unique(pIdx)) {
+        if (nColTest == 1 && pIdx->def->opts.is_unique) {
            /* For a single-column UNIQUE index, once we have found a non-NULL
             * row, we know that all the rest will be distinct, so skip
             * subsequent distinctness tests.
@@ -979,12 +979,12 @@ analyzeOneTable(Parse * pParse,  /* Parser context */
            VdbeCoverage(v);
         }
         for (i = 0; i < nColTest; i++) {
-           uint32_t id;
            struct coll *coll =
-              sql_index_collation(pIdx, i, &id);
+              pIdx->def->key_def->parts[i].coll;
            sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                   pIdx->aiColumn[i], regTemp);
+                   pIdx->def->key_def->parts[i].fieldno,
+                   regTemp);
            aGotoChng[i] =
                sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
                        regPrev + i, (char *)coll,
@@ -1006,7 +1006,7 @@ analyzeOneTable(Parse * pParse,  /* Parser context */
         for (i = 0; i < nColTest; i++) {
            sqlite3VdbeJumpHere(v, aGotoChng[i]);
            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                   pIdx->aiColumn[i],
+                   pIdx->def->key_def->parts[i].fieldno,
                    regPrev + i);
         }
         sqlite3VdbeResolveLabel(v, endDistinctTest);
@@ -1023,14 +1023,14 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
      assert(regKey == (regStat4 + 2));
      Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
      int j, k, regKeyStat;
-     int nPkColumn = (int)index_column_count(pPk);
+     int nPkColumn = (int) pPk->def->key_def->part_count;
      regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
      for (j = 0; j < nPkColumn; j++) {
-        k = pPk->aiColumn[j];
+        k = pPk->def->key_def->parts[j].fieldno;
         assert(k >= 0 && k < (int)pTab->def->field_count);
         sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
         VdbeComment((v, "%s",
-           pTab->def->fields[pPk->aiColumn[j]].name));
+           pTab->def->fields[k].name));
      }
      sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
              nPkColumn, regKey);
@@ -1150,7 +1150,7 @@ analyzeTable(Parse * pParse, Table * pTab, Index * pOnlyIdx)
   iStatCur = pParse->nTab;
   pParse->nTab += 3;
   if (pOnlyIdx) {
-     openStatTable(pParse, iStatCur, pOnlyIdx->zName, "idx");
+     openStatTable(pParse, iStatCur, pOnlyIdx->def->name, "idx");
   } else {
      openStatTable(pParse, iStatCur, pTab->def->name, "tbl");
   }
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 28e4d7a4d..74fb66565 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -253,6 +253,8 @@ freeIndex(sqlite3 * db, Index * p)
 {
   sql_expr_delete(db, p->pPartIdxWhere, false);
   sql_expr_list_delete(db, p->aColExpr);
+  if (p->def != NULL)
+     index_def_delete(p->def);
   sqlite3DbFree(db, p->zColAff);
   sqlite3DbFree(db, p);
 }
@@ -271,7 +273,8 @@ sqlite3UnlinkAndDeleteIndex(sqlite3 * db, Index * pIndex)
 
   struct session *user_session = current_session();
 
-  pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash, pIndex->zName, 0);
+  pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash,
+        pIndex->def->name, 0);
   if (ALWAYS(pIndex)) {
      if (pIndex->pTable->pIndex == pIndex) {
         pIndex->pTable->pIndex = pIndex->pNext;
@@ -388,7 +391,7 @@ deleteTable(sqlite3 * db, Table * pTable)
      pNext = pIndex->pNext;
      assert(pIndex->pSchema == pTable->pSchema);
      if ((db == 0 || db->pnBytesFreed == 0)) {
-        char *zName = pIndex->zName;
+        char *zName = pIndex->def->name;
         TESTONLY(Index *
             pOld =) sqlite3HashInsert(&pTable->idxHash,
                        zName, 0);
@@ -1072,11 +1075,9 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
       * collation type was added. Correct this if it is the case.
       */
      for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
-        assert(pIdx->nColumn == 1);
-        if (pIdx->aiColumn[0] == i) {
-           id = &pIdx->coll_id_array[0];
-           pIdx->coll_array[0] =
-              sql_column_collation(p->def, i, id);
+        assert(pIdx->def->key_def->part_count == 1);
+        if ((int)pIdx->def->key_def->parts[0].fieldno == i) {
+           id = &pIdx->def->key_def->parts[0].coll_id;
         }
      }
   } else {
@@ -1123,52 +1124,10 @@ sql_index_key_def(struct Index *idx)
   return index->def->key_def;
 }
 
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *coll_id)
-{
-  assert(idx != NULL);
-  uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-  struct space *space = space_by_id(space_id);
-
-  assert(column < idx->nColumn);
-  /*
-   * If space is still under construction, or it is
-   * an ephemeral space, then fetch collation from
-   * SQL internal structure.
-   */
-  if (space == NULL) {
-     assert(column < idx->nColumn);
-     *coll_id = idx->coll_id_array[column];
-     return idx->coll_array[column];
-  }
-
-  struct key_def *key_def = sql_index_key_def(idx);
-  assert(key_def != NULL && key_def->part_count >= column);
-  *coll_id = key_def->parts[column].coll_id;
-  return key_def->parts[column].coll;
-}
-
 enum sort_order
 sql_index_column_sort_order(Index *idx, uint32_t column)
 {
-  assert(idx != NULL);
-  uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-  struct space *space = space_by_id(space_id);
-
-  assert(column < idx->nColumn);
-  /*
-   * If space is still under construction, or it is
-   * an ephemeral space, then fetch collation from
-   * SQL internal structure.
-   */
-  if (space == NULL) {
-     assert(column < idx->nColumn);
-     return idx->sort_order[column];
-  }
-
-  struct key_def *key_def = sql_index_key_def(idx);
-  assert(key_def != NULL && key_def->part_count >= column);
-  return key_def->parts[column].sort_order;
+  return idx->def->key_def->parts[column].sort_order;
 }
 
 /**
@@ -1383,14 +1342,16 @@ createTableStmt(sqlite3 * db, Table * p)
   return zStmt;
 }
 
-/* Return true if value x is found any of the first nCol entries of aiCol[]
- */
 static int
-hasColumn(const i16 * aiCol, int nCol, int x)
+hasColumn(const struct key_part *key_parts, int nCol, const struct key_part key_part)
 {
-  while (nCol-- > 0)
-     if (x == *(aiCol++))
+  int i = 0;
+  while (i < nCol) {
+     if (key_part.fieldno == key_parts->fieldno)
         return 1;
+     key_parts++;
+     i++;
+  }
   return 0;
 }
 
@@ -1410,13 +1371,13 @@ static void
 convertToWithoutRowidTable(Parse * pParse, Table * pTab)
 {
   Index *pPk;
-  int i, j;
+  uint32_t i, j;
   sqlite3 *db = pParse->db;
 
   /* Mark every PRIMARY KEY column as NOT NULL (except for imposter tables)
    */
   if (!db->init.imposterTable) {
-     for (i = 0; i < (int)pTab->def->field_count; i++) {
+     for (i = 0; i < pTab->def->field_count; i++) {
         if (pTab->aCol[i].is_primkey) {
            pTab->def->fields[i].nullable_action
               = ON_CONFLICT_ACTION_ABORT;
@@ -1454,14 +1415,17 @@ convertToWithoutRowidTable(Parse * pParse, Table * pTab)
       * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY KEY(a,b,c,d)".  Later
       * code assumes the PRIMARY KEY contains no repeated columns.
       */
-     for (i = j = 1; i < pPk->nColumn; i++) {
-        if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
-           pPk->nColumn--;
+     for (i = j = 1; i < pPk->def->key_def->part_count; i++) {
+        if (hasColumn(pPk->def->key_def->parts, j,
+              pPk->def->key_def->parts[i])) {
+           pPk->def->key_def->part_count--;
         } else {
-           pPk->aiColumn[j++] = pPk->aiColumn[i];
+           pPk->def->key_def->parts[j++] =
+                 pPk->def->key_def->parts[i];
         }
      }
-     pPk->nColumn = j;
+
+     pPk->def->key_def->part_count = j;
   }
   assert(pPk != 0);
 }
@@ -1543,7 +1507,7 @@ createIndex(Parse * pParse, Index * pIndex, int iSpaceId, int iIndexId,
   }
   sqlite3VdbeAddOp4(v,
           OP_String8, 0, iFirstCol + 2, 0,
-          sqlite3DbStrDup(pParse->db, pIndex->zName),
+          sqlite3DbStrDup(pParse->db, pIndex->def->name),
           P4_DYNAMIC);
   sqlite3VdbeAddOp4(v, OP_String8, 0, iFirstCol + 3, 0, "tree",
           P4_STATIC);
@@ -1580,7 +1544,7 @@ makeIndexSchemaRecord(Parse * pParse,
 
   sqlite3VdbeAddOp4(v,
           OP_String8, 0, iFirstCol, 0,
-          sqlite3DbStrDup(pParse->db, pIndex->zName),
+          sqlite3DbStrDup(pParse->db, pIndex->def->name),
           P4_DYNAMIC);
 
   if (pParse->pNewTable) {
@@ -2654,8 +2618,9 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
   }
   /* Open the sorter cursor if we are to use one. */
   iSorter = pParse->nTab++;
-  sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
-          (char *)def, P4_KEYDEF);
+  sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0,
+        pIndex->def->key_def->part_count,
+        (char *)def, P4_KEYDEF);
 
   /* Open the table. Loop through all rows of the table, inserting index
    * records into the sorter.
@@ -2687,7 +2652,7 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
      sqlite3VdbeGoto(v, j2);
      addr2 = sqlite3VdbeCurrentAddr(v);
      sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
-                regRecord, pIndex->nColumn);
+           regRecord, pIndex->def->key_def->part_count);
      VdbeCoverage(v);
      sqlite3UniqueConstraint(pParse, ON_CONFLICT_ACTION_ABORT,
               pIndex);
@@ -2733,16 +2698,11 @@ sqlite3AllocateIndexObject(sqlite3 * db,   /* Database connection */
   p = sqlite3DbMallocZero(db, nByte + nExtra);
   if (p) {
      char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
-     p->coll_array = (struct coll **)pExtra;
      pExtra += ROUND8(sizeof(struct coll **) * nCol);
-     p->coll_id_array = (uint32_t *) pExtra;
      pExtra += ROUND8(sizeof(uint32_t) * nCol);
      p->aiRowLogEst = (LogEst *) pExtra;
      pExtra += sizeof(LogEst) * (nCol + 1);
-     p->aiColumn = (i16 *) pExtra;
      pExtra += sizeof(i16) * nCol;
-     p->sort_order = (enum sort_order *) pExtra;
-     p->nColumn = nCol;
      *ppExtra = ((char *)p) + nByte;
   }
   return p;
@@ -2831,18 +2791,119 @@ addIndexToTable(Index * pIndex, Table * pTab)
   }
 }
 
-bool
-index_is_unique(Index *idx)
+void
+append(struct region *r, const char *str, size_t *total_sql_size)
 {
-  assert(idx != NULL);
-  uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-  uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-  struct space *space = space_by_id(space_id);
-  assert(space != NULL);
-  struct index *tnt_index = space_index(space, index_id);
-  assert(tnt_index != NULL);
+  memcpy(region_alloc(r, strlen(str)), str, strlen(str));
+  *total_sql_size += strlen(str);
+}
+
+char *
+create_sql(const char *idx_name, struct space_def *space_def, ExprList *expr_list)
+{
+  struct region *r = &fiber()->gc;
+  size_t total_sql_size = 0;
+  append(r, "CREATE INDEX ", &total_sql_size);
+  append(r, idx_name, &total_sql_size);
+  append(r, " ON ", &total_sql_size);
+  append(r, space_def->name, &total_sql_size);
+  append(r, " (", &total_sql_size);
+
+  for (int i = 0; i < expr_list->nExpr; i++){
+     Expr *expr = expr_list->a[i].pExpr;
+     assert(expr->op == TK_COLLATE || expr->op == TK_COLUMN);
+     Expr *column_expr = sqlite3ExprSkipCollate(expr);
+     const char *name = space_def->fields[column_expr->iColumn].name;
+
+     if (expr->op == TK_COLLATE){
+        append(r, name, &total_sql_size);
+        append(r, " COLLATE ", &total_sql_size);
+        const char *coll_name = expr->u.zToken;
+        append(r, coll_name, &total_sql_size);
+        append(r, ", ", &total_sql_size);
+     } else {
+        append(r, name, &total_sql_size);
+        append(r, ", ", &total_sql_size);
+     }
+  }
+
+  memcpy(region_alloc(r, 1), "\0", 1);
+  total_sql_size += 1;
+  char *res = region_join(r, total_sql_size);
+
+  /*
+   * fix last ", " with ")\0"
+   */
+  res[strlen(res) - 2] = ')';
+  res[strlen(res) - 1] = '\0';
+  return res;
+}
+
+void
+set_index_def(Parse *parse, Index *index, Table *table, uint32_t iid,
+     const char *name, uint32_t name_len, int on_error,
+     ExprList *expr_list, u8 idx_type)
+{
+  struct space_def *space_def = table->def;
+  struct index_opts opts;
+  index_opts_create(&opts);
+  opts.is_unique = on_error != ON_CONFLICT_ACTION_NONE;
+
+  struct key_def *key_def = key_def_new(expr_list->nExpr);
+  if (key_def == NULL)
+     return;
+
+  for (int i = 0; i < expr_list->nExpr; i++) {
+     Expr *expr = expr_list->a[i].pExpr;
+     sql_resolve_self_reference(parse, table, NC_IdxExpr,
+              expr, 0);
+     if (parse->nErr > 0)
+        return;
+
+     Expr *column_expr = sqlite3ExprSkipCollate(expr);
+     if (column_expr->op != TK_COLUMN) {
+        sqlite3ErrorMsg(parse,
+              "functional indexes aren't supported "
+              "in the current version");
+        return;
+     }
+
+     uint32_t fieldno = column_expr->iColumn;
+
+     uint32_t coll_id;
+     struct coll *coll;
+     if (expr->op == TK_COLLATE)
+        coll = sql_get_coll_seq(parse, expr->u.zToken,
+                 &coll_id);
+     else
+        coll = sql_column_collation(space_def, fieldno,
+                 &coll_id);
+
+     if (sqlite3StrICmp(expr->u.zToken, "binary") != 0 &&
+         coll == NULL && expr->op == TK_COLLATE)
+        return;
+
+     /* Tarantool: DESC indexes are not supported so far.
+     * See gh-3016.
+     */
+     key_def_set_part(key_def, i, fieldno,
+           space_def->fields[fieldno].type,
+           space_def->fields[fieldno].nullable_action,
+           coll, coll_id, SORT_ORDER_ASC);
+  }
+
+  if (idx_type == SQLITE_IDXTYPE_APPDEF)
+     opts.sql = create_sql(name, table->def, expr_list);
 
-  return tnt_index->def->opts.is_unique;
+  struct key_def *pk_key_def;
+  if (idx_type == SQLITE_IDXTYPE_APPDEF)
+     pk_key_def = table->pIndex->def->key_def;
+  else
+     pk_key_def = NULL;
+
+
+  index->def = index_def_new(space_def->id, iid, name, name_len,
+           TREE, &opts, key_def, pk_key_def);
 }
 
 void
@@ -2853,12 +2914,11 @@ sql_create_index(struct Parse *parse, struct Token *token,
 {
   Table *pTab = 0;   /* Table to be indexed */
   Index *pIndex = 0; /* The index to be created */
-  char *zName = 0;   /* Name of the index */
+  char *name = 0;    /* Name of the index */
   int nName;    /* Number of characters in zName */
-  int i, j;
+  int i;
   DbFixer sFix;     /* For assigning database names to pTable */
   sqlite3 *db = parse->db;
-  struct ExprList_item *col_listItem;    /* For looping over col_list */
   int nExtra = 0;       /* Space allocated for zExtra[] */
   char *zExtra = 0;  /* Extra space after the Index object */
   struct session *user_session = current_session();
@@ -2934,24 +2994,24 @@ sql_create_index(struct Parse *parse, struct Token *token,
    * our own name.
    */
   if (token) {
-     zName = sqlite3NameFromToken(db, token);
-     if (zName == 0)
+     name = sqlite3NameFromToken(db, token);
+     if (name == 0)
         goto exit_create_index;
      assert(token->z != 0);
      if (!db->init.busy) {
-        if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
+        if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
             NULL) {
            sqlite3ErrorMsg(parse,
                  "there is already a table named %s",
-                 zName);
+                 name);
            goto exit_create_index;
         }
      }
-     if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
+     if (sqlite3HashFind(&pTab->idxHash, name) != NULL) {
         if (!if_not_exist) {
            sqlite3ErrorMsg(parse,
                  "index %s.%s already exists",
-                 pTab->def->name, zName);
+                 pTab->def->name, name);
         } else {
            assert(!db->init.busy);
         }
@@ -2963,10 +3023,10 @@ sql_create_index(struct Parse *parse, struct Token *token,
      for (pLoop = pTab->pIndex, n = 1; pLoop;
           pLoop = pLoop->pNext, n++) {
      }
-     zName =
+     name =
          sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", pTab->def->name,
               n);
-     if (zName == 0) {
+     if (name == 0) {
         goto exit_create_index;
      }
   }
@@ -3006,17 +3066,24 @@ sql_create_index(struct Parse *parse, struct Token *token,
   /*
    * Allocate the index structure.
    */
-  nName = sqlite3Strlen30(zName);
+  nName = sqlite3Strlen30(name);
+
+  if (nName > BOX_NAME_MAX) {
+     sqlite3ErrorMsg(parse,
+           "%s.%s exceeds indexes' names length limit",
+           pTab->def->name, name);
+     goto exit_create_index;
+  }
+
+  if (sqlite3CheckIdentifierName(parse, name) != SQLITE_OK)
+     goto exit_create_index;
+
   pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
                   nName + nExtra + 1, &zExtra);
   if (db->mallocFailed) {
      goto exit_create_index;
   }
   assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
-  assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
-  pIndex->zName = zExtra;
-  zExtra += nName + 1;
-  memcpy(pIndex->zName, zName, nName + 1);
   pIndex->pTable = pTab;
   pIndex->onError = (u8) on_error;
   /*
@@ -3031,7 +3098,6 @@ sql_create_index(struct Parse *parse, struct Token *token,
      pIndex->idxType = idx_type;
   }
   pIndex->pSchema = db->pSchema;
-  pIndex->nColumn = col_list->nExpr;
   /* Tarantool have access to each column by any index */
   if (where) {
      sql_resolve_self_reference(parse, pTab, NC_PartIdx, where,
@@ -3040,60 +3106,25 @@ sql_create_index(struct Parse *parse, struct Token *token,
      where = NULL;
   }
 
-  /* Analyze the list of expressions that form the terms of the index and
-   * report any errors.  In the common case where the expression is exactly
-   * a table column, store that column in aiColumn[].  For general expressions,
-   * populate pIndex->aColExpr and store XN_EXPR (-2) in aiColumn[].
-   *
+  /*
    * TODO: Issue a warning if two or more columns of the index are identical.
    * TODO: Issue a warning if the table primary key is used as part of the
    * index key.
    */
-  for (i = 0, col_listItem = col_list->a; i < col_list->nExpr;
-       i++, col_listItem++) {
-     Expr *pCExpr;  /* The i-th index expression */
-     sql_resolve_self_reference(parse, pTab, NC_IdxExpr,
-                 col_listItem->pExpr, NULL);
-     if (parse->nErr > 0)
-        goto exit_create_index;
-     pCExpr = sqlite3ExprSkipCollate(col_listItem->pExpr);
-     if (pCExpr->op != TK_COLUMN) {
-        sqlite3ErrorMsg(parse,
-              "functional indexes aren't supported "
-              "in the current version");
-        goto exit_create_index;
-     } else {
-        j = pCExpr->iColumn;
-        assert(j <= 0x7fff);
-        if (j < 0) {
-           j = pTab->iPKey;
-        }
-        pIndex->aiColumn[i] = (i16) j;
-     }
-     struct coll *coll;
-     uint32_t id;
-     if (col_listItem->pExpr->op == TK_COLLATE) {
-        const char *coll_name = col_listItem->pExpr->u.zToken;
-        coll = sql_get_coll_seq(parse, coll_name, &id);
 
-        if (coll == NULL &&
-            sqlite3StrICmp(coll_name, "binary") != 0) {
-           goto exit_create_index;
-        }
-     } else if (j >= 0) {
-        coll = sql_column_collation(pTab->def, j, &id);
-     } else {
-        id = COLL_NONE;
-        coll = NULL;
-     }
-     pIndex->coll_array[i] = coll;
-     pIndex->coll_id_array[i] = id;
-
-     /* Tarantool: DESC indexes are not supported so far.
-      * See gh-3016.
-      */
-     pIndex->sort_order[i] = SORT_ORDER_ASC;
+  uint32_t max_iid = 0;
+  for (Index *index = pTab->pIndex; index; index = index->pNext) {
+     max_iid = (max_iid > index->def->iid) ? max_iid :
+           index->def->iid + 1;
   }
+
+  set_index_def(parse, pIndex, pTab, max_iid, name, nName, on_error,
+        col_list, idx_type);
+
+  if (pIndex->def == NULL ||
+      !index_def_is_valid(pIndex->def, pTab->def->name))
+        goto exit_create_index;
+
   if (pTab == parse->pNewTable) {
      /* This routine has been called to create an automatic index as a
       * result of a PRIMARY KEY or UNIQUE clause on a column definition, or
@@ -3118,25 +3149,24 @@ sql_create_index(struct Parse *parse, struct Token *token,
       */
      Index *pIdx;
      for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-        int k;
+        uint32_t k;
         assert(IsUniqueIndex(pIdx));
         assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
         assert(IsUniqueIndex(pIndex));
 
-        if (pIdx->nColumn != pIndex->nColumn)
+        if (pIdx->def->key_def->part_count !=
+              pIndex->def->key_def->part_count)
            continue;
-        for (k = 0; k < pIdx->nColumn; k++) {
-           assert(pIdx->aiColumn[k] >= 0);
-           if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
+        for (k = 0; k < pIdx->def->key_def->part_count; k++) {
+           if (pIdx->def->key_def->parts[k].fieldno != pIndex->def->key_def->parts[k].fieldno)
               break;
            struct coll *coll1, *coll2;
-           uint32_t id;
-           coll1 = sql_index_collation(pIdx, k, &id);
-           coll2 = sql_index_collation(pIndex, k, &id);
+           coll1 = pIdx->def->key_def->parts[k].coll;
+           coll2 = pIndex->def->key_def->parts[k].coll;
            if (coll1 != coll2)
               break;
         }
-        if (k == pIdx->nColumn) {
+        if (k == pIdx->def->key_def->part_count) {
            if (pIdx->onError != pIndex->onError) {
               /* This constraint creates the same index as a previous
                * constraint specified somewhere in the CREATE TABLE statement.
@@ -3159,6 +3189,7 @@ sql_create_index(struct Parse *parse, struct Token *token,
            }
            if (idx_type == SQLITE_IDXTYPE_PRIMARYKEY)
               pIdx->idxType = idx_type;
+
            goto exit_create_index;
         }
      }
@@ -3170,7 +3201,7 @@ sql_create_index(struct Parse *parse, struct Token *token,
   assert(parse->nErr == 0);
   if (db->init.busy) {
      Index *p;
-     p = sqlite3HashInsert(&pTab->idxHash, pIndex->zName, pIndex);
+     p = sqlite3HashInsert(&pTab->idxHash, pIndex->def->name, pIndex);
      if (p) {
         assert(p == pIndex);   /* Malloc must have failed */
         sqlite3OomFault(db);
@@ -3268,28 +3299,7 @@ sql_create_index(struct Parse *parse, struct Token *token,
   sql_expr_delete(db, where, false);
   sql_expr_list_delete(db, col_list);
   sqlite3SrcListDelete(db, tbl_name);
-  sqlite3DbFree(db, zName);
-}
-
-/**
- * Return number of columns in given index.
- * If space is ephemeral, use internal
- * SQL structure to fetch the value.
- */
-uint32_t
-index_column_count(const Index *idx)
-{
-  assert(idx != NULL);
-  uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-  struct space *space = space_by_id(space_id);
-  /* It is impossible to find an ephemeral space by id. */
-  if (space == NULL)
-     return idx->nColumn;
-
-  uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-  struct index *index = space_index(space, index_id);
-  assert(index != NULL);
-  return index->def->key_def->part_count;
+  sqlite3DbFree(db, name);
 }
 
 /** Return true if given index is unique and not nullable. */
@@ -3297,15 +3307,8 @@ bool
 index_is_unique_not_null(const Index *idx)
 {
   assert(idx != NULL);
-  uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-  struct space *space = space_by_id(space_id);
-  assert(space != NULL);
-
-  uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-  struct index *index = space_index(space, index_id);
-  assert(index != NULL);
-  return (index->def->opts.is_unique &&
-     !index->def->key_def->is_nullable);
+  assert(idx->def != NULL);
+  return (idx->def->opts.is_unique && !idx->def->key_def->is_nullable);
 }
 
 void
@@ -3933,18 +3936,18 @@ sqlite3UniqueConstraint(Parse * pParse,    /* Parsing context */
     )
 {
   char *zErr;
-  int j;
+  uint32_t j;
   StrAccum errMsg;
   Table *pTab = pIdx->pTable;
 
   sqlite3StrAccumInit(&errMsg, pParse->db, 0, 0, 200);
   if (pIdx->aColExpr) {
-     sqlite3XPrintf(&errMsg, "index '%q'", pIdx->zName);
+     sqlite3XPrintf(&errMsg, "index '%q'", pIdx->def->name);
   } else {
-     for (j = 0; j < pIdx->nColumn; j++) {
+     for (j = 0; j < pIdx->def->key_def->part_count; j++) {
         char *zCol;
-        assert(pIdx->aiColumn[j] >= 0);
-        zCol = pTab->def->fields[pIdx->aiColumn[j]].name;
+        uint32_t fieldno = pIdx->def->key_def->parts[j].fieldno;
+        zCol = pTab->def->fields[fieldno].name;
         if (j)
            sqlite3StrAccumAppend(&errMsg, ", ", 2);
         sqlite3XPrintf(&errMsg, "%s.%s", pTab->def->name, zCol);
@@ -3967,11 +3970,10 @@ static bool
 collationMatch(struct coll *coll, struct Index *index)
 {
   assert(coll != NULL);
-  for (int i = 0; i < index->nColumn; i++) {
-     uint32_t id;
-     struct coll *idx_coll = sql_index_collation(index, i, &id);
-     assert(idx_coll != 0 || index->aiColumn[i] < 0);
-     if (index->aiColumn[i] >= 0 && coll == idx_coll)
+  for (uint32_t i = 0; i < index->def->key_def->part_count; i++) {
+     struct coll *idx_coll = index->def->key_def->parts[i].coll;
+     assert(idx_coll != NULL);
+     if (coll == idx_coll)
         return true;
   }
   return false;
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index ddad54b3e..504738cd5 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -209,7 +209,7 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
      } else {
         pk = sqlite3PrimaryKeyIndex(table);
         assert(pk != NULL);
-        pk_len = index_column_count(pk);
+        pk_len = pk->def->key_def->part_count;
         parse->nMem += pk_len;
         sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, eph_cursor,
                 pk_len);
@@ -252,11 +252,11 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
      /* Extract the primary key for the current row */
      if (!is_view) {
         for (int i = 0; i < pk_len; i++) {
-           assert(pk->aiColumn[i] >= 0);
            sqlite3ExprCodeGetColumnOfTable(v, table->def,
                        tab_cursor,
-                       pk->
-                       aiColumn[i],
+                       pk->def->
+                       key_def->
+                       parts[i].fieldno,
                        reg_pk + i);
         }
      } else {
@@ -326,7 +326,7 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
         sqlite3VdbeAddOp3(v, OP_OpenWrite, tab_cursor,
                 table->tnum, space_ptr_reg);
         sql_vdbe_set_p4_key_def(parse, pk);
-        VdbeComment((v, "%s", pk->zName));
+        VdbeComment((v, "%s", pk->def->name));
 
         if (one_pass == ONEPASS_MULTI)
            sqlite3VdbeJumpHere(v, iAddrOnce);
@@ -536,14 +536,14 @@ sql_generate_index_key(struct Parse *parse, struct Index *index, int cursor,
         *part_idx_label = 0;
      }
   }
-  int col_cnt = index_column_count(index);
+  int col_cnt = index->def->key_def->part_count;
   int reg_base = sqlite3GetTempRange(parse, col_cnt);
   if (prev != NULL && (reg_base != reg_prev ||
              prev->pPartIdxWhere != NULL))
      prev = NULL;
   for (int j = 0; j < col_cnt; j++) {
-     if (prev != NULL && prev->aiColumn[j] == index->aiColumn[j]
-         && prev->aiColumn[j] != XN_EXPR) {
+     if (prev != NULL && prev->def->key_def->parts[j].fieldno ==
+         index->def->key_def->parts[j].fieldno) {
         /*
          * This column was already computed by the
          * previous index.
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index 8866f6fed..93cd45470 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -2422,21 +2422,20 @@ sqlite3FindInIndex(Parse * pParse, /* Parsing context */
              pIdx = pIdx->pNext) {
            Bitmask colUsed; /* Columns of the index used */
            Bitmask mCol;  /* Mask for the current column */
-           if (pIdx->nColumn < nExpr)
+           if ((int)pIdx->def->key_def->part_count < nExpr)
               continue;
            /* Maximum nColumn is BMS-2, not BMS-1, so that we can compute
             * BITMASK(nExpr) without overflowing
             */
-           testcase(pIdx->nColumn == BMS - 2);
-           testcase(pIdx->nColumn == BMS - 1);
-           if (pIdx->nColumn >= BMS - 1)
+           testcase(pIdx->def->key_def->part_count == BMS - 2);
+           testcase(pIdx->def->key_def->part_count == BMS - 1);
+           if (pIdx->def->key_def->part_count >= BMS - 1)
               continue;
            if (mustBeUnique) {
-              if (pIdx->nColumn > nExpr
-                  || (pIdx->nColumn > nExpr
-                  && !index_is_unique(pIdx))) {
-                    continue;  /* This index is not unique over the IN RHS columns */
-              }
+              if ((int)pIdx->def->key_def->part_count > nExpr
+                  || !pIdx->def->opts.is_unique)
+                 /* This index is not unique over the IN RHS columns */
+                    continue;
            }
 
            colUsed = 0;   /* Columns of index used so far */
@@ -2449,12 +2448,13 @@ sqlite3FindInIndex(Parse * pParse, /* Parsing context */
               int j;
 
               for (j = 0; j < nExpr; j++) {
-                 if (pIdx->aiColumn[j] !=
-                     pRhs->iColumn) {
-                    continue;
-                 }
-                 struct coll *idx_coll;
-                 idx_coll = sql_index_collation(pIdx, j, &id);
+                 if ((int)pIdx->def->key_def->
+                    parts[j].fieldno
+                    != pRhs->iColumn)
+                       continue;
+
+                 struct coll *idx_coll =
+                    pIdx->def->key_def->parts[j].coll;
                  if (pReq != NULL &&
                      pReq != idx_coll) {
                     continue;
@@ -2483,12 +2483,12 @@ sqlite3FindInIndex(Parse * pParse, /* Parsing context */
                       0, 0, 0,
                       sqlite3MPrintf(db,
                       "USING INDEX %s FOR IN-OPERATOR",
-                      pIdx->zName),
+                      pIdx->def->name),
                       P4_DYNAMIC);
               emit_open_cursor(pParse, iTab,
                      pIdx->tnum);
               sql_vdbe_set_p4_key_def(pParse, pIdx);
-              VdbeComment((v, "%s", pIdx->zName));
+              VdbeComment((v, "%s", pIdx->def->name));
               assert(IN_INDEX_INDEX_DESC ==
                      IN_INDEX_INDEX_ASC + 1);
               eType = IN_INDEX_INDEX_ASC +
@@ -2515,7 +2515,7 @@ sqlite3FindInIndex(Parse * pParse,   /* Parsing context */
                     /* Tarantool: Check for null is performed on first key of the index.  */
                     sqlite3SetHasNullFlag(v,
                                 iTab,
-                                pIdx->aiColumn[0],
+                                pIdx->def->key_def->parts[0].fieldno,
                                 *prRhsHasNull);
                  }
               }
@@ -3146,12 +3146,13 @@ sqlite3ExprCodeIN(Parse * pParse,  /* Parsing and code generating context */
      struct Index *pk = sqlite3PrimaryKeyIndex(tab);
      assert(pk);
 
+     uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
      enum affinity_type affinity =
-        tab->def->fields[pk->aiColumn[0]].affinity;
-     if (pk->nColumn == 1
+        tab->def->fields[fieldno].affinity;
+     if (pk->def->key_def->part_count == 1
          && affinity == AFFINITY_INTEGER
-         && pk->aiColumn[0] < nVector) {
-        int reg_pk = rLhs + pk->aiColumn[0];
+         && (int) fieldno < nVector) {
+        int reg_pk = rLhs + (int)fieldno;
         sqlite3VdbeAddOp2(v, OP_MustBeInt, reg_pk, destIfFalse);
      }
   }
@@ -3483,17 +3484,9 @@ sqlite3ExprCodeLoadIndexColumn(Parse * pParse,  /* The parsing context */
                int regOut  /* Store the index column value in this register */
     )
 {
-  i16 iTabCol = pIdx->aiColumn[iIdxCol];
-  if (iTabCol == XN_EXPR) {
-     assert(pIdx->aColExpr);
-     assert(pIdx->aColExpr->nExpr > iIdxCol);
-     pParse->iSelfTab = iTabCur;
-     sqlite3ExprCodeCopy(pParse, pIdx->aColExpr->a[iIdxCol].pExpr,
-               regOut);
-  } else {
-     sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
-                 iTabCur, iTabCol, regOut);
-  }
+  i16 iTabCol = pIdx->def->key_def->parts[iIdxCol].fieldno;
+  sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
+              iTabCur, iTabCol, regOut);
 }
 
 void
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 70ebef89f..878c6d0b3 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -256,8 +256,8 @@ sqlite3FkLocateIndex(Parse * pParse,   /* Parse context to store any error in */
   }
 
   for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
-     int nIdxCol = index_column_count(pIdx);
-     if (nIdxCol == nCol && index_is_unique(pIdx)
+     int nIdxCol = pIdx->def->key_def->part_count;
+     if (nIdxCol == nCol && pIdx->def->opts.is_unique
          && pIdx->pPartIdxWhere == 0) {
         /* pIdx is a UNIQUE index (or a PRIMARY KEY) and has the right number
          * of columns. If each indexed column corresponds to a foreign key
@@ -287,7 +287,8 @@ sqlite3FkLocateIndex(Parse * pParse,   /* Parse context to store any error in */
             */
            int i, j;
            for (i = 0; i < nCol; i++) {
-              i16 iCol = pIdx->aiColumn[i];  /* Index of column in parent tbl */
+              i16 iCol = (int) pIdx->def->key_def->parts[i].fieldno; /* Index of column in parent tbl */
+
               char *zIdxCol; /* Name of indexed column */
 
               if (iCol < 0)
@@ -303,8 +304,7 @@ sqlite3FkLocateIndex(Parse * pParse,   /* Parse context to store any error in */
                           iCol,
                           &id);
               struct coll *coll =
-                 sql_index_collation(pIdx, i,
-                           &id);
+                 pIdx->def->key_def->parts[i].coll;
               if (def_coll != coll)
                  break;
 
@@ -464,13 +464,14 @@ fkLookupParent(Parse * pParse,   /* Parse context */
            for (i = 0; i < nCol; i++) {
               int iChild = aiCol[i] + 1 + regData;
               int iParent =
-                  pIdx->aiColumn[i] + 1 + regData;
-              assert(pIdx->aiColumn[i] >= 0);
+                 (int) pIdx->def->key_def->parts[i].fieldno
+                 + 1 + regData;
               assert(aiCol[i] != pTab->iPKey);
-              if (pIdx->aiColumn[i] == pTab->iPKey) {
+              if ((int)pIdx->def->key_def->
+                 parts[i].fieldno == pTab->iPKey)
                  /* The parent key is a composite key that includes the IPK column */
-                 iParent = regData;
-              }
+                    iParent = regData;
+
               sqlite3VdbeAddOp3(v, OP_Ne, iChild,
                       iJump, iParent);
               VdbeCoverage(v);
@@ -622,7 +623,7 @@ fkScanChildren(Parse * pParse, /* Parse context */
   Vdbe *v = sqlite3GetVdbe(pParse);
 
   assert(pIdx == 0 || pIdx->pTable == pTab);
-  assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
+  assert(pIdx == 0 || (int) pIdx->def->key_def->part_count == pFKey->nCol);
   assert(pIdx != 0);
 
   if (nIncr < 0) {
@@ -646,7 +647,7 @@ fkScanChildren(Parse * pParse, /* Parse context */
      i16 iCol;  /* Index of column in child table */
      const char *zCol;  /* Name of column in child table */
 
-     iCol = pIdx ? pIdx->aiColumn[i] : -1;
+     iCol = pIdx ? pIdx->def->key_def->parts[i].fieldno : -1;
      pLeft = exprTableRegister(pParse, pTab, regData, iCol);
      iCol = aiCol ? aiCol[i] : pFKey->aCol[0].iFrom;
      assert(iCol >= 0);
@@ -671,10 +672,9 @@ fkScanChildren(Parse * pParse,    /* Parse context */
      Expr *pEq, *pAll = 0;
      Index *pPk = sqlite3PrimaryKeyIndex(pTab);
      assert(pIdx != 0);
-     int col_count = index_column_count(pPk);
+     int col_count = pPk->def->key_def->part_count;
      for (i = 0; i < col_count; i++) {
-        i16 iCol = pIdx->aiColumn[i];
-        assert(iCol >= 0);
+        i16 iCol = (int) pIdx->def->key_def->parts[i].fieldno;
         pLeft = exprTableRegister(pParse, pTab, regData, iCol);
         pRight =
            exprTableColumn(db, pTab->def,
@@ -992,7 +992,6 @@ sqlite3FkCheck(Parse * pParse, /* Parse context */
         if (aiCol[i] == pTab->iPKey) {
            aiCol[i] = -1;
         }
-        assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
      }
 
      pParse->nTab++;
@@ -1126,10 +1125,9 @@ sqlite3FkOldmask(Parse * pParse,    /* Parse context */
         Index *pIdx = 0;
         sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
         if (pIdx) {
-           int nIdxCol = index_column_count(pIdx);
+           int nIdxCol = pIdx->def->key_def->part_count;
            for (i = 0; i < nIdxCol; i++) {
-              assert(pIdx->aiColumn[i] >= 0);
-              mask |= COLUMN_MASK(pIdx->aiColumn[i]);
+              mask |= COLUMN_MASK(pIdx->def->key_def->parts[i].fieldno);
            }
         }
      }
@@ -1264,11 +1262,10 @@ fkActionTrigger(Parse * pParse,    /* Parse context */
                || (pTab->iPKey >= 0
               && pTab->iPKey <
                  (int)pTab->def->field_count));
-        assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
         sqlite3TokenInit(&tToCol,
                pTab->def->fields[pIdx ? pIdx->
-                     aiColumn[i] : pTab->iPKey].
-               name);
+                  def->key_def->parts[i].fieldno
+                 : pTab->iPKey].name);
         sqlite3TokenInit(&tFromCol,
                pFKey->pFrom->def->fields[
                  iFromCol].name);
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index 59c61c703..09bda843b 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -89,7 +89,7 @@ sqlite3IndexAffinityStr(sqlite3 * db, Index * pIdx)
       * up.
       */
      int n;
-     int nColumn = index_column_count(pIdx);
+     int nColumn = pIdx->def->key_def->part_count;
      pIdx->zColAff =
          (char *)sqlite3DbMallocRaw(0, nColumn + 1);
      if (!pIdx->zColAff) {
@@ -97,22 +97,8 @@ sqlite3IndexAffinityStr(sqlite3 * db, Index * pIdx)
         return 0;
      }
      for (n = 0; n < nColumn; n++) {
-        i16 x = pIdx->aiColumn[n];
-        if (x >= 0) {
-           char affinity = pIdx->pTable->
-              def->fields[x].affinity;
-           pIdx->zColAff[n] = affinity;
-        } else {
-           char aff;
-           assert(x == XN_EXPR);
-           assert(pIdx->aColExpr != 0);
-           aff =
-               sqlite3ExprAffinity(pIdx->aColExpr->a[n].
-                    pExpr);
-           if (aff == 0)
-              aff = AFFINITY_BLOB;
-           pIdx->zColAff[n] = aff;
-        }
+        i16 x = pIdx->def->key_def->parts[n].fieldno;
+        pIdx->zColAff[n] = pIdx->pTable->def->fields[x].affinity;
      }
      pIdx->zColAff[n] = 0;
   }
@@ -645,7 +631,7 @@ sqlite3Insert(Parse * pParse,  /* Parser context */
           pIdx = pIdx->pNext, i++) {
         assert(pIdx);
         aRegIdx[i] = ++pParse->nMem;
-        pParse->nMem += index_column_count(pIdx);
+        pParse->nMem += pIdx->def->key_def->part_count;
      }
   }
 
@@ -1088,7 +1074,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,     /* The parser context */
   nCol = pTab->def->field_count;
 
   pPk = sqlite3PrimaryKeyIndex(pTab);
-  nPkField = index_column_count(pPk);
+  nPkField = pPk->def->key_def->part_count;
 
   /* Record that this module has started */
   VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
@@ -1252,38 +1238,27 @@ sqlite3GenerateConstraintChecks(Parse * pParse,       /* The parser context */
       * the insert or update.  Store that record in the aRegIdx[ix] register
       */
      regIdx = aRegIdx[ix] + 1;
-     int nIdxCol = (int)index_column_count(pIdx);
+     int nIdxCol = pIdx->def->key_def->part_count;
      for (i = 0; i < nIdxCol; i++) {
-        int iField = pIdx->aiColumn[i];
+        int iField = (int) pIdx->def->key_def->parts[i].fieldno;
         int x;
-        if (iField == XN_EXPR) {
-           pParse->ckBase = regNewData + 1;
-           sqlite3ExprCodeCopy(pParse,
-                     pIdx->aColExpr->a[i].pExpr,
-                     regIdx + i);
-           pParse->ckBase = 0;
-           VdbeComment((v, "%s column %d", pIdx->zName,
-                   i));
-        } else {
-           /* OP_SCopy copies value in separate register,
-            * which later will be used by OP_NoConflict.
-            * But OP_NoConflict is necessary only in cases
-            * when bytecode is needed for proper UNIQUE
-            * constraint handling.
-            */
-           if (uniqueByteCodeNeeded) {
-              if (iField == pTab->iPKey)
-                 x = regNewData;
-              else
-                 x = iField + regNewData + 1;
-
-              assert(iField >= 0);
-              sqlite3VdbeAddOp2(v, OP_SCopy,
-                      x, regIdx + i);
-              VdbeComment((v, "%s",
-                      pTab->def->fields[
-                    iField].name));
-           }
+        /* OP_SCopy copies value in separate register,
+         * which later will be used by OP_NoConflict.
+         * But OP_NoConflict is necessary only in cases
+         * when bytecode is needed for proper UNIQUE
+         * constraint handling.
+         */
+        if (uniqueByteCodeNeeded) {
+           if (iField == pTab->iPKey)
+              x = regNewData;
+           else
+              x = iField + regNewData + 1;
+
+           assert(iField >= 0);
+           sqlite3VdbeAddOp2(v, OP_SCopy,
+                   x, regIdx + i);
+           VdbeComment((v, "%s",
+                   pTab->def->fields[iField].name));
         }
      }
 
@@ -1293,8 +1268,12 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    /* The parser context */
         /* If PK is marked as INTEGER, use it as strict type,
          * not as affinity. Emit code for type checking */
         if (nIdxCol == 1) {
-           reg_pk = regNewData + 1 + pIdx->aiColumn[0];
-           if (pTab->zColAff[pIdx->aiColumn[0]] ==
+           reg_pk = regNewData + 1 +
+               pIdx->def->key_def->parts[0].fieldno;
+
+           int fieldno = (int)pIdx->def->key_def->
+              parts[0].fieldno;
+           if (pTab->zColAff[fieldno] ==
                AFFINITY_INTEGER) {
               int skip_if_null = sqlite3VdbeMakeLabel(v);
               if ((pTab->tabFlags & TF_Autoincrement) != 0) {
@@ -1312,7 +1291,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,     /* The parser context */
 
         sqlite3VdbeAddOp3(v, OP_MakeRecord, regNewData + 1,
               pTab->def->field_count, aRegIdx[ix]);
-        VdbeComment((v, "for %s", pIdx->zName));
+        VdbeComment((v, "for %s", pIdx->def->name));
      }
 
      /* In an UPDATE operation, if this index is the PRIMARY KEY
@@ -1400,7 +1379,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,     /* The parser context */
      if (uniqueByteCodeNeeded) {
         sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
                    addrUniqueOk, regIdx,
-                   index_column_count(pIdx));
+                   pIdx->def->key_def->part_count);
      }
      VdbeCoverage(v);
 
@@ -1410,19 +1389,18 @@ sqlite3GenerateConstraintChecks(Parse * pParse,       /* The parser context */
                         nPkField);
      if (isUpdate || on_error == ON_CONFLICT_ACTION_REPLACE) {
         int x;
-        int nPkCol = index_column_count(pPk);
+        int nPkCol = pPk->def->key_def->part_count;
         /* Extract the PRIMARY KEY from the end of the index entry and
          * store it in registers regR..regR+nPk-1
          */
         if (pIdx != pPk) {
            for (i = 0; i < nPkCol; i++) {
-              assert(pPk->aiColumn[i] >= 0);
-              x = pPk->aiColumn[i];
+              x = pPk->def->key_def->parts[i].fieldno;
               sqlite3VdbeAddOp3(v, OP_Column,
                       iThisCur, x, regR + i);
               VdbeComment((v, "%s.%s", pTab->def->name,
                  pTab->def->fields[
-                    pPk->aiColumn[i]].name));
+                    x].name));
            }
         }
         if (isUpdate && uniqueByteCodeNeeded) {
@@ -1440,10 +1418,11 @@ sqlite3GenerateConstraintChecks(Parse * pParse,       /* The parser context */
                     regIdx : regR);
 
            for (i = 0; i < nPkCol; i++) {
-              uint32_t id;
-              char *p4 = (char *)sql_index_collation(pPk, i, &id);
-              x = pPk->aiColumn[i];
-              assert(x >= 0);
+              char *p4 = (char *) pPk->def->key_def->parts[i].coll;
+              x = pPk->def->key_def->parts[i].fieldno;
+              if (pPk->tnum == 0) {
+                 x = -1;
+              }
               if (i == (nPkCol - 1)) {
                  addrJump = addrUniqueOk;
                  op = OP_Eq;
@@ -1620,8 +1599,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,   /* Parsing context */
          IsPrimaryKeyIndex(pIdx) ||    /* Condition 2 */
          sqlite3FkReferences(pTab) ||   /* Condition 3 */
          /* Condition 4 */
-         (index_is_unique(pIdx) && pIdx->onError !=
-          ON_CONFLICT_ACTION_DEFAULT &&
+         (pIdx->def->opts.is_unique && pIdx->onError !=
+                   ON_CONFLICT_ACTION_DEFAULT &&
           /* Condition 4.1 */
           pIdx->onError != ON_CONFLICT_ACTION_ABORT) ||
           /* Condition 4.2 */
@@ -1639,7 +1618,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,   /* Parsing context */
                    space_ptr_reg);
            sql_vdbe_set_p4_key_def(pParse, pIdx);
            sqlite3VdbeChangeP5(v, p5);
-           VdbeComment((v, "%s", pIdx->zName));
+           VdbeComment((v, "%s", pIdx->def->name));
         }
      }
   }
@@ -1676,8 +1655,8 @@ xferCompatibleIndex(Index * pDest, Index * pSrc)
   uint32_t i;
   assert(pDest && pSrc);
   assert(pDest->pTable != pSrc->pTable);
-  uint32_t nDestCol = index_column_count(pDest);
-  uint32_t nSrcCol = index_column_count(pSrc);
+  uint32_t nDestCol = pDest->def->key_def->part_count;
+  uint32_t nSrcCol = pSrc->def->key_def->part_count;
   if (nDestCol != nSrcCol) {
      return 0;  /* Different number of columns */
   }
@@ -1685,24 +1664,17 @@ xferCompatibleIndex(Index * pDest, Index * pSrc)
      return 0;  /* Different conflict resolution strategies */
   }
   for (i = 0; i < nSrcCol; i++) {
-     if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
+
+     if (pSrc->def->key_def->parts[i].fieldno !=
+         pDest->def->key_def->parts[i].fieldno) {
         return 0;  /* Different columns indexed */
      }
-     if (pSrc->aiColumn[i] == XN_EXPR) {
-        assert(pSrc->aColExpr != 0 && pDest->aColExpr != 0);
-        if (sqlite3ExprCompare(pSrc->aColExpr->a[i].pExpr,
-                     pDest->aColExpr->a[i].pExpr,
-                     -1) != 0) {
-           return 0;  /* Different expressions in the index */
-        }
-     }
      if (sql_index_column_sort_order(pSrc, i) !=
          sql_index_column_sort_order(pDest, i)) {
         return 0;  /* Different sort orders */
      }
-     uint32_t id;
-     if (sql_index_collation(pSrc, i, &id) !=
-         sql_index_collation(pDest, i, &id)) {
+     if (pSrc->def->key_def->parts[i].coll !=
+         pDest->def->key_def->parts[i].coll) {
         return 0;  /* Different collating sequences */
      }
   }
@@ -1876,7 +1848,7 @@ xferOptimization(Parse * pParse, /* Parser context */
      }
   }
   for (pDestIdx = pDest->pIndex; pDestIdx; pDestIdx = pDestIdx->pNext) {
-     if (index_is_unique(pDestIdx)) {
+     if (pDestIdx->def->opts.is_unique) {
         destHasUniqueIdx = 1;
      }
      for (pSrcIdx = pSrc->pIndex; pSrcIdx; pSrcIdx = pSrcIdx->pNext) {
@@ -1960,11 +1932,11 @@ xferOptimization(Parse * pParse,   /* Parser context */
      assert(pSrcIdx);
      emit_open_cursor(pParse, iSrc, pSrcIdx->tnum);
      sql_vdbe_set_p4_key_def(pParse, pSrcIdx);
-     VdbeComment((v, "%s", pSrcIdx->zName));
+     VdbeComment((v, "%s", pSrcIdx->def->name));
      emit_open_cursor(pParse, iDest, pDestIdx->tnum);
      sql_vdbe_set_p4_key_def(pParse, pDestIdx);
      sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR);
-     VdbeComment((v, "%s", pDestIdx->zName));
+     VdbeComment((v, "%s", pDestIdx->def->name));
      addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
      VdbeCoverage(v);
      sqlite3VdbeAddOp2(v, OP_RowData, iSrc, regData);
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index 9dab5a7fd..1065489f8 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -370,7 +370,7 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First part of [schema.]id field */
                  for (k = 1;
                       k <=
                       (int)pTab->def->field_count
-                      && pPk->aiColumn[k - 1] !=
+                      && (int) pPk->def->key_def->parts[k - 1].fieldno !=
                       i; k++) {
                  }
               }
@@ -430,7 +430,7 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First part of [schema.]id field */
               size_t avg_tuple_size_idx =
                  sql_index_tuple_size(space, idx);
               sqlite3VdbeMultiLoad(v, 2, "sii",
-                         pIdx->zName,
+                         pIdx->def->name,
                          avg_tuple_size_idx,
                          index_field_tuple_est(pIdx, 0));
               sqlite3VdbeAddOp2(v, OP_ResultRow, 1,
@@ -459,11 +459,11 @@ sqlite3Pragma(Parse * pParse, Token * pId,   /* First part of [schema.]id field */
                   */
                  pParse->nMem = 3;
               }
-              mx = index_column_count(pIdx);
+              mx = pIdx->def->key_def->part_count;
               assert(pParse->nMem <=
                      pPragma->nPragCName);
               for (i = 0; i < mx; i++) {
-                 i16 cnum = pIdx->aiColumn[i];
+                 i16 cnum = (i16) pIdx->def->key_def->parts[i].fieldno;
                  assert(pIdx->pTable);
                  sqlite3VdbeMultiLoad(v, 1,
                             "iis", i,
@@ -477,9 +477,10 @@ sqlite3Pragma(Parse * pParse, Token * pId,    /* First part of [schema.]id field */
                             name);
                  if (pPragma->iArg) {
                     const char *c_n;
-                    uint32_t id;
+                    uint32_t id =
+                       pIdx->def->key_def->parts[i].coll_id;
                     struct coll *coll =
-                       sql_index_collation(pIdx, i, &id);
+                       pIdx->def->key_def->parts[i].coll;
                     if (coll != NULL)
                        c_n = coll_by_id(id)->name;
                     else
@@ -519,10 +520,8 @@ sqlite3Pragma(Parse * pParse, Token * pId,    /* First part of [schema.]id field */
                      { "c", "u", "pk" };
                  sqlite3VdbeMultiLoad(v, 1,
                             "isisi", i,
-                            pIdx->
-                            zName,
-                            index_is_unique
-                            (pIdx),
+                            pIdx->def->name,
+                            pIdx->def->opts.is_unique,
                             azOrigin
                             [pIdx->
                              idxType],
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 2aa35a114..2646a99c3 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4291,7 +4291,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct SrcList_item *pFrom)
      char *zIndexedBy = pFrom->u1.zIndexedBy;
      Index *pIdx;
      for (pIdx = pTab->pIndex;
-          pIdx && strcmp(pIdx->zName, zIndexedBy);
+          pIdx && strcmp(pIdx->def->name, zIndexedBy);
           pIdx = pIdx->pNext) ;
      if (!pIdx) {
         sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 01351a183..0680c2adb 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2102,27 +2102,19 @@ struct UnpackedRecord {
  * program is executed). See convertToWithoutRowidTable() for details.
  */
 struct Index {
-  char *zName;      /* Name of this index */
-  i16 *aiColumn;    /* Which columns are used by this index.  1st is 0 */
   LogEst *aiRowLogEst;   /* From ANALYZE: Est. rows selected by each column */
   Table *pTable;    /* The SQL table being indexed */
   char *zColAff;    /* String defining the affinity of each column */
   Index *pNext;     /* The next index associated with the same table */
   Schema *pSchema;   /* Schema containing this index */
-  /** Sorting order for each column. */
-  enum sort_order *sort_order;
-  /** Array of collation sequences for index. */
-  struct coll **coll_array;
-  /** Array of collation identifiers. */
-  uint32_t *coll_id_array;
   Expr *pPartIdxWhere;   /* WHERE clause for partial indices */
   ExprList *aColExpr;    /* Column expressions */
   int tnum;     /* DB Page containing root of this index */
-  u16 nColumn;      /* Number of columns stored in the index */
   u8 onError;       /* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
             * or _NONE
             */
   unsigned idxType:2;    /* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX */
+  struct index_def *def;
 };
 
 /**
@@ -3526,16 +3518,7 @@ void sqlite3AddCollateType(Parse *, Token *);
  */
 struct coll *
 sql_column_collation(struct space_def *def, uint32_t column, uint32_t *coll_id);
-/**
- * Return name of given column collation from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @param[out] coll_id Collation identifier.
- * @retval Pointer to collation.
- */
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *id);
+
 bool
 space_is_view(Table *);
 
@@ -3607,8 +3590,6 @@ void sqlite3SrcListAssignCursors(Parse *, SrcList *);
 void sqlite3IdListDelete(sqlite3 *, IdList *);
 void sqlite3SrcListDelete(sqlite3 *, SrcList *);
 Index *sqlite3AllocateIndexObject(sqlite3 *, i16, int, char **);
-bool
-index_is_unique(Index *);
 
 /**
  * Create a new index for an SQL table.  name is the name of the
@@ -4293,8 +4274,6 @@ int sqlite3InvokeBusyHandler(BusyHandler *);
 int
 sql_analysis_load(struct sqlite3 *db);
 
-uint32_t
-index_column_count(const Index *);
 bool
 index_is_unique_not_null(const Index *);
 void sqlite3RegisterLikeFunctions(sqlite3 *, int);
diff --git a/src/box/sql/trigger.c b/src/box/sql/trigger.c
index e1126b2d2..ea3521133 100644
--- a/src/box/sql/trigger.c
+++ b/src/box/sql/trigger.c
@@ -872,8 +872,6 @@ codeRowTrigger(Parse * pParse, /* Current parse context */
   pSubParse->pToplevel = pTop;
   pSubParse->eTriggerOp = pTrigger->op;
   pSubParse->nQueryLoop = pParse->nQueryLoop;
-  struct region *region = &fiber()->gc;
-  pSubParse->region_initial_size = region_used(region);
 
   v = sqlite3GetVdbe(pSubParse);
   if (v) {
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 590aad28b..6545b3b06 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -237,14 +237,14 @@ sqlite3Update(Parse * pParse,       /* The parser context */
    */
   for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
      int reg;
-     int nIdxCol = index_column_count(pIdx);
+     int nIdxCol = pIdx->def->key_def->part_count;
      if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
         reg = ++pParse->nMem;
         pParse->nMem += nIdxCol;
      } else {
         reg = 0;
         for (i = 0; i < nIdxCol; i++) {
-           i16 iIdxCol = pIdx->aiColumn[i];
+           i16 iIdxCol = pIdx->def->key_def->parts[i].fieldno;
            if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
               reg = ++pParse->nMem;
               pParse->nMem += nIdxCol;
@@ -306,7 +306,7 @@ sqlite3Update(Parse * pParse,     /* The parser context */
      nPk = nKey;
   } else {
      assert(pPk != 0);
-     nPk = index_column_count(pPk);
+     nPk = pPk->def->key_def->part_count;
   }
   iPk = pParse->nMem + 1;
   pParse->nMem += nPk;
@@ -333,9 +333,9 @@ sqlite3Update(Parse * pParse,     /* The parser context */
      }
   } else {
      for (i = 0; i < nPk; i++) {
-        assert(pPk->aiColumn[i] >= 0);
         sqlite3ExprCodeGetColumnOfTable(v, pTab->def, iDataCur,
-                    pPk->aiColumn[i],
+                    pPk->def->key_def->
+                       parts[i].fieldno,
                     iPk + i);
      }
   }
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index f408b7701..7f9eeb71a 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1087,7 +1087,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
         Index *pIdx = p->pIdx; /* Index being probed */
         int nByte; /* Bytes of space to allocate */
         int i; /* Counter variable */
-        int nCol = index_column_count(pIdx);
+        int nCol = pIdx->def->key_def->part_count;
 
         nByte = sizeof(Mem) * nCol +
            ROUND8(sizeof(UnpackedRecord));
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index e79164781..9dd4721ad 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -376,19 +376,21 @@ whereScanInit(WhereScan * pScan, /* The WhereScan object being initialized */
   pScan->is_column_seen = false;
   if (pIdx) {
      int j = iColumn;
-     iColumn = pIdx->aiColumn[j];
-     if (iColumn == XN_EXPR) {
-        pScan->pIdxExpr = pIdx->aColExpr->a[j].pExpr;
-     } else if (iColumn >= 0) {
+     iColumn = pIdx->def->key_def->parts[j].fieldno;
+     /*
+      * pIdx->tnum == 0 means that pIdx is a fake integer
+      * primary key index
+      */
+     if (pIdx->tnum == 0)
+        iColumn = -1;
+
+     if (iColumn >= 0) {
         char affinity =
            pIdx->pTable->def->fields[iColumn].affinity;
         pScan->idxaff = affinity;
-        uint32_t id;
-        pScan->coll = sql_index_collation(pIdx, j, &id);
+        pScan->coll = pIdx->def->key_def->parts[j].coll;
         pScan->is_column_seen = true;
      }
-  } else if (iColumn == XN_EXPR) {
-     return 0;
   }
   pScan->opMask = opMask;
   pScan->k = 0;
@@ -466,16 +468,16 @@ findIndexCol(Parse * pParse, /* Parse context */
 {
   for (int i = 0; i < pList->nExpr; i++) {
      Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
-     if (p->op == TK_COLUMN &&
-         p->iColumn == pIdx->aiColumn[iCol] &&
-         p->iTable == iBase) {
+     if (p->op == TK_COLUMN && p->iTable == iBase &&
+        p->iColumn == (int) pIdx->def->key_def->
+        parts[iCol].fieldno) {
         bool is_found;
         uint32_t id;
         struct coll *coll = sql_expr_coll(pParse,
                       pList->a[i].pExpr,
                       &is_found, &id);
         if (is_found &&
-            coll == sql_index_collation(pIdx, iCol, &id)) {
+            coll == pIdx->def->key_def->parts[iCol].coll) {
            return i;
         }
      }
@@ -484,27 +486,6 @@ findIndexCol(Parse * pParse,  /* Parse context */
   return -1;
 }
 
-/*
- * Return TRUE if the iCol-th column of index pIdx is NOT NULL
- */
-static int
-indexColumnNotNull(Index * pIdx, int iCol)
-{
-  int j;
-  assert(pIdx != 0);
-  assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
-  j = pIdx->aiColumn[iCol];
-  if (j >= 0) {
-     return !pIdx->pTable->def->fields[j].is_nullable;
-  } else if (j == (-1)) {
-     return 1;
-  } else {
-     assert(j == (-2));
-     return 0;  /* Assume an indexed expression can always yield a NULL */
-
-  }
-}
-
 /*
  * Return true if the DISTINCT expression-list passed as the third argument
  * is redundant.
@@ -556,9 +537,9 @@ isDistinctRedundant(Parse * pParse,       /* Parsing context */
    *      contain a "col=X" term are subject to a NOT NULL constraint.
    */
   for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-     if (!index_is_unique(pIdx))
+     if (!pIdx->def->opts.is_unique)
         continue;
-     int col_count = index_column_count(pIdx);
+     int col_count = pIdx->def->key_def->part_count;
      for (i = 0; i < col_count; i++) {
         if (0 ==
             sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
@@ -566,11 +547,12 @@ isDistinctRedundant(Parse * pParse,     /* Parsing context */
            if (findIndexCol
                (pParse, pDistinct, iBase, pIdx, i) < 0)
               break;
-           if (indexColumnNotNull(pIdx, i) == 0)
+           uint32_t j = pIdx->def->key_def->parts[i].fieldno;
+           if (pIdx->pTable->def->fields[j].is_nullable)
               break;
         }
      }
-     if (i == (int)index_column_count(pIdx)) {
+     if (i == (int) pIdx->def->key_def->part_count) {
         /* This index implies that the DISTINCT qualifier is redundant. */
         return 1;
      }
@@ -1107,7 +1089,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
 char
 sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
 {
-  assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
+  assert(iCol >= 0 && iCol < (int) pIdx->def->key_def->part_count);
   if (!pIdx->zColAff) {
      if (sqlite3IndexAffinityStr(db, pIdx) == 0)
         return AFFINITY_BLOB;
@@ -1169,13 +1151,12 @@ whereRangeSkipScanEst(Parse * pParse,     /* Parsing & code generating context */
   int nUpper = index->def->opts.stat->sample_count + 1;
   int rc = SQLITE_OK;
   u8 aff = sqlite3IndexColumnAffinity(db, p, nEq);
-  uint32_t id;
 
   sqlite3_value *p1 = 0; /* Value extracted from pLower */
   sqlite3_value *p2 = 0; /* Value extracted from pUpper */
   sqlite3_value *pVal = 0;   /* Value extracted from record */
 
-  struct coll *pColl = sql_index_collation(p, nEq, &id);
+  struct coll *pColl = p->def->key_def->parts[nEq].coll;
   if (pLower) {
      rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
                      aff, &p1);
@@ -1521,7 +1502,7 @@ whereEqualScanEst(Parse * pParse,    /* Parsing & code generating context */
   int bOk;
 
   assert(nEq >= 1);
-  assert(nEq <= (int)index_column_count(p));
+  assert(nEq <= (int) p->def->key_def->part_count);
   assert(pBuilder->nRecValid < nEq);
 
   /* If values are not available for all fields of the index to the left
@@ -1542,7 +1523,7 @@ whereEqualScanEst(Parse * pParse,    /* Parsing & code generating context */
 
   whereKeyStats(pParse, p, pRec, 0, a);
   WHERETRACE(0x10, ("equality scan regions %s(%d): %d\n",
-          p->zName, nEq - 1, (int)a[1]));
+        p->def->name, nEq - 1, (int)a[1]));
   *pnRow = a[1];
 
   return rc;
@@ -1674,7 +1655,7 @@ whereLoopPrint(WhereLoop * p, WhereClause * pWC)
            pItem->zAlias ? pItem->zAlias : pTab->def->name);
 #endif
   const char *zName;
-  if (p->pIndex && (zName = p->pIndex->zName) != 0) {
+  if (p->pIndex && (zName = p->pIndex->def->name) != 0) {
      if (strncmp(zName, "sqlite_autoindex_", 17) == 0) {
         int i = sqlite3Strlen30(zName) - 1;
         while (zName[i] != '_')
@@ -2236,7 +2217,7 @@ whereRangeVectorLen(Parse * pParse,  /* Parsing context */
   int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
   int i;
 
-  nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
+  nCmp = MIN(nCmp, (int)(pIdx->def->key_def->part_count - nEq));
   for (i = 1; i < nCmp; i++) {
      /* Test if comparison i of pTerm is compatible with column (i+nEq)
       * of the index. If not, exit the loop.
@@ -2259,7 +2240,7 @@ whereRangeVectorLen(Parse * pParse,  /* Parsing context */
       */
      if (pLhs->op != TK_COLUMN
          || pLhs->iTable != iCur
-         || pLhs->iColumn != pIdx->aiColumn[i + nEq]
+         || pLhs->iColumn != (int)pIdx->def->key_def->parts[i + nEq].fieldno
          || sql_index_column_sort_order(pIdx, i + nEq) !=
             sql_index_column_sort_order(pIdx, nEq)) {
         break;
@@ -2275,7 +2256,7 @@ whereRangeVectorLen(Parse * pParse,  /* Parsing context */
      pColl = sql_binary_compare_coll_seq(pParse, pLhs, pRhs, &id);
      if (pColl == 0)
         break;
-          if (sql_index_collation(pIdx, i + nEq, &id) != pColl)
+     if (pIdx->def->key_def->parts[i + nEq].coll != pColl)
         break;
   }
   return i;
@@ -2318,13 +2299,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,    /* The WhereLoop factory */
   LogEst rSize;     /* Number of rows in the table */
   LogEst rLogSize;   /* Logarithm of table size */
   WhereTerm *pTop = 0, *pBtm = 0;    /* Top and bottom range constraints */
-  uint32_t nProbeCol = index_column_count(pProbe);
+  uint32_t nProbeCol = pProbe->def->key_def->part_count;
 
   pNew = pBuilder->pNew;
   if (db->mallocFailed)
      return SQLITE_NOMEM_BKPT;
   WHERETRACE(0x800, ("BEGIN addBtreeIdx(%s), nEq=%d\n",
-           pProbe->zName, pNew->nEq));
+        pProbe->def->name, pNew->nEq));
 
   assert((pNew->wsFlags & WHERE_TOP_LIMIT) == 0);
   if (pNew->wsFlags & WHERE_BTM_LIMIT) {
@@ -2374,8 +2355,9 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,  /* The WhereLoop factory */
      LogEst nOutUnadjusted; /* nOut before IN() and WHERE adjustments */
      int nIn = 0;
      int nRecValid = pBuilder->nRecValid;
+     uint32_t j = pProbe->def->key_def->parts[saved_nEq].fieldno;
      if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0)
-         && indexColumnNotNull(pProbe, saved_nEq)
+         && !pProbe->pTable->def->fields[j].is_nullable
          ) {
         continue;  /* ignore IS [NOT] NULL constraints on NOT NULL columns */
      }
@@ -2445,13 +2427,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,    /* The WhereLoop factory */
                      */
         }
      } else if (eOp & WO_EQ) {
-        int iCol = pProbe->aiColumn[saved_nEq];
+        int iCol = pProbe->def->key_def->parts[saved_nEq].fieldno;
         pNew->wsFlags |= WHERE_COLUMN_EQ;
         assert(saved_nEq == pNew->nEq);
-        if ((iCol > 0 && nInMul == 0
-           && saved_nEq == nProbeCol - 1)
-            ) {
-           if (iCol >= 0 &&
+        if ((iCol > 0 && nInMul == 0 &&
+              saved_nEq == nProbeCol - 1)
+           ) {
+           if (pProbe->tnum != 0 &&
                !index_is_unique_not_null(pProbe)) {
               pNew->wsFlags |= WHERE_UNQ_WANTED;
            } else {
@@ -2514,8 +2496,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,  /* The WhereLoop factory */
         assert(eOp & (WO_ISNULL | WO_EQ | WO_IN));
 
         assert(pNew->nOut == saved_nOut);
-        if (pTerm->truthProb <= 0
-            && pProbe->aiColumn[saved_nEq] >= 0) {
+        if (pTerm->truthProb <= 0 && pProbe->tnum != 0) {
            assert((eOp & WO_IN) || nIn == 0);
            testcase(eOp & WO_IN);
            pNew->nOut += pTerm->truthProb;
@@ -2671,7 +2652,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,  /* The WhereLoop factory */
   }
 
   WHERETRACE(0x800, ("END addBtreeIdx(%s), nEq=%d, rc=%d\n",
-           pProbe->zName, saved_nEq, rc));
+        pProbe->def->name, saved_nEq, rc));
   return rc;
 }
 
@@ -2715,7 +2696,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
   ExprList *pOB;
   ExprList *aColExpr;
   int ii, jj;
-  int nIdxCol = index_column_count(pIndex);
+  int nIdxCol = pIndex->def->key_def->part_count;
   if (index_is_unordered(pIndex))
      return 0;
   if ((pOB = pBuilder->pWInfo->pOrderBy) == 0)
@@ -2726,13 +2707,12 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
         if (pExpr->iColumn < 0)
            return 1;
         for (jj = 0; jj < nIdxCol; jj++) {
-           if (pExpr->iColumn == pIndex->aiColumn[jj])
+           if (pExpr->iColumn == (int)
+               pIndex->def->key_def->parts[jj].fieldno)
               return 1;
         }
      } else if ((aColExpr = pIndex->aColExpr) != 0) {
         for (jj = 0; jj < nIdxCol; jj++) {
-           if (pIndex->aiColumn[jj] != XN_EXPR)
-              continue;
            if (sqlite3ExprCompare
                (pExpr, aColExpr->a[jj].pExpr,
                 iCursor) == 0) {
@@ -2815,7 +2795,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,   /* WHERE clause information */
   Index *pProbe;    /* An index we are evaluating */
   Index sPk;    /* A fake index object for the primary key */
   LogEst aiRowEstPk[2];  /* The aiRowLogEst[] value for the sPk index */
-  i16 aiColumnPk = -1;   /* The aColumn[] value for the sPk index */
   SrcList *pTabList; /* The FROM clause */
   struct SrcList_item *pSrc; /* The FROM clause btree term to add */
   WhereLoop *pNew;   /* Template WhereLoop object */
@@ -2846,11 +2825,27 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder, /* WHERE clause information */
       */
      Index *pFirst; /* First of real indices on the table */
      memset(&sPk, 0, sizeof(Index));
-     sPk.nColumn = 1;
-     sPk.aiColumn = &aiColumnPk;
      sPk.aiRowLogEst = aiRowEstPk;
      sPk.onError = ON_CONFLICT_ACTION_REPLACE;
      sPk.pTable = pTab;
+
+     struct key_def *key_def = key_def_new(1);
+     if (key_def == NULL)
+        return SQLITE_ERROR;
+
+     key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
+            ON_CONFLICT_ACTION_ABORT,
+            NULL, COLL_NONE, SORT_ORDER_ASC);
+
+     struct index_opts index_opts = index_opts_default;
+
+     sPk.def = index_def_new(pTab->def->id, 0, "primary",
+           sizeof("primary") - 1, TREE, &index_opts,
+           key_def, NULL);
+
+     if (sPk.def == NULL)
+        return SQLITE_ERROR;
+
      aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
      aiRowEstPk[1] = 0;
      pFirst = pSrc->pTab->pIndex;
@@ -3325,8 +3320,8 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    /* The WHERE clause */
               index_is_unordered(pIndex)) {
            return 0;
         } else {
-           nColumn = index_column_count(pIndex);
-           isOrderDistinct = index_is_unique(pIndex);
+           nColumn = pIndex->def->key_def->part_count;
+           isOrderDistinct = pIndex->def->opts.is_unique;
         }
 
         /* Loop through all columns of the index and deal with the ones
@@ -3387,7 +3382,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    /* The WHERE clause */
             * (revIdx) for the j-th column of the index.
             */
            if (pIndex) {
-              iColumn = pIndex->aiColumn[j];
+              iColumn = pIndex->def->key_def->parts[j].fieldno;
               revIdx = sql_index_column_sort_order(pIndex,
                                j);
               if (iColumn == pIndex->pTable->iPKey)
@@ -3442,8 +3437,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    /* The WHERE clause */
                              pOrderBy->a[i].pExpr,
                              &is_found, &id);
                  struct coll *idx_coll =
-                    sql_index_collation(pIndex,
-                              j, &id);
+                    pIndex->def->key_def->parts[j].coll;
                  if (is_found &&
                      coll != idx_coll)
                     continue;
@@ -4105,9 +4099,9 @@ whereShortCut(WhereLoopBuilder * pBuilder)
   } else {
      for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
         int opMask;
-        int nIdxCol = index_column_count(pIdx);
+        int nIdxCol = pIdx->def->key_def->part_count;
         assert(pLoop->aLTermSpace == pLoop->aLTerm);
-        if (!index_is_unique(pIdx)
+        if (!pIdx->def->opts.is_unique
             || pIdx->pPartIdxWhere != 0
             || nIdxCol > ArraySize(pLoop->aLTermSpace)
             )
@@ -4650,7 +4644,7 @@ sqlite3WhereBegin(Parse * pParse,    /* The parser context */
               wctrlFlags & WHERE_ORDERBY_MIN) == 0) {
               sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ); /* Hint to COMDB2 */
            }
-           VdbeComment((v, "%s", pIx->zName));
+           VdbeComment((v, "%s", pIx->def->name));
 #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
            {
               u64 colUsed = 0;
@@ -4781,7 +4775,7 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
      if (pLevel->addrSkip) {
         sqlite3VdbeGoto(v, pLevel->addrSkip);
         VdbeComment((v, "next skip-scan on %s",
-                pLoop->pIndex->zName));
+                pLoop->pIndex->def->name));
         sqlite3VdbeJumpHere(v, pLevel->addrSkip);
         sqlite3VdbeJumpHere(v, pLevel->addrSkip - 2);
      }
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index 09b267194..94aae9958 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -47,9 +47,8 @@
 static const char *
 explainIndexColumnName(Index * pIdx, int i)
 {
-  i = pIdx->aiColumn[i];
-  if (i == XN_EXPR)
-     return "<expr>";
+//    i = pIdx->aiColumn[i];
+  i = pIdx->def->key_def->parts[i].fieldno;
   return pIdx->pTable->def->fields[i].name;
 }
 
@@ -222,7 +221,7 @@ sqlite3WhereExplainOneScan(Parse * pParse, /* Parse context */
         }
         if (zFmt) {
            sqlite3StrAccumAppend(&str, " USING ", 7);
-           sqlite3XPrintf(&str, zFmt, pIdx->zName);
+           sqlite3XPrintf(&str, zFmt, pIdx->def->name);
            explainIndexRange(&str, pLoop);
         }
      } else if ((flags & WHERE_IPK) != 0
@@ -708,7 +707,7 @@ codeAllEqualityTerms(Parse * pParse,   /* Parsing context */
      sqlite3VdbeAddOp1(v, (bRev ? OP_Last : OP_Rewind), iIdxCur);
      VdbeCoverageIf(v, bRev == 0);
      VdbeCoverageIf(v, bRev != 0);
-     VdbeComment((v, "begin skip-scan on %s", pIdx->zName));
+     VdbeComment((v, "begin skip-scan on %s", pIdx->def->name));
      j = sqlite3VdbeAddOp0(v, OP_Goto);
      pLevel->addrSkip =
          sqlite3VdbeAddOp4Int(v, (bRev ? OP_SeekLT : OP_SeekGT),
@@ -718,8 +717,7 @@ codeAllEqualityTerms(Parse * pParse,   /* Parsing context */
      sqlite3VdbeJumpHere(v, j);
      for (j = 0; j < nSkip; j++) {
         sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                pIdx->aiColumn[j], regBase + j);
-        testcase(pIdx->aiColumn[j] == XN_EXPR);
+                pIdx->def->key_def->parts[j].fieldno, regBase + j);
         VdbeComment((v, "%s", explainIndexColumnName(pIdx, j)));
      }
   }
@@ -1245,10 +1243,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,   /* Complete information about t
      assert(pWInfo->pOrderBy == 0
             || pWInfo->pOrderBy->nExpr == 1
             || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
-     int nIdxCol = index_column_count(pIdx);
+     int nIdxCol = pIdx->def->key_def->part_count;
      if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
          && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
-        j = pIdx->aiColumn[nEq];
+        j = pIdx->def->key_def->parts[nEq].fieldno;
         /* Allow seek for column with `NOT NULL` == false attribute.
          * If a column may contain NULL-s, the comparator installed
          * by Tarantool is prepared to seek using a NULL value.
@@ -1304,12 +1302,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,    /* Complete information about t
         }
 #endif
         if (pRangeStart == 0) {
-           j = pIdx->aiColumn[nEq];
-           if ((j >= 0 &&
-                pIdx->pTable->def->fields[j].is_nullable)||
-               j == XN_EXPR) {
+           j = pIdx->def->key_def->parts[nEq].fieldno;
+           if (pIdx->pTable->def->fields[j].is_nullable)
               bSeekPastNull = 1;
-           }
         }
      }
      assert(pRangeEnd == 0
@@ -1386,9 +1381,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,    /* Complete information about t
      }
      struct Index *pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
      assert(pk);
-     int nPkCol = index_column_count(pk);
+     int nPkCol = pk->def->key_def->part_count;
+     uint32_t zero_fieldno = pk->def->key_def->parts[0].fieldno;
      char affinity =
-        pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
+        pIdx->pTable->def->fields[zero_fieldno].affinity;
      if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
         /* Right now INTEGER PRIMARY KEY is the only option to
          * get Tarantool's INTEGER column type. Need special handling
@@ -1397,7 +1393,8 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo, /* Complete information about t
          */
         int limit = pRangeStart == NULL ? nEq : nEq + 1;
         for (int i = 0; i < limit; i++) {
-           if (pIdx->aiColumn[i] == pk->aiColumn[0]) {
+           if (pIdx->def->key_def->parts[i].fieldno ==
+               zero_fieldno) {
               /* Here: we know for sure that table has INTEGER
                  PRIMARY KEY, single column, and Index we're
                  trying to use for scan contains this column. */
@@ -1506,10 +1503,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,   /* Complete information about t
         /* pIdx is a covering index.  No need to access the main table. */
      }  else if (iCur != iIdxCur) {
         Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-        int nPkCol = index_column_count(pPk);
+        int nPkCol = pPk->def->key_def->part_count;
         int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
         for (j = 0; j < nPkCol; j++) {
-           k = pPk->aiColumn[j];
+           k = pPk->def->key_def->parts[j].fieldno;
            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
                    iKeyReg + j);
         }
@@ -1614,7 +1611,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo, /* Complete information about t
       */
      if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
         Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-        int nPkCol = index_column_count(pPk);
+        int nPkCol = pPk->def->key_def->part_count;
         regRowset = pParse->nTab++;
         sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
                 regRowset, nPkCol);
@@ -1718,13 +1715,15 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,   /* Complete information about t
                  int iSet =
                      ((ii == pOrWc->nTerm - 1) ? -1 : ii);
                  Index *pPk = sqlite3PrimaryKeyIndex (pTab);
-                 int nPk = index_column_count(pPk);
+                 int nPk = pPk->def->key_def->part_count;
                  int iPk;
 
                  /* Read the PK into an array of temp registers. */
                  r = sqlite3GetTempRange(pParse, nPk);
                  for (iPk = 0; iPk < nPk; iPk++) {
-                    int iCol = pPk->aiColumn[iPk];
+                    int iCol = pPk->def->
+                       key_def->
+                       parts[iPk].fieldno;
                     sqlite3ExprCodeGetColumnToReg
                        (pParse, pTab->def,
                         iCol, iCur,
diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c
index aa6d4524d..40e4e2577 100644
--- a/src/box/sql/whereexpr.c
+++ b/src/box/sql/whereexpr.c
@@ -903,7 +903,6 @@ exprMightBeIndexed(SrcList * pFrom,    /* The FROM clause */
         int *piColumn   /* Write the referenced table column number here */
     )
 {
-  Index *pIdx;
   int i;
   int iCur;
 
@@ -930,20 +929,6 @@ exprMightBeIndexed(SrcList * pFrom,   /* The FROM clause */
   for (i = 0; mPrereq > 1; i++, mPrereq >>= 1) {
   }
   iCur = pFrom->a[i].iCursor;
-  for (pIdx = pFrom->a[i].pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-     if (pIdx->aColExpr == 0)
-        continue;
-     for (i = 0; i < pIdx->nColumn; i++) {
-        if (pIdx->aiColumn[i] != XN_EXPR)
-           continue;
-        if (sqlite3ExprCompare
-            (pExpr, pIdx->aColExpr->a[i].pExpr, iCur) == 0) {
-           *piCur = iCur;
-           *piColumn = XN_EXPR;
-           return 1;
-        }
-     }
-  }
   return 0;
 }
 
-- 
2.14.3 (Apple Git-98)
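
For reference, the access pattern this patch switches to can be sketched outside the tree. The structs below are simplified stand-ins and not the real Tarantool definitions, and index_parts_match() is a hypothetical helper name; it only mirrors the part count, field number and collation checks that xferCompatibleIndex() now does through pIdx->def->key_def.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-ins for illustration; NOT the real Tarantool structs. */
struct coll {
	const char *name;
};

struct key_part {
	uint32_t fieldno;	/* replaces Index.aiColumn[i] */
	struct coll *coll;	/* replaces sql_index_collation() */
};

struct key_def {
	uint32_t part_count;	/* replaces index_column_count() */
	struct key_part *parts;
};

struct index_opts {
	bool is_unique;		/* replaces index_is_unique() */
};

struct index_def {
	const char *name;	/* replaces Index.zName */
	struct index_opts opts;
	struct key_def *key_def;
};

struct Index {
	struct index_def *def;
};

/*
 * Hypothetical helper: true if both indexes cover the same fields
 * with the same collations, in the same order. Collations are
 * compared by pointer, as in the patch.
 */
bool
index_parts_match(const struct Index *a, const struct Index *b)
{
	const struct key_def *ka = a->def->key_def;
	const struct key_def *kb = b->def->key_def;
	if (ka->part_count != kb->part_count)
		return false;	/* Different number of columns */
	for (uint32_t i = 0; i < ka->part_count; i++) {
		if (ka->parts[i].fieldno != kb->parts[i].fieldno)
			return false;	/* Different columns indexed */
		if (ka->parts[i].coll != kb->parts[i].coll)
			return false;	/* Different collating sequences */
	}
	return true;
}
```

Note that fieldno is unsigned in this sketch, so a check such as "fieldno < 0" can never be true; callers that used negative aiColumn values as markers need another signal (the patch uses pIdx->tnum == 0 for the fake primary key index).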
-- 
Ivan Koptelov