<html>
  <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
  </head>
  <body text="#000000" bgcolor="#FFFFFF">
    <div class="moz-cite-prefix">On 02/07/2018 09:06, Ivan Koptelov
      wrote:<br>
    </div>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org">Thank
      you for the patch. <br>
      <blockquote type="cite">Hello. Thanks for the fixes! See my 6
        comments below. <br>
        <br>
        And I have pushed more fixes on the branch. Please, <br>
        look and squash. <br>
        <br>
        <blockquote type="cite">Now every sqlite struct Index is created
          with tnt struct <br>
          index_def inside. This allows us to use tnt index_def <br>
          in work with sqlite indexes in the same manner as with <br>
          tnt index and is a step to remove sqlite Index with <br>
          tnt index. <br>
          Fields coll_array, coll_id_array, aiColumn, sort_order <br>
          and zName are removed from Index. All usages of this <br>
          fields changed to usage of corresponding index_def <br>
          fields. <br>
          index_is_unique(), sql_index_collation() and <br>
          index_column_count() are removed with calls of <br>
          index_def corresponding fields. <br>
          <br>
          Closes: #3369 <br>
          <br>
          --- <br>
          Branch: <br>
          <a class="moz-txt-link-freetext"
href="https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where">https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where</a>
          <br>
          Issue:<a class="moz-txt-link-freetext"
            href="https://github.com/tarantool/tarantool/issues/3369">https://github.com/tarantool/tarantool/issues/3369</a>
          <br>
          <br>
            src/box/sql.c                        |  54 ++- <br>
            src/box/sql/analyze.c                |  85 ++--- <br>
            src/box/sql/build.c                  | 713
          +++++++++++++++++------------------ <br>
            src/box/sql/delete.c                 |  10 +- <br>
            src/box/sql/expr.c                   |  61 +-- <br>
            src/box/sql/fkey.c                   | 132 +++---- <br>
            src/box/sql/insert.c                 | 145 ++++--- <br>
            src/box/sql/pragma.c                 |  30 +- <br>
            src/box/sql/select.c                 |   2 +- <br>
            src/box/sql/sqliteInt.h              | 111 ++---- <br>
            src/box/sql/update.c                 |  39 +- <br>
            src/box/sql/vdbeaux.c                |   2 +- <br>
            src/box/sql/vdbemem.c                |  21 +- <br>
            src/box/sql/where.c                  | 180 ++++----- <br>
            src/box/sql/wherecode.c              | 102 ++--- <br>
            test/sql-tap/colname.test.lua        |   4 +- <br>
            test/sql/message-func-indexes.result |   8 +- <br>
            17 files changed, 821 insertions(+), 878 deletions(-) <br>
          <br>
          diff --git a/src/box/sql.c b/src/box/sql.c <br>
          index 11353150e..24e37652e 100644 <br>
          --- a/src/box/sql.c <br>
          +++ b/src/box/sql.c <br>
          @@ -1452,8 +1452,8 @@ int
          tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf) <br>
          <br>
                /* If table's PK is single column which is INTEGER, then
          <br>
                 * treat it as strict type, not affinity.  */ <br>
          -    if (pk_idx && pk_idx->nColumn == 1) { <br>
          -        int pk = pk_idx->aiColumn[0]; <br>
          +    if (pk_idx != NULL &&
          pk_idx->def->key_def->part_count == 1) { <br>
          +        int pk =
          pk_idx->def->key_def->parts[0].fieldno; <br>
        </blockquote>
        <br>
        1. You again sent the patch with spaces instead of tabs. Please,
        cope <br>
        with it. Looks like you copied 'git diff/show' output. Either
        use format-patch <br>
        or use 'git --no-pager diff/show'.<br>
      </blockquote>
    </blockquote>
    Fixed and tested by mailing myself - now should be ok.<br>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org"> <br>
                     } else { <br>
      -                    enum affinity_type affinity =
      def->fields[col].affinity; <br>
      -                    t = convertSqliteAffinity(affinity, <br>
      -                                          
      def->fields[col].is_nullable); <br>
      +                    t =
      convertSqliteAffinity(fields[col].affinity, <br>
      +                                          
      fields[col].is_nullable); <br>
                    } <br>
      <br>
               } else { <br>
      -            enum affinity_type affinity =
      def->fields[col].affinity; <br>
      -            t = convertSqliteAffinity(affinity, <br>
      -                          def->fields[col].is_nullable); <br>
      +            t = convertSqliteAffinity(fields[col].affinity, <br>
      +                          fields[col].is_nullable); <br>
               } <br>
      <br>
      -    if (pk_idx && pk_idx->nColumn == 1) { <br>
      -        int pk = pk_idx->aiColumn[0]; <br>
      <br>
      <blockquote type="cite"> <br>
        <blockquote type="cite">          if (def->fields[pk].type ==
          FIELD_TYPE_INTEGER) <br>
                        pk_forced_int = pk; <br>
                } <br>
          diff --git a/src/box/sql/build.c b/src/box/sql/build.c <br>
          index 0da7d805b..662fc698e 100644 <br>
          --- a/src/box/sql/build.c <br>
          +++ b/src/box/sql/build.c> @@ -2646,18 +2535,154 @@
          addIndexToTable(Index * pIndex, Table * pTab)> + struct
          Expr *column_expr = sqlite3ExprSkipCollate(expr); <br>
          +        if (column_expr->op != TK_COLUMN) { <br>
          +            sqlite3ErrorMsg(parse,
          tnt_errcode_desc(ER_UNSUPPORTED), <br>
          +                        "Tarantool", "functional indexes"); <br>
        </blockquote>
        <br>
        2. Patch to allow SQL_TARANTOOL_ERROR has been pushed today, so
        you can use <br>
        here diag_set again. <br>
      </blockquote>
    </blockquote>
    Fxd.<br>
    <br>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org">
      <blockquote type="cite"> <br>
        <blockquote type="cite">+            goto tnt_error; <br>
          +        } <br>
          + <br>
          @@ -2805,108 +2828,92 @@ sql_create_index(struct Parse *parse,
          struct Token *token, <br>
        </blockquote>
        <br>
        3. Crash: <br>
        <br>
        box.cfg{} <br>
        box.sql.execute('CREATE TABLE test (a int, b int, c int, PRIMARY
        KEY (a, a COLLATE kek, b, c))') <br>
        Process 15886 stopped <br>
        * thread #1, queue = 'com.apple.main-thread', stop reason =
        EXC_BAD_ACCESS (code=1, address=0x80) <br>
            frame #0: 0x000000010035f77b
        tarantool`sql_create_index(parse=0x000000010481f8b0,
        token=0x0000000000000000, tbl_name=0x0000000000000000,
        col_list=0x00000001029039d8,
        on_error=ON_CONFLICT_ACTION_DEFAULT, start=0x0000000000000000,
        where=0x0000000000000000, sort_order=SORT_ORDER_ASC,
        if_not_exist=false, idx_type='\x02') at build.c:2895 <br>
           2892         * PRIMARY KEY contains no repeated columns. <br>
           2893         */ <br>
           2894        if (IsPrimaryKeyIndex(index)) { <br>
        -> 2895            struct key_part *parts =
        index->def->key_def->parts; <br>
           2896            uint32_t part_count =
        index->def->key_def->part_count; <br>
           2897            uint32_t new_part_count = 1; <br>
           2898 <br>
        Target 0: (tarantool) stopped. <br>
        <br>
      </blockquote>
    </blockquote>
    Fixed, added a test on that. Also on case from the previous review.<br>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org">
      <blockquote type="cite">
        <blockquote type="cite">@@ -3070,54 +3080,17 @@
          sql_create_index(struct Parse *parse, struct Token *token, <br>
          -/** <br>
          - * Return number of columns in given index. <br>
          - * If space is ephemeral, use internal <br>
          - * SQL structure to fetch the value. <br>
          - */ <br>
          -uint32_t <br>
          -index_column_count(const Index *idx) <br>
          -{ <br>
          -    assert(idx != NULL); <br>
          -    uint32_t space_id =
          SQLITE_PAGENO_TO_SPACEID(idx->tnum); <br>
          -    struct space *space = space_by_id(space_id); <br>
          -    /* It is impossible to find an ephemeral space by id. */
          <br>
          -    if (space == NULL) <br>
          -        return idx->nColumn; <br>
          - <br>
          -    uint32_t index_id =
          SQLITE_PAGENO_TO_INDEXID(idx->tnum); <br>
          -    struct index *index = space_index(space, index_id); <br>
          -    assert(index != NULL); <br>
          -    return index->def->key_def->part_count; <br>
          -} <br>
          - <br>
          -/** Return true if given index is unique and not nullable. */
          <br>
          -bool <br>
          -index_is_unique_not_null(const Index *idx) <br>
        </blockquote>
        <br>
        4. Same as on the previous review. Still is used in a pair of
        places. <br>
      </blockquote>
    </blockquote>
    Are you sure? I searched through the whole project and didn't find
    it. <br>
    There is only a variable with the same name in one place.<br>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org">
      <blockquote type="cite">
        <blockquote type="cite">-{ <br>
          -    assert(idx != NULL); <br>
          -    uint32_t space_id =
          SQLITE_PAGENO_TO_SPACEID(idx->tnum); <br>
          -    struct space *space = space_by_id(space_id); <br>
          -    assert(space != NULL); <br>
          - <br>
          -    uint32_t index_id =
          SQLITE_PAGENO_TO_INDEXID(idx->tnum); <br>
          -    struct index *index = space_index(space, index_id); <br>
          -    assert(index != NULL); <br>
          -    return (index->def->opts.is_unique && <br>
          -        !index->def->key_def->is_nullable); <br>
          +    sqlite3DbFree(db, name); <br>
            } <br>
          diff --git a/src/box/sql/where.c b/src/box/sql/where.c <br>
          index c0c26ce29..225fddc23 100644 <br>
          --- a/src/box/sql/where.c <br>
          +++ b/src/box/sql/where.c <br>
          @@ -2913,11 +2898,32 @@ whereLoopAddBtree(WhereLoopBuilder *
          pBuilder,    /* WHERE clause information */ <br>
                     */ <br>
                    Index *pFirst;    /* First of real indices on the
          table */ <br>
                    memset(&sPk, 0, sizeof(Index)); <br>
          -        sPk.nColumn = 1; <br>
          -        sPk.aiColumn = &aiColumnPk; <br>
                    sPk.aiRowLogEst = aiRowEstPk; <br>
                    sPk.onError = ON_CONFLICT_ACTION_REPLACE; <br>
                    sPk.pTable = pTab; <br>
          + <br>
          +        struct key_def *key_def = key_def_new(1); <br>
          +        if (key_def == NULL) { <br>
          +            pWInfo->pParse->nErr++; <br>
          +            pWInfo->pParse->rc = SQL_TARANTOOL_ERROR; <br>
          +            return SQL_TARANTOOL_ERROR; <br>
          +        } <br>
          + <br>
          +        key_def_set_part(key_def, 0, 0,
          pTab->def->fields[0].type, <br>
          +                 ON_CONFLICT_ACTION_ABORT, <br>
          +                 NULL, COLL_NONE, SORT_ORDER_ASC); <br>
          + <br>
          +        sPk.def = index_def_new(pTab->def->id, 0,
          "primary", <br>
          +                    sizeof("primary") - 1, TREE, <br>
          +                    &index_opts_default, key_def, NULL);
          <br>
        </blockquote>
        <br>
        <blockquote type="cite">
          <blockquote type="cite">11. Where is sPk.def is deleted? <br>
          </blockquote>
          It is deleted in freeIndex() with sPk </blockquote>
        <br>
        5. I do not see any mention of freeIndex() in where.c. Where is
        it deleted? <br>
        <br>
        sPk is declared on stack. If it was deleted with freeIndex,
        Tarantool <br>
        would crash. <br>
      </blockquote>
    </blockquote>
    Sorry.<br>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org">
      <blockquote type="cite">
        <blockquote type="cite">+        key_def_delete(key_def); <br>
          + <br>
          +        if (sPk.def == NULL) { <br>
          +            pWInfo->pParse->nErr++; <br>
          +            pWInfo->pParse->rc = SQL_TARANTOOL_ERROR; <br>
          +            return SQL_TARANTOOL_ERROR; <br>
          +        } <br>
          + <br>
                    aiRowEstPk[0] = sql_space_tuple_log_count(pTab); <br>
                    aiRowEstPk[1] = 0; <br>
                    pFirst = pSrc->pTab->pIndex; <br>
        </blockquote>
        <br>
        6. Where is the test, that I gave you on the previous review and
        that <br>
        lead to crash? Please, add it to the test suite. And the new
        test in <br>
        this review too. <br>
      </blockquote>
    </blockquote>
    Done.<br>
    <blockquote type="cite"
      cite="mid:146c3bd4-e9e6-f943-5a42-c6db966d1c9c@tarantool.org">
      <blockquote type="cite"> <br>
      </blockquote>
      <br>
    </blockquote>
    <pre>--
Now every sqlite struct Index is created with tnt struct
index_def inside. This allows us to use tnt index_def
in work with sqlite indexes in the same manner as with
tnt index and is a step to remove sqlite Index with
tnt index.
Fields coll_array, coll_id_array, aiColumn, sort_order
and zName are removed from Index. All usages of this
fields changed to usage of corresponding index_def
fields.
index_is_unique(), sql_index_collation() and
index_column_count() are removed with calls of
index_def corresponding fields.

Closes: #3369
---
Branch:

<a class="moz-txt-link-freetext" href="https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where">https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where</a> 
Issue:<a class="moz-txt-link-freetext" href="https://github.com/tarantool/tarantool/issues/3369">https://github.com/tarantool/tarantool/issues/3369</a>

 src/box/errcode.h                                  |   1 +
 src/box/sql.c                                      |  54 +-
 src/box/sql/analyze.c                              |  85 ++-
 src/box/sql/build.c                                | 727 ++++++++++-----------
 src/box/sql/delete.c                               |  10 +-
 src/box/sql/expr.c                                 |  61 +-
 src/box/sql/fkey.c                                 | 213 +++---
 src/box/sql/insert.c                               | 145 ++--
 src/box/sql/pragma.c                               |  30 +-
 src/box/sql/select.c                               |   2 +-
 src/box/sql/sqliteInt.h                            | 116 ++--
 src/box/sql/update.c                               |  39 +-
 src/box/sql/vdbeaux.c                              |   2 +-
 src/box/sql/vdbemem.c                              |  21 +-
 src/box/sql/where.c                                | 182 +++---
 src/box/sql/wherecode.c                            | 102 +--
 .../{collation.test.lua => collation1.test.lua}    |   0
 test/sql-tap/collation2.test.lua                   |  21 +
 test/sql-tap/colname.test.lua                      |   4 +-
 test/sql-tap/identifier_case.test.lua              |   4 +-
 test/sql/message-func-indexes.result               |   8 +-
 21 files changed, 900 insertions(+), 927 deletions(-)
 rename test/sql-tap/{collation.test.lua => collation1.test.lua} (100%)
 create mode 100755 test/sql-tap/collation2.test.lua

diff --git a/src/box/errcode.h b/src/box/errcode.h
index c76018cbf..2229c5cbd 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -215,6 +215,7 @@ struct errcode_record {
        /*160 */_(ER_ACTION_MISMATCH,           "Field %d contains %s on conflict action, but %s in index parts") \
        /*161 */_(ER_VIEW_MISSING_SQL,          "Space declared as a view must have SQL statement") \
        /*162 */_(ER_FOREIGN_KEY_CONSTRAINT,    "Can not commit transaction: deferred foreign keys violations are not resolved") \
+       /*163 */_(ER_NO_SUCH_COLLATION,         "Collation '%s' does not exist") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/sql.c b/src/box/sql.c
index 03b4f156a..b00e8655d 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1418,8 +1418,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
 
        /* If table's PK is single column which is INTEGER, then
         * treat it as strict type, not affinity.  */
-       if (pk_idx && pk_idx->nColumn == 1) {
-               int pk = pk_idx->aiColumn[0];
+       if (pk_idx != NULL && pk_idx->def->key_def->part_count == 1) {
+               int pk = pk_idx->def->key_def->parts[0].fieldno;
                if (def->fields[pk].type == FIELD_TYPE_INTEGER)
                        pk_forced_int = pk;
        }
@@ -1530,20 +1530,19 @@ tarantoolSqlite3MakeTableOpts(Table *pTable, const char *zSql, char *buf)
  */
 int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
 {
-       struct space_def *def = pIndex->pTable->def;
-       assert(def != NULL);
+       struct field_def *fields = pIndex->pTable->def->fields;
+       struct key_def *key_def = pIndex->def->key_def;
        const struct Enc *enc = get_enc(buf);
-       struct SqliteIndex *primary_index;
-       char *base = buf, *p;
-       int pk_forced_int = -1;
-
-       primary_index = sqlite3PrimaryKeyIndex(pIndex->pTable);
+       char *base = buf;
+       uint32_t pk_forced_int = UINT32_MAX;
+       struct SqliteIndex *primary_index =
+               sqlite3PrimaryKeyIndex(pIndex->pTable);
 
        /* If table's PK is single column which is INTEGER, then
         * treat it as strict type, not affinity.  */
-       if (primary_index->nColumn == 1) {
-               int pk = primary_index->aiColumn[0];
-               if (def->fields[pk].type == FIELD_TYPE_INTEGER)
+       if (primary_index->def->key_def->part_count == 1) {
+               int pk = primary_index->def->key_def->parts[0].fieldno;
+               if (fields[pk].type == FIELD_TYPE_INTEGER)
                        pk_forced_int = pk;
        }
 
@@ -1553,46 +1552,45 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
         * primary key columns. Query planner depends on this particular
         * data layout.
         */
-       int i, n = pIndex->nColumn;
-
-       p = enc->encode_array(base, n);
-       for (i = 0; i < n; i++) {
-               int col = pIndex->aiColumn[i];
-               assert(def->fields[col].is_nullable ==
-                      action_is_nullable(def->fields[col].nullable_action));
+       struct key_part *part = key_def->parts;
+       char *p = enc->encode_array(base, key_def->part_count);
+       for (uint32_t i = 0; i < key_def->part_count; ++i, ++part) {
+               uint32_t col = part->fieldno;
+               assert(fields[col].is_nullable ==
+                      action_is_nullable(fields[col].nullable_action));
                const char *t;
                if (pk_forced_int == col) {
                        t = "integer";
                } else {
-                       enum affinity_type affinity = def->fields[col].affinity;
-                       t = convertSqliteAffinity(affinity,
-                                                 def->fields[col].is_nullable);
+                       t = convertSqliteAffinity(fields[col].affinity,
+                                                 fields[col].is_nullable);
                }
                /* do not decode default collation */
-               uint32_t cid = pIndex->coll_id_array[i];
+               uint32_t cid = part->coll_id;
                p = enc->encode_map(p, cid == COLL_NONE ? 5 : 6);
                p = enc->encode_str(p, "type", sizeof("type")-1);
                p = enc->encode_str(p, t, strlen(t));
                p = enc->encode_str(p, "field", sizeof("field")-1);
                p = enc->encode_uint(p, col);
                if (cid != COLL_NONE) {
-                       p = enc->encode_str(p, "collation", sizeof("collation")-1);
+                       p = enc->encode_str(p, "collation",
+                                           sizeof("collation") - 1);
                        p = enc->encode_uint(p, cid);
                }
                p = enc->encode_str(p, "is_nullable", 11);
-               p = enc->encode_bool(p, def->fields[col].is_nullable);
+               p = enc->encode_bool(p, fields[col].is_nullable);
                p = enc->encode_str(p, "nullable_action", 15);
                const char *action_str =
-                       on_conflict_action_strs[def->fields[col].nullable_action];
+                       on_conflict_action_strs[fields[col].nullable_action];
                p = enc->encode_str(p, action_str, strlen(action_str));
 
                p = enc->encode_str(p, "sort_order", 10);
-               enum sort_order sort_order = pIndex->sort_order[i];
+               enum sort_order sort_order = part->sort_order;
                assert(sort_order < sort_order_MAX);
                const char *sort_order_str = sort_order_strs[sort_order];
                p = enc->encode_str(p, sort_order_str, strlen(sort_order_str));
        }
-       return (int)(p - base);
+       return p - base;
 }
 
 /*
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index 5f73f026e..c5afff214 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -848,8 +848,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
        for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
                int addrRewind; /* Address of "OP_Rewind iIdxCur" */
                int addrNextRow;        /* Address of "next_row:" */
-               const char *zIdxName;   /* Name of the index */
-               int nColTest;   /* Number of columns to test for changes */
+               const char *idx_name;   /* Name of the index */
 
                if (pOnlyIdx && pOnlyIdx != pIdx)
                        continue;
@@ -857,17 +856,16 @@ analyzeOneTable(Parse * pParse,   /* Parser context */
                 * names. Thus, for the sake of clarity, use
                 * instead more familiar table name.
                 */
-               if (IsPrimaryKeyIndex(pIdx)) {
-                       zIdxName = pTab->def->name;
-               } else {
-                       zIdxName = pIdx->zName;
-               }
-               nColTest = index_column_count(pIdx);
+               if (IsPrimaryKeyIndex(pIdx))
+                       idx_name = pTab->def->name;
+               else
+                       idx_name = pIdx->def->name;
+               int part_count = pIdx->def->key_def->part_count;
 
                /* Populate the register containing the index name. */
-               sqlite3VdbeLoadString(v, regIdxname, zIdxName);
+               sqlite3VdbeLoadString(v, regIdxname, idx_name);
                VdbeComment((v, "Analysis for %s.%s", pTab->def->name,
-                       zIdxName));
+                           idx_name));
 
                /*
                 * Pseudo-code for loop that calls stat_push():
@@ -906,7 +904,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                 * when building a record to insert into the sample column of
                 * the _sql_stat4 table).
                 */
-               pParse->nMem = MAX(pParse->nMem, regPrev + nColTest);
+               pParse->nMem = MAX(pParse->nMem, regPrev + part_count);
 
                /* Open a read-only cursor on the index being analyzed. */
                struct space *space =
@@ -917,7 +915,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
                                  space_ptr_reg);
                sql_vdbe_set_p4_key_def(pParse, pIdx);
-               VdbeComment((v, "%s", pIdx->zName));
+               VdbeComment((v, "%s", pIdx->def->name));
 
                /* Invoke the stat_init() function. The arguments are:
                 *
@@ -930,8 +928,8 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                 * The third argument is only used for STAT4
                 */
                sqlite3VdbeAddOp2(v, OP_Count, iIdxCur, regStat4 + 3);
-               sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 1);
-               sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 2);
+               sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 1);
+               sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 2);
                sqlite3VdbeAddOp4(v, OP_Function0, 0, regStat4 + 1, regStat4,
                                  (char *)&statInitFuncdef, P4_FUNCDEF);
                sqlite3VdbeChangeP5(v, 3);
@@ -949,11 +947,11 @@ analyzeOneTable(Parse * pParse,   /* Parser context */
                sqlite3VdbeAddOp2(v, OP_Integer, 0, regChng);
                addrNextRow = sqlite3VdbeCurrentAddr(v);
 
-               if (nColTest > 0) {
+               if (part_count > 0) {
                        int endDistinctTest = sqlite3VdbeMakeLabel(v);
                        int *aGotoChng; /* Array of jump instruction addresses */
                        aGotoChng =
-                           sqlite3DbMallocRawNN(db, sizeof(int) * nColTest);
+                           sqlite3DbMallocRawNN(db, sizeof(int) * part_count);
                        if (aGotoChng == 0)
                                continue;
 
@@ -969,7 +967,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                         */
                        sqlite3VdbeAddOp0(v, OP_Goto);
                        addrNextRow = sqlite3VdbeCurrentAddr(v);
-                       if (nColTest == 1 && index_is_unique(pIdx)) {
+                       if (part_count == 1 && pIdx->def->opts.is_unique) {
                                /* For a single-column UNIQUE index, once we have found a non-NULL
                                 * row, we know that all the rest will be distinct, so skip
                                 * subsequent distinctness tests.
@@ -978,13 +976,12 @@ analyzeOneTable(Parse * pParse,   /* Parser context */
                                                  endDistinctTest);
                                VdbeCoverage(v);
                        }
-                       for (i = 0; i < nColTest; i++) {
-                               uint32_t id;
-                               struct coll *coll =
-                                       sql_index_collation(pIdx, i, &id);
+                       struct key_part *part = pIdx->def->key_def->parts;
+                       for (i = 0; i < part_count; ++i, ++part) {
+                               struct coll *coll = part->coll;
                                sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
                                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                                                 pIdx->aiColumn[i], regTemp);
+                                                 part->fieldno, regTemp);
                                aGotoChng[i] =
                                    sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
                                                      regPrev + i, (char *)coll,
@@ -992,7 +989,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                                sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
                                VdbeCoverage(v);
                        }
-                       sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regChng);
+                       sqlite3VdbeAddOp2(v, OP_Integer, part_count, regChng);
                        sqlite3VdbeGoto(v, endDistinctTest);
 
                        /*
@@ -1003,11 +1000,11 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                         *  ...
                         */
                        sqlite3VdbeJumpHere(v, addrNextRow - 1);
-                       for (i = 0; i < nColTest; i++) {
+                       part = pIdx->def->key_def->parts;
+                       for (i = 0; i < part_count; ++i, ++part) {
                                sqlite3VdbeJumpHere(v, aGotoChng[i]);
                                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                                                 pIdx->aiColumn[i],
-                                                 regPrev + i);
+                                                 part->fieldno, regPrev + i);
                        }
                        sqlite3VdbeResolveLabel(v, endDistinctTest);
                        sqlite3DbFree(db, aGotoChng);
@@ -1022,19 +1019,18 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                 */
                assert(regKey == (regStat4 + 2));
                Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-               int j, k, regKeyStat;
-               int nPkColumn = (int)index_column_count(pPk);
-               regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
-               for (j = 0; j < nPkColumn; j++) {
-                       k = pPk->aiColumn[j];
-                       assert(k >= 0 && k < (int)pTab->def->field_count);
-                       sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
-                       VdbeComment((v, "%s",
-                               pTab->def->fields[pPk->aiColumn[j]].name));
+               int pk_part_count = (int) pPk->def->key_def->part_count;
+               int regKeyStat = sqlite3GetTempRange(pParse, pk_part_count);
+               for (int j = 0; j < pk_part_count; ++j) {
+                       int k = pPk->def->key_def->parts[j].fieldno;
+                       assert(k >= 0 && k < (int) pTab->def->field_count);
+                       sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
+                                         regKeyStat + j);
+                       VdbeComment((v, "%s", pTab->def->fields[k].name));
                }
                sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
-                                 nPkColumn, regKey);
-               sqlite3ReleaseTempRange(pParse, regKeyStat, nPkColumn);
+                                 pk_part_count, regKey);
+               sqlite3ReleaseTempRange(pParse, regKeyStat, pk_part_count);
 
                assert(regChng == (regStat4 + 1));
                sqlite3VdbeAddOp4(v, OP_Function0, 1, regStat4, regTemp,
@@ -1057,11 +1053,11 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                int regDLt = regStat1 + 2;
                int regSample = regStat1 + 3;
                int regCol = regStat1 + 4;
-               int regSampleKey = regCol + nColTest;
+               int regSampleKey = regCol + part_count;
                int addrNext;
                int addrIsNull;
 
-               pParse->nMem = MAX(pParse->nMem, regCol + nColTest);
+               pParse->nMem = MAX(pParse->nMem, regCol + part_count);
 
                addrNext = sqlite3VdbeCurrentAddr(v);
                callStatGet(v, regStat4, STAT_GET_KEY, regSampleKey);
@@ -1077,12 +1073,11 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                 * be taken
                 */
                VdbeCoverageNeverTaken(v);
-               for (i = 0; i < nColTest; i++) {
-                       sqlite3ExprCodeLoadIndexColumn(pParse, pIdx,
-                                                                        iTabCur, i,
-                                                                        regCol + i);
+               for (i = 0; i < part_count; i++) {
+                       sqlite3ExprCodeLoadIndexColumn(pParse, pIdx, iTabCur, i,
+                                                      regCol + i);
                }
-               sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, nColTest,
+               sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, part_count,
                                  regSample);
                sqlite3VdbeAddOp3(v, OP_MakeRecord, regTabname, 6, regTemp);
                sqlite3VdbeAddOp2(v, OP_IdxReplace, iStatCur + 1, regTemp);
@@ -1146,7 +1141,7 @@ analyzeTable(Parse * pParse, Table * pTab, Index * pOnlyIdx)
        iStatCur = pParse->nTab;
        pParse->nTab += 3;
        if (pOnlyIdx) {
-               openStatTable(pParse, iStatCur, pOnlyIdx->zName, "idx");
+               openStatTable(pParse, iStatCur, pOnlyIdx->def->name, "idx");
        } else {
                openStatTable(pParse, iStatCur, pTab->def->name, "tbl");
        }
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 3737a119f..48e0b8d5e 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -241,6 +241,8 @@ static void
 freeIndex(sqlite3 * db, Index * p)
 {
        sql_expr_delete(db, p->pPartIdxWhere, false);
+       if (p->def != NULL)
+               index_def_delete(p->def);
        sqlite3DbFree(db, p->zColAff);
        sqlite3DbFree(db, p);
 }
@@ -259,7 +261,8 @@ sqlite3UnlinkAndDeleteIndex(sqlite3 * db, Index * pIndex)
 
        struct session *user_session = current_session();
 
-       pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash, pIndex->zName, 0);
+       pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash,
+                                  pIndex->def->name, 0);
        if (ALWAYS(pIndex)) {
                if (pIndex->pTable->pIndex == pIndex) {
                        pIndex->pTable->pIndex = pIndex->pNext;
@@ -364,7 +367,7 @@ deleteTable(sqlite3 * db, Table * pTable)
                pNext = pIndex->pNext;
                assert(pIndex->pSchema == pTable->pSchema);
                if ((db == 0 || db->pnBytesFreed == 0)) {
-                       char *zName = pIndex->zName;
+                       char *zName = pIndex->def->name;
                        TESTONLY(Index *
                                 pOld =) sqlite3HashInsert(&pTable->idxHash,
                                                           zName, 0);
@@ -1026,7 +1029,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
        Table *p = pParse->pNewTable;
        if (p == NULL)
                return;
-       int i = p->def->field_count - 1;
+       uint32_t i = p->def->field_count - 1;
        sqlite3 *db = pParse->db;
        char *zColl = sqlite3NameFromToken(db, pToken);
        if (!zColl)
@@ -1034,22 +1037,21 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
        uint32_t *id = &p->def->fields[i].coll_id;
        p->aCol[i].coll = sql_get_coll_seq(pParse, zColl, id);
        if (p->aCol[i].coll != NULL) {
-               Index *pIdx;
                /* If the column is declared as "<name> PRIMARY KEY COLLATE <type>",
                 * then an index may have been created on this column before the
                 * collation type was added. Correct this if it is the case.
                 */
-               for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
-                       assert(pIdx->nColumn == 1);
-                       if (pIdx->aiColumn[0] == i) {
-                               id = &pIdx->coll_id_array[0];
-                               pIdx->coll_array[0] =
+               for (struct Index *pIdx = p->pIndex; pIdx != NULL;
+                    pIdx = pIdx->pNext) {
+                       assert(pIdx->def->key_def->part_count == 1);
+                       if (pIdx->def->key_def->parts[0].fieldno == i) {
+                               id = &pIdx->def->key_def->parts[0].coll_id;
+                               pIdx->def->key_def->parts[0].coll =
                                        sql_column_collation(p->def, i, id);
                        }
                }
-       } else {
-               sqlite3DbFree(db, zColl);
        }
+       sqlite3DbFree(db, zColl);
 }
 
 struct coll *
@@ -1079,66 +1081,6 @@ sql_column_collation(struct space_def *def, uint32_t column, uint32_t *coll_id)
        return space->format->fields[column].coll;
 }
 
-struct key_def*
-sql_index_key_def(struct Index *idx)
-{
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       assert(space != NULL);
-       struct index *index = space_index(space, index_id);
-       assert(index != NULL && index->def != NULL);
-       return index->def->key_def;
-}
-
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *coll_id)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-       struct space *space = space_by_id(space_id);
-
-       assert(column < idx->nColumn);
-       /*
-        * If space is still under construction, or it is
-        * an ephemeral space, then fetch collation from
-        * SQL internal structure.
-        */
-       if (space == NULL) {
-               assert(column < idx->nColumn);
-               *coll_id = idx->coll_id_array[column];
-               return idx->coll_array[column];
-       }
-
-       struct key_def *key_def = sql_index_key_def(idx);
-       assert(key_def != NULL && key_def->part_count >= column);
-       *coll_id = key_def->parts[column].coll_id;
-       return key_def->parts[column].coll;
-}
-
-enum sort_order
-sql_index_column_sort_order(Index *idx, uint32_t column)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-       struct space *space = space_by_id(space_id);
-
-       assert(column < idx->nColumn);
-       /*
-        * If space is still under construction, or it is
-        * an ephemeral space, then fetch collation from
-        * SQL internal structure.
-        */
-       if (space == NULL) {
-               assert(column < idx->nColumn);
-               return idx->sort_order[column];
-       }
-
-       struct key_def *key_def = sql_index_key_def(idx);
-       assert(key_def != NULL && key_def->part_count >= column);
-       return key_def->parts[column].sort_order;
-}
-
 struct ExprList *
 space_checks_expr_list(uint32_t space_id)
 {
@@ -1322,17 +1264,6 @@ createTableStmt(sqlite3 * db, Table * p)
        return zStmt;
 }
 
-/* Return true if value x is found any of the first nCol entries of aiCol[]
- */
-static int
-hasColumn(const i16 * aiCol, int nCol, int x)
-{
-       while (nCol-- > 0)
-               if (x == *(aiCol++))
-                       return 1;
-       return 0;
-}
-
 /*
  * This routine runs at the end of parsing a CREATE TABLE statement.
  * The job of this routine is to convert both
@@ -1349,13 +1280,12 @@ static void
 convertToWithoutRowidTable(Parse * pParse, Table * pTab)
 {
        Index *pPk;
-       int i, j;
        sqlite3 *db = pParse->db;
 
        /* Mark every PRIMARY KEY column as NOT NULL (except for imposter tables)
         */
        if (!db->init.imposterTable) {
-               for (i = 0; i < (int)pTab->def->field_count; i++) {
+               for (uint32_t i = 0; i < pTab->def->field_count; i++) {
                        if (pTab->aCol[i].is_primkey) {
                                pTab->def->fields[i].nullable_action
                                        = ON_CONFLICT_ACTION_ABORT;
@@ -1387,20 +1317,6 @@ convertToWithoutRowidTable(Parse * pParse, Table * pTab)
                pTab->iPKey = -1;
        } else {
                pPk = sqlite3PrimaryKeyIndex(pTab);
-
-               /*
-                * Remove all redundant columns from the PRIMARY KEY.  For example, change
-                * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY KEY(a,b,c,d)".  Later
-                * code assumes the PRIMARY KEY contains no repeated columns.
-                */
-               for (i = j = 1; i < pPk->nColumn; i++) {
-                       if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
-                               pPk->nColumn--;
-                       } else {
-                               pPk->aiColumn[j++] = pPk->aiColumn[i];
-                       }
-               }
-               pPk->nColumn = j;
        }
        assert(pPk != 0);
 }
@@ -1482,7 +1398,7 @@ createIndex(Parse * pParse, Index * pIndex, int iSpaceId, int iIndexId,
        }
        sqlite3VdbeAddOp4(v,
                          OP_String8, 0, iFirstCol + 2, 0,
-                         sqlite3DbStrDup(pParse->db, pIndex->zName),
+                         sqlite3DbStrDup(pParse->db, pIndex->def->name),
                          P4_DYNAMIC);
        sqlite3VdbeAddOp4(v, OP_String8, 0, iFirstCol + 3, 0, "tree",
                          P4_STATIC);
@@ -1519,7 +1435,7 @@ makeIndexSchemaRecord(Parse * pParse,
 
        sqlite3VdbeAddOp4(v,
                          OP_String8, 0, iFirstCol, 0,
-                         sqlite3DbStrDup(pParse->db, pIndex->zName),
+                         sqlite3DbStrDup(pParse->db, pIndex->def->name),
                          P4_DYNAMIC);
 
        if (pParse->pNewTable) {
@@ -2448,15 +2364,16 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
        } else {
                tnum = pIndex->tnum;
        }
-       struct key_def *def = key_def_dup(sql_index_key_def(pIndex));
+       struct key_def *def = key_def_dup(pIndex->def->key_def);
        if (def == NULL) {
                sqlite3OomFault(db);
                return;
        }
        /* Open the sorter cursor if we are to use one. */
        iSorter = pParse->nTab++;
-       sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
-                         (char *)def, P4_KEYDEF);
+       sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0,
+                         pIndex->def->key_def->part_count, (char *)def,
+                         P4_KEYDEF);
 
        /* Open the table. Loop through all rows of the table, inserting index
         * records into the sorter.
@@ -2487,7 +2404,8 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
                sqlite3VdbeGoto(v, j2);
                addr2 = sqlite3VdbeCurrentAddr(v);
                sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
-                                    regRecord, pIndex->nColumn);
+                                    regRecord,
+                                    pIndex->def->key_def->part_count);
                VdbeCoverage(v);
                parser_emit_unique_constraint(pParse, ON_CONFLICT_ACTION_ABORT,
                                              pIndex);
@@ -2507,44 +2425,15 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
        sqlite3VdbeAddOp1(v, OP_Close, iSorter);
 }
 
-/*
- * Allocate heap space to hold an Index object with nCol columns.
- *
- * Increase the allocation size to provide an extra nExtra bytes
- * of 8-byte aligned space after the Index object and return a
- * pointer to this extra space in *ppExtra.
- */
-Index *
-sqlite3AllocateIndexObject(sqlite3 * db,       /* Database connection */
-                          i16 nCol,    /* Total number of columns in the index */
-                          int nExtra,  /* Number of bytes of extra space to alloc */
-                          char **ppExtra       /* Pointer to the "extra" space */
-    )
+struct Index *
+sql_index_alloc(struct sqlite3 *db, uint32_t part_count)
 {
-       Index *p;               /* Allocated index object */
-       int nByte;              /* Bytes of space for Index object + arrays */
-
-       nByte = ROUND8(sizeof(Index)) +             /* Index structure   */
-           ROUND8(sizeof(struct coll *) * nCol) +  /* Index.coll_array  */
-           ROUND8(sizeof(uint32_t) * nCol) +       /* Index.coll_id_array*/
-           ROUND8(sizeof(LogEst) * (nCol + 1) +    /* Index.aiRowLogEst */
-                  sizeof(i16) * nCol +             /* Index.aiColumn    */
-                  sizeof(enum sort_order) * nCol); /* Index.sort_order  */
-       p = sqlite3DbMallocZero(db, nByte + nExtra);
-       if (p) {
-               char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
-               p->coll_array = (struct coll **)pExtra;
-               pExtra += ROUND8(sizeof(struct coll **) * nCol);
-               p->coll_id_array = (uint32_t *) pExtra;
-               pExtra += ROUND8(sizeof(uint32_t) * nCol);
-               p->aiRowLogEst = (LogEst *) pExtra;
-               pExtra += sizeof(LogEst) * (nCol + 1);
-               p->aiColumn = (i16 *) pExtra;
-               pExtra += sizeof(i16) * nCol;
-               p->sort_order = (enum sort_order *) pExtra;
-               p->nColumn = nCol;
-               *ppExtra = ((char *)p) + nByte;
-       }
+       /* Size of struct Index and aiRowLogEst. */
+       int nByte = ROUND8(sizeof(struct Index)) +
+                   ROUND8(sizeof(LogEst) * (part_count + 1));
+       struct Index *p = sqlite3DbMallocZero(db, nByte);
+       if (p != NULL)
+               p->aiRowLogEst = (LogEst *) ((char *)p + ROUND8(sizeof(*p)));
        return p;
 }
 
@@ -2631,46 +2520,187 @@ addIndexToTable(Index * pIndex, Table * pTab)
        }
 }
 
-bool
-index_is_unique(Index *idx)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       assert(space != NULL);
-       struct index *tnt_index = space_index(space, index_id);
-       assert(tnt_index != NULL);
+/**
+ * Allocate memory on parser region and copy given string (part of
+ * the sql statement) into the allocated memory.
+ * @param parse Parse context.
+ * @param str String (a part of sql statement) to be copied.
+ *
+ * @retval size Appended size.
+ */
+static int
+sql_append(struct Parse *parse, const char *str)
+{
+       const size_t str_len = strlen(str);
+       char *str_part = region_alloc(&parse->region, str_len);
+       if (str_part == NULL) {
+               diag_set(OutOfMemory, str_len, "region_alloc", "str_part");
+               parse->rc = SQL_TARANTOOL_ERROR;
+               parse->nErr++;
+               return 0;
+       }
+       memcpy(str_part, str, str_len);
+       return str_len;
+}
+
+/**
+ * Create and set index_def in the given Index.
+ *
+ * @param parse Parse context.
+ * @param index Index for which index_def should be created. It is
+ *              used only to set index_def at the end of the
+ *              function.
+ * @param table Table which is indexed by 'index' param.
+ * @param iid Index ID.
+ * @param name Index name.
+ * @param name_len Index name length.
+ * @param is_unique Is given 'index' unique or not.
+ * @param expr_list List of expressions, describe which columns
+ *                  of 'table' are used in index and also their
+ *                  collations, orders, etc.
+ * @param idx_type Index type, one of the following:
+ *                 SQLITE_IDXTYPE_APPDEF 0 (Index is created with
+ *                 CREATE INDEX statement)
+ *                 SQLITE_IDXTYPE_UNIQUE 1 (Index is created
+ *                 automatically to implement a UNIQUE constraint)
+ *                 SQLITE_IDXTYPE_PRIMARYKEY 2 (Index is a PRIMARY
+ *                 KEY).
+ */
+static void
+set_index_def(struct Parse *parse, struct Index *index, struct Table *table,
+             uint32_t iid, const char *name, uint32_t name_len, bool is_unique,
+             struct ExprList *expr_list, u8 idx_type)
+{
+       struct space_def *space_def = table->def;
+       size_t sql_size = 0;
+       struct index_opts opts;
+       index_opts_create(&opts);
+       index->def = NULL;
+       opts.is_unique = is_unique;
+
+       struct key_def *key_def = key_def_new(expr_list->nExpr);
+       if (key_def == NULL)
+               goto tnt_error;
+
+       /* Build initial parts of SQL statement.  */
+       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+               sql_size += sql_append(parse, "CREATE INDEX ");
+               sql_size += sql_append(parse, name);
+               sql_size += sql_append(parse, " ON ");
+               sql_size += sql_append(parse, space_def->name);
+               sql_size += sql_append(parse, " (");
+       }
+
+       for (int i = 0; i < expr_list->nExpr; i++) {
+               struct Expr *expr = expr_list->a[i].pExpr;
+               sql_resolve_self_reference(parse, table, NC_IdxExpr, expr, 0);
+               if (parse->nErr > 0)
+                       goto cleanup;
+
+               struct Expr *column_expr = sqlite3ExprSkipCollate(expr);
+               if (column_expr->op != TK_COLUMN) {
+                       diag_set(ClientError, ER_UNSUPPORTED, "Tarantool",
+                                "functional indexes");
+                       goto tnt_error;
+               }
+
+               uint32_t fieldno = column_expr->iColumn;
+               const char *column_name = column_expr->u.zToken;
+               uint32_t coll_id;
+               struct coll *coll;
+               if (expr->op == TK_COLLATE) {
+                       coll = sql_get_coll_seq(parse, expr->u.zToken,
+                                               &coll_id);
+                       if (coll == NULL &&
+                           strcasecmp(expr->u.zToken, "binary") != 0) {
+                               diag_set(ClientError, ER_NO_SUCH_COLLATION,
+                                        expr->u.zToken);
+                               goto tnt_error;
+                       }
+                       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+                               sql_size += sql_append(parse, column_name);
+                               sql_size += sql_append(parse, " COLLATE ");
+                               sql_size += sql_append(parse, expr->u.zToken);
+                               sql_size += sql_append(parse, ", ");
+                       }
+               } else {
+                       coll = sql_column_collation(space_def, fieldno,
+                                                   &coll_id);
+                       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+                               sql_size += sql_append(parse, column_name);
+                               sql_size += sql_append(parse, ", ");
+                       }
+               }
+               /*
+                * Tarantool: DESC indexes are not supported so
+                * far.
+                */
+               key_def_set_part(key_def, i, fieldno,
+                                space_def->fields[fieldno].type,
+                                space_def->fields[fieldno].nullable_action,
+                                coll, coll_id, SORT_ORDER_ASC);
+       }
+       if (parse->nErr > 0)
+               goto cleanup;
 
-       return tnt_index->def->opts.is_unique;
+       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+               opts.sql = region_join(&parse->region, sql_size);
+               if (opts.sql == NULL) {
+                       diag_set(OutOfMemory, sql_size, "region_join",
+                                "opts.sql");
+                       goto tnt_error;
+               }
+               /*
+                * fix last ", " with ")\0" to finish statement.
+                */
+               memcpy(&opts.sql[sql_size - 2], ")\0", 2);
+       }
+
+       struct key_def *pk_key_def;
+       if (idx_type == SQLITE_IDXTYPE_APPDEF)
+               pk_key_def = table->pIndex->def->key_def;
+       else
+               pk_key_def = NULL;
+
+       index->def = index_def_new(space_def->id, iid, name, name_len, TREE,
+                                  &opts, key_def, pk_key_def);
+       if (index->def == NULL)
+               goto tnt_error;
+cleanup:
+       if (key_def != NULL)
+               key_def_delete(key_def);
+       return;
+tnt_error:
+       parse->rc = SQL_TARANTOOL_ERROR;
+       ++parse->nErr;
+       goto cleanup;
 }
 
 void
 sql_create_index(struct Parse *parse, struct Token *token,
                 struct SrcList *tbl_name, struct ExprList *col_list,
-                int on_error, struct Token *start, struct Expr *where,
-                enum sort_order sort_order, bool if_not_exist, u8 idx_type)
-{
-       Table *pTab = 0;        /* Table to be indexed */
-       Index *pIndex = 0;      /* The index to be created */
-       char *zName = 0;        /* Name of the index */
-       int nName;              /* Number of characters in zName */
-       int i, j;
-       DbFixer sFix;           /* For assigning database names to pTable */
-       sqlite3 *db = parse->db;
-       struct ExprList_item *col_listItem;     /* For looping over col_list */
-       int nExtra = 0;         /* Space allocated for zExtra[] */
-       char *zExtra = 0;       /* Extra space after the Index object */
+                enum on_conflict_action on_error, struct Token *start,
+                struct Expr *where, enum sort_order sort_order,
+                bool if_not_exist, u8 idx_type)
+{
+       /* Table to be indexed.  */
+       struct Table *table = NULL;
+       /* The index to be created.  */
+       struct Index *index = NULL;
+       /* Name of the index.  */
+       char *name = NULL;
+       int name_len;
+       struct sqlite3 *db = parse->db;
        struct session *user_session = current_session();
 
-       if (db->mallocFailed || parse->nErr > 0) {
+       if (db->mallocFailed || parse->nErr > 0)
                goto exit_create_index;
-       }
-       /* Do not account nested operations: the count of such
-        * operations depends on Tarantool data dictionary internals,
-        * such as data layout in system spaces. Also do not account
-        * PRIMARY KEY and UNIQUE constraint - they had been accounted
-        * in CREATE TABLE already.
+       /*
+        * Do not account nested operations: the count of such
+        * operations depends on Tarantool data dictionary
+        * internals, such as data layout in system spaces. Also
+        * do not account PRIMARY KEY and UNIQUE constraint - they
+        * had been accounted in CREATE TABLE already.
         */
        if (!parse->nested && idx_type == SQLITE_IDXTYPE_APPDEF) {
                Vdbe *v = sqlite3GetVdbe(parse);
@@ -2681,39 +2711,43 @@ sql_create_index(struct Parse *parse, struct Token *token,
        assert(db->pSchema != NULL);
 
        /*
-        * Find the table that is to be indexed.  Return early if not found.
+        * Find the table that is to be indexed.
+        * Return early if not found.
         */
        if (tbl_name != NULL) {
-
-               /* Use the two-part index name to determine the database
-                * to search for the table. 'Fix' the table name to this db
-                * before looking up the table.
+               /*
+                * Use the two-part index name to determine the
+                * database to search for the table. 'Fix' the
+                * table name to this db before looking up the
+                * table.
                 */
                assert(token && token->z);
-
-               sqlite3FixInit(&sFix, parse, "index", token);
-               if (sqlite3FixSrcList(&sFix, tbl_name)) {
-                       /* Because the parser constructs tbl_name from a single identifier,
+               DbFixer db_fixer;
+               sqlite3FixInit(&db_fixer, parse, "index", token);
+               if (sqlite3FixSrcList(&db_fixer, tbl_name)) {
+                       /*
+                        * Because the parser constructs tbl_name
+                        * from a single identifier,
                         * sqlite3FixSrcList can never fail.
                         */
-                       assert(0);
+                       unreachable();
                }
-               pTab = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
-               assert(db->mallocFailed == 0 || pTab == 0);
-               if (pTab == 0)
+               table = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
+               assert(db->mallocFailed == 0 || table == NULL);
+               if (table == NULL)
                        goto exit_create_index;
-               sqlite3PrimaryKeyIndex(pTab);
+               sqlite3PrimaryKeyIndex(table);
        } else {
                assert(token == NULL);
                assert(start == NULL);
-               pTab = parse->pNewTable;
-               if (!pTab)
+               table = parse->pNewTable;
+               if (table == NULL)
                        goto exit_create_index;
        }
 
-       assert(pTab != 0);
+       assert(table != NULL);
        assert(parse->nErr == 0);
-       if (pTab->def->opts.is_view) {
+       if (table->def->opts.is_view) {
                sqlite3ErrorMsg(parse, "views may not be indexed");
                goto exit_create_index;
        }
@@ -2731,42 +2765,38 @@ sql_create_index(struct Parse *parse, struct Token *token,
         * primary key or UNIQUE constraint.  We have to invent
         * our own name.
         */
-       if (token) {
-               zName = sqlite3NameFromToken(db, token);
-               if (zName == 0)
+       if (token != NULL) {
+               name = sqlite3NameFromToken(db, token);
+               if (name == NULL)
                        goto exit_create_index;
-               assert(token->z != 0);
+               assert(token->z != NULL);
                if (!db->init.busy) {
-                       if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
+                       if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
                            NULL) {
-                               sqlite3ErrorMsg(parse,
-                                               "there is already a table named %s",
-                                               zName);
+                               sqlite3ErrorMsg(parse, "there is already a "\
+                                               "table named %s", name);
                                goto exit_create_index;
                        }
                }
-               if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
+               if (sqlite3HashFind(&table->idxHash, name) != NULL) {
                        if (!if_not_exist) {
                                sqlite3ErrorMsg(parse,
                                                "index %s.%s already exists",
-                                               pTab->def->name, zName);
+                                               table->def->name, name);
                        } else {
                                assert(!db->init.busy);
                        }
                        goto exit_create_index;
                }
        } else {
-               int n;
-               Index *pLoop;
-               for (pLoop = pTab->pIndex, n = 1; pLoop;
+               int n = 1;
+               for (struct Index *pLoop = table->pIndex; pLoop != NULL;
                     pLoop = pLoop->pNext, n++) {
                }
-               zName =
-                   sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", pTab->def->name,
-                                  n);
-               if (zName == 0) {
+               name = sqlite3MPrintf(db, "sqlite_autoindex_%s_%d",
+                                     table->def->name, n);
+               if (name == NULL)
                        goto exit_create_index;
-               }
        }
 
        /*
@@ -2776,9 +2806,9 @@ sql_create_index(struct Parse *parse, struct Token *token,
         * simulate this.
         */
        if (col_list == NULL) {
-               Token prevCol;
-               uint32_t last_field = pTab->def->field_count - 1;
-               sqlite3TokenInit(&prevCol, pTab->def->fields[last_field].name);
+               struct Token prevCol;
+               uint32_t last_field = table->def->field_count - 1;
+               sqlite3TokenInit(&prevCol, table->def->fields[last_field].name);
                col_list = sql_expr_list_append(parse->db, NULL,
                                                sqlite3ExprAlloc(db, TK_ID,
                                                                 &prevCol, 0));
@@ -2790,108 +2820,93 @@ sql_create_index(struct Parse *parse, struct Token *token,
                sqlite3ExprListCheckLength(parse, col_list, "index");
        }
 
-       /* Figure out how many bytes of space are required to store explicitly
-        * specified collation sequence names.
-        */
-       for (i = 0; i < col_list->nExpr; i++) {
-               Expr *pExpr = col_list->a[i].pExpr;
-               assert(pExpr != 0);
-               if (pExpr->op == TK_COLLATE) {
-                       nExtra += (1 + sqlite3Strlen30(pExpr->u.zToken));
-               }
-       }
+       /* Allocate the index structure.  */
+       name_len = sqlite3Strlen30(name);
 
-       /*
-        * Allocate the index structure.
-        */
-       nName = sqlite3Strlen30(zName);
-       pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
-                                           nName + nExtra + 1, &zExtra);
-       if (db->mallocFailed) {
+       if (name_len > BOX_NAME_MAX) {
+               sqlite3ErrorMsg(parse, "%s.%s exceeds indexes' names length "\
+                               "limit", table->def->name, name);
                goto exit_create_index;
        }
-       assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
-       assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
-       pIndex->zName = zExtra;
-       zExtra += nName + 1;
-       memcpy(pIndex->zName, zName, nName + 1);
-       pIndex->pTable = pTab;
-       pIndex->onError = (u8) on_error;
+
+       if (sqlite3CheckIdentifierName(parse, name) != SQLITE_OK)
+               goto exit_create_index;
+
+       index = sql_index_alloc(db, col_list->nExpr);
+       if (db->mallocFailed)
+               goto exit_create_index;
+
+       assert(EIGHT_BYTE_ALIGNMENT(index->aiRowLogEst));
+       index->pTable = table;
+       index->onError = (u8) on_error;
        /*
         * Don't make difference between UNIQUE indexes made by user
         * using CREATE INDEX statement and those created during
         * CREATE TABLE processing.
         */
        if (idx_type == SQLITE_IDXTYPE_APPDEF &&
-           on_error != ON_CONFLICT_ACTION_NONE) {
-               pIndex->idxType = SQLITE_IDXTYPE_UNIQUE;
-       } else {
-               pIndex->idxType = idx_type;
-       }
-       pIndex->pSchema = db->pSchema;
-       pIndex->nColumn = col_list->nExpr;
+           on_error != ON_CONFLICT_ACTION_NONE)
+               index->idxType = SQLITE_IDXTYPE_UNIQUE;
+       else
+               index->idxType = idx_type;
+       index->pSchema = db->pSchema;
        /* Tarantool have access to each column by any index */
-       if (where) {
-               sql_resolve_self_reference(parse, pTab, NC_PartIdx, where,
+       if (where != NULL) {
+               sql_resolve_self_reference(parse, table, NC_PartIdx, where,
                                           NULL);
-               pIndex->pPartIdxWhere = where;
+               index->pPartIdxWhere = where;
                where = NULL;
        }
 
-       /* Analyze the list of expressions that form the terms of the index and
-        * report any errors.  In the common case where the expression is exactly
-        * a table column, store that column in aiColumn[].
-        *
-        * TODO: Issue a warning if two or more columns of the index are identical.
-        * TODO: Issue a warning if the table primary key is used as part of the
-        * index key.
+       /*
+        * TODO: Issue a warning if two or more columns of the
+        * index are identical.
+        * TODO: Issue a warning if the table primary key is used
+        * as part of the index key.
         */
-       for (i = 0, col_listItem = col_list->a; i < col_list->nExpr;
-            i++, col_listItem++) {
-               Expr *pCExpr;   /* The i-th index expression */
-               sql_resolve_self_reference(parse, pTab, NC_IdxExpr,
-                                          col_listItem->pExpr, NULL);
-               if (parse->nErr > 0)
-                       goto exit_create_index;
-               pCExpr = sqlite3ExprSkipCollate(col_listItem->pExpr);
-               if (pCExpr->op != TK_COLUMN) {
-                       sqlite3ErrorMsg(parse,
-                                       "functional indexes aren't supported "
-                                       "in the current version");
-                       goto exit_create_index;
-               } else {
-                       j = pCExpr->iColumn;
-                       assert(j <= 0x7fff);
-                       if (j < 0) {
-                               j = pTab->iPKey;
-                       }
-                       pIndex->aiColumn[i] = (i16) j;
-               }
-               struct coll *coll;
-               uint32_t id;
-               if (col_listItem->pExpr->op == TK_COLLATE) {
-                       const char *coll_name = col_listItem->pExpr->u.zToken;
-                       coll = sql_get_coll_seq(parse, coll_name, &id);
 
-                       if (coll == NULL &&
-                           sqlite3StrICmp(coll_name, "binary") != 0) {
-                               goto exit_create_index;
+       uint32_t max_iid = 0;
+       for (struct Index *index = table->pIndex; index != NULL;
+            index = index->pNext) {
+               max_iid = max_iid > index->def->iid ?
+                         max_iid : index->def->iid + 1;
+       }
+
+       bool is_unique = on_error != ON_CONFLICT_ACTION_NONE;
+       set_index_def(parse,  index, table, max_iid, name, name_len,
+                     is_unique, col_list, idx_type);
+
+       if (index->def == NULL)
+               goto exit_create_index;
+       /*
+        * Remove all redundant columns from the PRIMARY KEY.
+        * For example, change "PRIMARY KEY(a,b,a,b,c,b,c,d)" into
+        * just "PRIMARY KEY(a,b,c,d)". Later code assumes the
+        * PRIMARY KEY contains no repeated columns.
+        */
+       if (IsPrimaryKeyIndex(index)) {
+               struct key_part *parts = index->def->key_def->parts;
+               uint32_t part_count = index->def->key_def->part_count;
+               uint32_t new_part_count = 1;
+
+               for(uint32_t i = 1; i < part_count; i++) {
+                       uint32_t j;
+                       for(j = 0; j < new_part_count; j++) {
+                               if(parts[i].fieldno == parts[j].fieldno)
+                                       break;
                        }
-               } else if (j >= 0) {
-                       coll = sql_column_collation(pTab->def, j, &id);
-               } else {
-                       id = COLL_NONE;
-                       coll = NULL;
+
+                       if (j == new_part_count)
+                               parts[new_part_count++] = parts[i];
                }
-               pIndex->coll_array[i] = coll;
-               pIndex->coll_id_array[i] = id;
 
-               /* Tarantool: DESC indexes are not supported so far.
-                * See gh-3016.
-                */
-               pIndex->sort_order[i] = SORT_ORDER_ASC;
+               index->def->key_def->part_count = new_part_count;
        }
-       if (pTab == parse->pNewTable) {
+
+       if (!index_def_is_valid(index->def, table->def->name))
+               goto exit_create_index;
+
+       if (table == parse->pNewTable) {
                /* This routine has been called to create an automatic index as a
                 * result of a PRIMARY KEY or UNIQUE clause on a column definition, or
                 * a PRIMARY KEY or UNIQUE clause following the column definitions.
@@ -2913,28 +2928,28 @@ sql_create_index(struct Parse *parse, struct Token *token,
                 * the constraint occur in different orders, then the constraints are
                 * considered distinct and both result in separate indices.
                 */
-               Index *pIdx;
-               for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-                       int k;
+               for (struct Index *pIdx = table->pIndex; pIdx != NULL;
+                    pIdx = pIdx->pNext) {
+                       uint32_t k;
                        assert(IsUniqueIndex(pIdx));
                        assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
-                       assert(IsUniqueIndex(pIndex));
+                       assert(IsUniqueIndex(index));
 
-                       if (pIdx->nColumn != pIndex->nColumn)
+                       if (pIdx->def->key_def->part_count !=
+                           index->def->key_def->part_count)
                                continue;
-                       for (k = 0; k < pIdx->nColumn; k++) {
-                               assert(pIdx->aiColumn[k] >= 0);
-                               if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
+                       for (k = 0; k < pIdx->def->key_def->part_count; k++) {
+                               if (pIdx->def->key_def->parts[k].fieldno !=
+                                   index->def->key_def->parts[k].fieldno)
                                        break;
                                struct coll *coll1, *coll2;
-                               uint32_t id;
-                               coll1 = sql_index_collation(pIdx, k, &id);
-                               coll2 = sql_index_collation(pIndex, k, &id);
+                               coll1 = pIdx->def->key_def->parts[k].coll;
+                               coll2 = index->def->key_def->parts[k].coll;
                                if (coll1 != coll2)
                                        break;
                        }
-                       if (k == pIdx->nColumn) {
-                               if (pIdx->onError != pIndex->onError) {
+                       if (k == pIdx->def->key_def->part_count) {
+                               if (pIdx->onError != index->onError) {
                                        /* This constraint creates the same index as a previous
                                         * constraint specified somewhere in the CREATE TABLE statement.
                                         * However the ON CONFLICT clauses are different. If both this
@@ -2942,17 +2957,19 @@ sql_create_index(struct Parse *parse, struct Token *token,
                                         * ON CONFLICT clauses this is an error. Otherwise, use the
                                         * explicitly specified behavior for the index.
                                         */
-                                       if (!
-                                           (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT
-                                            || pIndex->onError ==
-                                            ON_CONFLICT_ACTION_DEFAULT)) {
+                                       if (pIdx->onError !=
+                                           ON_CONFLICT_ACTION_DEFAULT &&
+                                           index->onError !=
+                                           ON_CONFLICT_ACTION_DEFAULT) {
                                                sqlite3ErrorMsg(parse,
-                                                               "conflicting ON CONFLICT clauses specified",
-                                                               0);
-                                       }
-                                       if (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT) {
-                                               pIdx->onError = pIndex->onError;
+                                                               "conflicting "\
+                                                               "ON CONFLICT "\
+                                                               "clauses "\
+                                                               "specified");
                                        }
+                                       if (pIdx->onError ==
+                                           ON_CONFLICT_ACTION_DEFAULT)
+                                               pIdx->onError = index->onError;
                                }
                                if (idx_type == SQLITE_IDXTYPE_PRIMARYKEY)
                                        pIdx->idxType = idx_type;
@@ -2966,15 +2983,16 @@ sql_create_index(struct Parse *parse, struct Token *token,
         */
        assert(parse->nErr == 0);
        if (db->init.busy) {
-               Index *p;
-               p = sqlite3HashInsert(&pTab->idxHash, pIndex->zName, pIndex);
-               if (p) {
-                       assert(p == pIndex);    /* Malloc must have failed */
+               struct Index *p = sqlite3HashInsert(&table->idxHash,
+                                                   index->def->name, index);
+               if (p != NULL) {
+                       /* Malloc must have failed. */
+                       assert(p == index);
                        sqlite3OomFault(db);
                        goto exit_create_index;
                }
                user_session->sql_flags |= SQLITE_InternChanges;
-               pIndex->tnum = db->init.newTnum;
+               index->tnum = db->init.newTnum;
        }
 
        /*
@@ -3025,14 +3043,14 @@ sql_create_index(struct Parse *parse, struct Token *token,
                                       ON_CONFLICT_ACTION_NONE ? "" : " UNIQUE",
                                       n, token->z);
 
-               iSpaceId = SQLITE_PAGENO_TO_SPACEID(pTab->tnum);
+               iSpaceId = SQLITE_PAGENO_TO_SPACEID(table->tnum);
                iIndexId = getNewIid(parse, iSpaceId, iCursor);
                sqlite3VdbeAddOp1(v, OP_Close, iCursor);
-               createIndex(parse, pIndex, iSpaceId, iIndexId, zStmt);
+               createIndex(parse, index, iSpaceId, iIndexId, zStmt);
 
                /* consumes zStmt */
                iFirstSchemaCol =
-                   makeIndexSchemaRecord(parse, pIndex, iSpaceId, iIndexId,
+                   makeIndexSchemaRecord(parse, index, iSpaceId, iIndexId,
                                          zStmt);
 
                /* Reparse the schema. Code an OP_Expire
@@ -3055,54 +3073,17 @@ sql_create_index(struct Parse *parse, struct Token *token,
 
        if (!db->init.busy && tbl_name != NULL)
                goto exit_create_index;
-       addIndexToTable(pIndex, pTab);
-       pIndex = NULL;
+       addIndexToTable(index, table);
+       index = NULL;
 
        /* Clean up before exiting */
  exit_create_index:
-       if (pIndex)
-               freeIndex(db, pIndex);
+       if (index != NULL)
+               freeIndex(db, index);
        sql_expr_delete(db, where, false);
        sql_expr_list_delete(db, col_list);
        sqlite3SrcListDelete(db, tbl_name);
-       sqlite3DbFree(db, zName);
-}
-
-/**
- * Return number of columns in given index.
- * If space is ephemeral, use internal
- * SQL structure to fetch the value.
- */
-uint32_t
-index_column_count(const Index *idx)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       /* It is impossible to find an ephemeral space by id. */
-       if (space == NULL)
-               return idx->nColumn;
-
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct index *index = space_index(space, index_id);
-       assert(index != NULL);
-       return index->def->key_def->part_count;
-}
-
-/** Return true if given index is unique and not nullable. */
-bool
-index_is_unique_not_null(const Index *idx)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       assert(space != NULL);
-
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct index *index = space_index(space, index_id);
-       assert(index != NULL);
-       return (index->def->opts.is_unique &&
-               !index->def->key_def->is_nullable);
+       sqlite3DbFree(db, name);
 }
 
 void
@@ -3728,9 +3709,9 @@ parser_emit_unique_constraint(struct Parse *parser,
        const struct space_def *def = index->pTable->def;
        StrAccum err_accum;
        sqlite3StrAccumInit(&err_accum, parser->db, 0, 0, 200);
-       for (int j = 0; j < index->nColumn; ++j) {
-               assert(index->aiColumn[j] >= 0);
-               const char *col_name = def->fields[index->aiColumn[j]].name;
+       struct key_part *part = index->def->key_def->parts;
+       for (uint32_t j = 0; j < index->def->key_def->part_count; ++j, ++part) {
+               const char *col_name = def->fields[part->fieldno].name;
                if (j != 0)
                        sqlite3StrAccumAppend(&err_accum, ", ", 2);
                sqlite3XPrintf(&err_accum, "%s.%s", def->name, col_name);
@@ -3751,11 +3732,11 @@ static bool
 collationMatch(struct coll *coll, struct Index *index)
 {
        assert(coll != NULL);
-       for (int i = 0; i < index->nColumn; i++) {
-               uint32_t id;
-               struct coll *idx_coll = sql_index_collation(index, i, &id);
-               assert(idx_coll != 0 || index->aiColumn[i] < 0);
-               if (index->aiColumn[i] >= 0 && coll == idx_coll)
+       struct key_part *part = index->def->key_def->parts;
+       for (uint32_t i = 0; i < index->def->key_def->part_count; i++, part++) {
+               struct coll *idx_coll = part->coll;
+               assert(idx_coll != NULL);
+               if (coll == idx_coll)
                        return true;
        }
        return false;
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 5a799714d..5a7cf7652 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -268,11 +268,12 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
 
                /* Extract the primary key for the current row */
                if (!is_view) {
-                       for (int i = 0; i < pk_len; i++) {
+                       struct key_part *part = pk_def->parts;
+                       for (int i = 0; i < pk_len; i++, part++) {
                                struct space_def *def = space->def;
                                sqlite3ExprCodeGetColumnOfTable(v, def,
                                                                tab_cursor,
-                                                               pk_def->parts[i].fieldno,
+                                                               part->fieldno,
                                                                reg_pk + i);
                        }
                } else {
@@ -568,13 +569,14 @@ sql_generate_index_key(struct Parse *parse, struct Index *index, int cursor,
                        *part_idx_label = 0;
                }
        }
-       int col_cnt = index_column_count(index);
+       int col_cnt = index->def->key_def->part_count;
        int reg_base = sqlite3GetTempRange(parse, col_cnt);
        if (prev != NULL && (reg_base != reg_prev ||
                             prev->pPartIdxWhere != NULL))
                prev = NULL;
        for (int j = 0; j < col_cnt; j++) {
-               if (prev != NULL && prev->aiColumn[j] == index->aiColumn[j]) {
+               if (prev != NULL && prev->def->key_def->parts[j].fieldno ==
+                                   index->def->key_def->parts[j].fieldno) {
                        /*
                         * This column was already computed by the
                         * previous index.
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index 70e134f21..00d8dedd8 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -2405,21 +2405,28 @@ sqlite3FindInIndex(Parse * pParse,      /* Parsing context */
                             pIdx = pIdx->pNext) {
                                Bitmask colUsed; /* Columns of the index used */
                                Bitmask mCol;   /* Mask for the current column */
-                               if (pIdx->nColumn < nExpr)
+                               uint32_t part_count =
+                                       pIdx->def->key_def->part_count;
+                               struct key_part *parts =
+                                       pIdx->def->key_def->parts;
+                               if ((int)part_count < nExpr)
                                        continue;
                                /* Maximum nColumn is BMS-2, not BMS-1, so that we can compute
                                 * BITMASK(nExpr) without overflowing
                                 */
-                               testcase(pIdx->nColumn == BMS - 2);
-                               testcase(pIdx->nColumn == BMS - 1);
-                               if (pIdx->nColumn >= BMS - 1)
+                               testcase(part_count == BMS - 2);
+                               testcase(part_count == BMS - 1);
+                               if (part_count >= BMS - 1)
+                                       continue;
+                               if (mustBeUnique &&
+                                   ((int)part_count > nExpr ||
+                                    !pIdx->def->opts.is_unique)) {
+                                       /*
+                                        * This index is not
+                                        * unique over the IN RHS
+                                        * columns.
+                                        */
                                        continue;
-                               if (mustBeUnique) {
-                                       if (pIdx->nColumn > nExpr
-                                           || (pIdx->nColumn > nExpr
-                                           && !index_is_unique(pIdx))) {
-                                                       continue;       /* This index is not unique over the IN RHS columns */
-                                       }
                                }
 
                                colUsed = 0;    /* Columns of index used so far */
@@ -2432,16 +2439,15 @@ sqlite3FindInIndex(Parse * pParse,      /* Parsing context */
                                        int j;
 
                                        for (j = 0; j < nExpr; j++) {
-                                               if (pIdx->aiColumn[j] !=
-                                                   pRhs->iColumn) {
+                                               if ((int) parts[j].fieldno !=
+                                                   pRhs->iColumn)
                                                        continue;
-                                               }
-                                               struct coll *idx_coll;
-                                               idx_coll = sql_index_collation(pIdx, j, &id);
+
+                                               struct coll *idx_coll =
+                                                            parts[j].coll;
                                                if (pReq != NULL &&
-                                                   pReq != idx_coll) {
+                                                   pReq != idx_coll)
                                                        continue;
-                                               }
                                                break;
                                        }
                                        if (j == nExpr)
@@ -2466,18 +2472,17 @@ sqlite3FindInIndex(Parse * pParse,      /* Parsing context */
                                                          0, 0, 0,
                                                          sqlite3MPrintf(db,
                                                          "USING INDEX %s FOR IN-OPERATOR",
-                                                         pIdx->zName),
+                                                         pIdx->def->name),
                                                          P4_DYNAMIC);
                                        struct space *space =
                                                space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
                                        vdbe_emit_open_cursor(pParse, iTab,
                                                              pIdx->tnum, space);
-                                       VdbeComment((v, "%s", pIdx->zName));
+                                       VdbeComment((v, "%s", pIdx->def->name));
                                        assert(IN_INDEX_INDEX_DESC ==
                                               IN_INDEX_INDEX_ASC + 1);
                                        eType = IN_INDEX_INDEX_ASC +
-                                               sql_index_column_sort_order(pIdx,
-                                                                           0);
+                                               parts[0].sort_order;
 
                                        if (prRhsHasNull) {
 #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
@@ -2499,7 +2504,7 @@ sqlite3FindInIndex(Parse * pParse,        /* Parsing context */
                                                        /* Tarantool: Check for null is performed on first key of the index.  */
                                                        sqlite3SetHasNullFlag(v,
                                                                              iTab,
-                                                                             pIdx->aiColumn[0],
+                                                                             parts[0].fieldno,
                                                                              *prRhsHasNull);
                                                }
                                        }
@@ -3130,12 +3135,12 @@ sqlite3ExprCodeIN(Parse * pParse,       /* Parsing and code generating context */
                struct Index *pk = sqlite3PrimaryKeyIndex(tab);
                assert(pk);
 
+               uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
                enum affinity_type affinity =
-                       tab->def->fields[pk->aiColumn[0]].affinity;
-               if (pk->nColumn == 1
-                   && affinity == AFFINITY_INTEGER
-                   && pk->aiColumn[0] < nVector) {
-                       int reg_pk = rLhs + pk->aiColumn[0];
+                       tab->def->fields[fieldno].affinity;
+               if (pk->def->key_def->part_count == 1 &&
+                   affinity == AFFINITY_INTEGER && (int)fieldno < nVector) {
+                       int reg_pk = rLhs + (int)fieldno;
                        sqlite3VdbeAddOp2(v, OP_MustBeInt, reg_pk, destIfFalse);
                }
        }
@@ -3467,7 +3472,7 @@ sqlite3ExprCodeLoadIndexColumn(Parse * pParse,    /* The parsing context */
                               int regOut       /* Store the index column value in this register */
     )
 {
-       i16 iTabCol = pIdx->aiColumn[iIdxCol];
+       i16 iTabCol = pIdx->def->key_def->parts[iIdxCol].fieldno;
        sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
                                        iTabCur, iTabCol, regOut);
 }
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 6c75c4772..2b96bde7e 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -213,7 +213,6 @@ sqlite3FkLocateIndex(Parse * pParse,        /* Parse context to store any error in */
                     int **paiCol       /* OUT: Map of index columns in pFKey */
     )
 {
-       Index *pIdx = 0;        /* Value to return via *ppIdx */
        int *aiCol = 0;         /* Value to return via *paiCol */
        int nCol = pFKey->nCol;      /* Number of columns in parent key */
        char *zKey = pFKey->aCol[0].zCol;    /* Name of left-most parent key column */
@@ -255,83 +254,86 @@ sqlite3FkLocateIndex(Parse * pParse,      /* Parse context to store any error in */
                *paiCol = aiCol;
        }
 
-       for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
-               int nIdxCol = index_column_count(pIdx);
-               if (nIdxCol == nCol && index_is_unique(pIdx)
-                   && pIdx->pPartIdxWhere == 0) {
-                       /* pIdx is a UNIQUE index (or a PRIMARY KEY) and has the right number
-                        * of columns. If each indexed column corresponds to a foreign key
-                        * column of pFKey, then this index is a winner.
+       struct Index *index = NULL;
+       for (index = pParent->pIndex; index != NULL; index = index->pNext) {
+               int part_count = index->def->key_def->part_count;
+               if (part_count != nCol || !index->def->opts.is_unique ||
+                   index->pPartIdxWhere != NULL)
+                       continue;
+               /*
+                * Index is a UNIQUE index (or a PRIMARY KEY) and
+                * has the right number of columns. If each
+                * indexed column corresponds to a foreign key
+                * column of pFKey, then this index is a winner.
+                */
+               if (zKey == NULL) {
+                       /*
+                        * If zKey is NULL, then this foreign key
+                        * is implicitly mapped to the PRIMARY KEY
+                        * of table pParent. The PRIMARY KEY index
+                        * may be identified by the test.
                         */
-
-                       if (zKey == 0) {
-                               /* If zKey is NULL, then this foreign key is implicitly mapped to
-                                * the PRIMARY KEY of table pParent. The PRIMARY KEY index may be
-                                * identified by the test.
-                                */
-                               if (IsPrimaryKeyIndex(pIdx)) {
-                                       if (aiCol) {
-                                               int i;
-                                               for (i = 0; i < nCol; i++)
-                                                       aiCol[i] =
-                                                           pFKey->aCol[i].
-                                                           iFrom;
-                                       }
-                                       break;
+                       if (IsPrimaryKeyIndex(index)) {
+                               if (aiCol != NULL) {
+                                       for (int i = 0; i < nCol; i++)
+                                               aiCol[i] = pFKey->aCol[i].iFrom;
                                }
-                       } else {
-                               /* If zKey is non-NULL, then this foreign key was declared to
-                                * map to an explicit list of columns in table pParent. Check if this
-                                * index matches those columns. Also, check that the index uses
-                                * the default collation sequences for each column.
+                               break;
+                       }
+               } else {
+                       /*
+                        * If zKey is non-NULL, then this foreign
+                        * key was declared to map to an explicit
+                        * list of columns in table pParent. Check
+                        * if this index matches those columns.
+                        * Also, check that the index uses the
+                        * default collation sequences for each
+                        * column.
+                        */
+                       int i, j;
+                       struct key_part *part = index->def->key_def->parts;
+                       for (i = 0; i < nCol; i++, part++) {
+                               /*
+                                * Index of column in parent
+                                * table.
                                 */
-                               int i, j;
-                               for (i = 0; i < nCol; i++) {
-                                       i16 iCol = pIdx->aiColumn[i];        /* Index of column in parent tbl */
-                                       char *zIdxCol;  /* Name of indexed column */
-
-                                       if (iCol < 0)
-                                               break;  /* No foreign keys against expression indexes */
-
-                                       /* If the index uses a collation sequence that is different from
-                                        * the default collation sequence for the column, this index is
-                                        * unusable. Bail out early in this case.
-                                        */
-                                       struct coll *def_coll;
-                                       uint32_t id;
-                                       def_coll = sql_column_collation(pParent->def,
-                                                                       iCol,
-                                                                       &id);
-                                       struct coll *coll =
-                                               sql_index_collation(pIdx, i,
-                                                                   &id);
-                                       if (def_coll != coll)
-                                               break;
-
-                                       zIdxCol =
-                                               pParent->def->fields[iCol].name;
-                                       for (j = 0; j < nCol; j++) {
-                                               if (strcmp
-                                                   (pFKey->aCol[j].zCol,
-                                                    zIdxCol) == 0) {
-                                                       if (aiCol)
-                                                               aiCol[i] =
-                                                                   pFKey->
-                                                                   aCol[j].
-                                                                   iFrom;
-                                                       break;
-                                               }
-                                       }
-                                       if (j == nCol)
-                                               break;
+                               i16 iCol = (int) part->fieldno;
+                               /*
+                                * If the index uses a collation
+                                * sequence that is different from
+                                * the default collation sequence
+                                * for the column, this index is
+                                * unusable. Bail out early in
+                                * this case.
+                                */
+                               uint32_t id;
+                               struct coll *def_coll =
+                                       sql_column_collation(pParent->def,
+                                                            iCol, &id);
+                               struct coll *coll = part->coll;
+                               if (def_coll != coll)
+                                       break;
+
+                               char *zIdxCol = pParent->def->fields[iCol].name;
+                               for (j = 0; j < nCol; j++) {
+                                       if (strcmp(pFKey->aCol[j].zCol,
+                                                  zIdxCol) != 0)
+                                               continue;
+                                       if (aiCol)
+                                               aiCol[i] = pFKey->aCol[j].iFrom;
+                                       break;
                                }
-                               if (i == nCol)
-                                       break;  /* pIdx is usable */
+                               if (j == nCol)
+                                       break;
+                       }
+                       if (i == nCol) {
+                               /* Index is usable. */
+                               break;
                        }
                }
        }
 
-       if (!pIdx) {
+       if (index == NULL) {
                if (!pParse->disableTriggers) {
                        sqlite3ErrorMsg(pParse,
                                        "foreign key mismatch - \"%w\" referencing \"%w\"",
@@ -341,7 +343,7 @@ sqlite3FkLocateIndex(Parse * pParse,        /* Parse context to store any error in */
                return 1;
        }
 
-       *ppIdx = pIdx;
+       *ppIdx = index;
        return 0;
 }
 
@@ -460,17 +462,19 @@ fkLookupParent(Parse * pParse,    /* Parse context */
                         */
                        if (pTab == pFKey->pFrom && nIncr == 1) {
                                int iJump =
-                                   sqlite3VdbeCurrentAddr(v) + nCol + 1;
-                               for (i = 0; i < nCol; i++) {
+                                       sqlite3VdbeCurrentAddr(v) + nCol + 1;
+                               struct key_part *part =
+                                       pIdx->def->key_def->parts;
+                               for (i = 0; i < nCol; ++i, ++part) {
                                        int iChild = aiCol[i] + 1 + regData;
-                                       int iParent =
-                                           pIdx->aiColumn[i] + 1 + regData;
-                                       assert(pIdx->aiColumn[i] >= 0);
+                                       int iParent = 1 + regData +
+                                                     (int)part->fieldno;
                                        assert(aiCol[i] != pTab->iPKey);
-                                       if (pIdx->aiColumn[i] == pTab->iPKey) {
+                                       if ((int)part->fieldno == pTab->iPKey) {
                                                /* The parent key is a composite key that includes the IPK column */
                                                iParent = regData;
                                        }
+
                                        sqlite3VdbeAddOp3(v, OP_Ne, iChild,
                                                          iJump, iParent);
                                        VdbeCoverage(v);
@@ -614,7 +618,6 @@ fkScanChildren(Parse * pParse,      /* Parse context */
     )
 {
        sqlite3 *db = pParse->db;    /* Database handle */
-       int i;                  /* Iterator variable */
        Expr *pWhere = 0;       /* WHERE clause to scan with */
        NameContext sNameContext;       /* Context used to resolve WHERE clause */
        WhereInfo *pWInfo;      /* Context used by sqlite3WhereXXX() */
@@ -622,7 +625,7 @@ fkScanChildren(Parse * pParse,      /* Parse context */
        Vdbe *v = sqlite3GetVdbe(pParse);
 
        assert(pIdx == 0 || pIdx->pTable == pTab);
-       assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
+       assert(pIdx == 0 || (int) pIdx->def->key_def->part_count == pFKey->nCol);
        assert(pIdx != 0);
 
        if (nIncr < 0) {
@@ -639,19 +642,20 @@ fkScanChildren(Parse * pParse,    /* Parse context */
         * the parent key columns. The affinity of the parent key column should
         * be applied to each child key value before the comparison takes place.
         */
-       for (i = 0; i < pFKey->nCol; i++) {
+       for (int i = 0; i < pFKey->nCol; i++) {
                Expr *pLeft;    /* Value from parent table row */
                Expr *pRight;   /* Column ref to child table */
                Expr *pEq;      /* Expression (pLeft = pRight) */
                i16 iCol;       /* Index of column in child table */
-               const char *zCol;       /* Name of column in child table */
+               const char *column_name;
 
-               iCol = pIdx ? pIdx->aiColumn[i] : -1;
+               iCol = pIdx != NULL ?
+                      (int) pIdx->def->key_def->parts[i].fieldno : -1;
                pLeft = exprTableRegister(pParse, pTab, regData, iCol);
                iCol = aiCol ? aiCol[i] : pFKey->aCol[0].iFrom;
                assert(iCol >= 0);
-               zCol = pFKey->pFrom->def->fields[iCol].name;
-               pRight = sqlite3Expr(db, TK_ID, zCol);
+               column_name = pFKey->pFrom->def->fields[iCol].name;
+               pRight = sqlite3Expr(db, TK_ID, column_name);
                pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
                pWhere = sqlite3ExprAnd(db, pWhere, pEq);
        }
@@ -670,15 +674,14 @@ fkScanChildren(Parse * pParse,    /* Parse context */
 
                Expr *pEq, *pAll = 0;
                Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-               assert(pIdx != 0);
-               int col_count = index_column_count(pPk);
-               for (i = 0; i < col_count; i++) {
-                       i16 iCol = pIdx->aiColumn[i];
-                       assert(iCol >= 0);
-                       pLeft = exprTableRegister(pParse, pTab, regData, iCol);
-                       pRight =
-                               exprTableColumn(db, pTab->def,
-                                               pSrc->a[0].iCursor, iCol);
+               assert(pIdx != NULL);
+               uint32_t part_count = pPk->def->key_def->part_count;
+               for (uint32_t i = 0; i < part_count; i++) {
+                       uint32_t fieldno = pIdx->def->key_def->parts[i].fieldno;
+                       pLeft = exprTableRegister(pParse, pTab, regData,
+                                                 fieldno);
+                       pRight = exprTableColumn(db, pTab->def,
+                                                pSrc->a[0].iCursor, fieldno);
                        pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
                        pAll = sqlite3ExprAnd(db, pAll, pEq);
                }
@@ -983,7 +986,6 @@ sqlite3FkCheck(Parse * pParse,      /* Parse context */
                        if (aiCol[i] == pTab->iPKey) {
                                aiCol[i] = -1;
                        }
-                       assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
                }
 
                pParse->nTab++;
@@ -1108,19 +1110,19 @@ sqlite3FkOldmask(Parse * pParse,        /* Parse context */
 
        if (user_session->sql_flags & SQLITE_ForeignKeys) {
                FKey *p;
-               int i;
                for (p = pTab->pFKey; p; p = p->pNextFrom) {
-                       for (i = 0; i < p->nCol; i++)
+                       for (int i = 0; i < p->nCol; i++)
                                mask |= COLUMN_MASK(p->aCol[i].iFrom);
                }
                for (p = sqlite3FkReferences(pTab); p; p = p->pNextTo) {
                        Index *pIdx = 0;
                        sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
-                       if (pIdx) {
-                               int nIdxCol = index_column_count(pIdx);
-                               for (i = 0; i < nIdxCol; i++) {
-                                       assert(pIdx->aiColumn[i] >= 0);
-                                       mask |= COLUMN_MASK(pIdx->aiColumn[i]);
+                       if (pIdx != NULL) {
+                               uint32_t part_count =
+                                       pIdx->def->key_def->part_count;
+                               for (uint32_t i = 0; i < part_count; i++) {
+                                       mask |= COLUMN_MASK(pIdx->def->
+                                               key_def->parts[i].fieldno);
                                }
                        }
                }
@@ -1264,11 +1266,12 @@ fkActionTrigger(struct Parse *pParse, struct Table *pTab, struct FKey *pFKey,
                               || (pTab->iPKey >= 0
                                   && pTab->iPKey <
                                      (int)pTab->def->field_count));
-                       assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
+
+                       uint32_t fieldno = pIdx != NULL ?
+                                          pIdx->def->key_def->parts[i].fieldno :
+                                          (uint32_t)pTab->iPKey;
                        sqlite3TokenInit(&tToCol,
-                                        pTab->def->fields[pIdx ? pIdx->
-                                                   aiColumn[i] : pTab->iPKey].
-                                        name);
+                                        pTab->def->fields[fieldno].name);
                        sqlite3TokenInit(&tFromCol,
                                         pFKey->pFrom->def->fields[
                                                iFromCol].name);
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index c12043bde..762caaee5 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -89,14 +89,14 @@ sqlite3IndexAffinityStr(sqlite3 *db, Index *index)
         * sqliteDeleteIndex() when the Index structure itself is
         * cleaned up.
         */
-       int column_count = index_column_count(index);
+       int column_count = index->def->key_def->part_count;
        index->zColAff = (char *) sqlite3DbMallocRaw(0, column_count + 1);
        if (index->zColAff == NULL) {
                sqlite3OomFault(db);
                return NULL;
        }
        for (int n = 0; n < column_count; n++) {
-               uint16_t x = index->aiColumn[n];
+               uint16_t x = index->def->key_def->parts[n].fieldno;
                index->zColAff[n] = index->pTable->def->fields[x].affinity;
        }
        index->zColAff[column_count] = 0;
@@ -647,7 +647,7 @@ sqlite3Insert(Parse * pParse,       /* Parser context */
                     pIdx = pIdx->pNext, i++) {
                        assert(pIdx);
                        aRegIdx[i] = ++pParse->nMem;
-                       pParse->nMem += index_column_count(pIdx);
+                       pParse->nMem += pIdx->def->key_def->part_count;
                }
        }
 
@@ -1069,12 +1069,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,          /* The parser context */
        Index *pIdx;            /* Pointer to one of the indices */
        Index *pPk = 0;         /* The PRIMARY KEY index */
        sqlite3 *db;            /* Database connection */
-       int i;                  /* loop counter */
-       int ix;                 /* Index loop counter */
-       int nCol;               /* Number of columns */
        int addr1;              /* Address of jump instruction */
        int seenReplace = 0;    /* True if REPLACE is used to resolve INT PK conflict */
-       int nPkField;           /* Number of fields in PRIMARY KEY. */
        u8 isUpdate;            /* True if this is an UPDATE operation */
        u8 bAffinityDone = 0;   /* True if the OP_Affinity operation has been run */
        struct session *user_session = current_session();
@@ -1086,10 +1082,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,          /* The parser context */
        struct space_def *def = pTab->def;
        /* This table is not a VIEW */
        assert(!def->opts.is_view);
-       nCol = def->field_count;
 
        pPk = sqlite3PrimaryKeyIndex(pTab);
-       nPkField = index_column_count(pPk);
 
        /* Record that this module has started */
        VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
@@ -1099,17 +1093,16 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         /* The parser context */
        enum on_conflict_action on_error;
        /* Test all NOT NULL constraints.
         */
-       for (i = 0; i < nCol; i++) {
-               if (i == pTab->iPKey) {
+       for (uint32_t i = 0; i < def->field_count; i++) {
+               if ((int) i == pTab->iPKey)
                        continue;
-               }
                if (aiChng && aiChng[i] < 0) {
                        /* Don't bother checking for NOT NULL on columns that do not change */
                        continue;
                }
                if (def->fields[i].is_nullable ||
                    (pTab->tabFlags & TF_Autoincrement &&
-                    pTab->iAutoIncPKey == i))
+                    pTab->iAutoIncPKey == (int) i))
                        continue;       /* This column is allowed to be NULL */
 
                on_error = table_column_nullable_action(pTab, i);
@@ -1179,7 +1172,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           /* The parser context */
                else
                        on_error = ON_CONFLICT_ACTION_ABORT;
 
-               for (i = 0; i < checks->nExpr; i++) {
+               for (int i = 0; i < checks->nExpr; i++) {
                        int allOk;
                        Expr *pExpr = checks->a[i].pExpr;
                        if (aiChng
@@ -1206,13 +1199,16 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         /* The parser context */
                }
        }
 
-       /* Test all UNIQUE constraints by creating entries for each UNIQUE
-        * index and making sure that duplicate entries do not already exist.
-        * Compute the revised record entries for indices as we go.
+       /*
+        * Test all UNIQUE constraints by creating entries for
+        * each UNIQUE index and making sure that duplicate entries
+        * do not already exist. Compute the revised record entries
+        * for indices as we go.
         *
         * This loop also handles the case of the PRIMARY KEY index.
         */
-       for (ix = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, ix++) {
+       pIdx = pTab->pIndex;
+       for (int ix = 0; pIdx != NULL; pIdx = pIdx->pNext, ix++) {
                int regIdx;     /* Range of registers hold conent for pIdx */
                int regR;       /* Range of registers holding conflicting PK */
                int iThisCur;   /* Cursor for this UNIQUE index */
@@ -1253,10 +1249,11 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         /* The parser context */
                 * the insert or update.  Store that record in the aRegIdx[ix] register
                 */
                regIdx = aRegIdx[ix] + 1;
-               int nIdxCol = (int) index_column_count(pIdx);
+               uint32_t part_count = pIdx->def->key_def->part_count;
                if (uniqueByteCodeNeeded) {
-                       for (i = 0; i < nIdxCol; ++i) {
-                               int fieldno = pIdx->aiColumn[i];
+                       for (uint32_t i = 0; i < part_count; ++i) {
+                               uint32_t fieldno =
+                                       pIdx->def->key_def->parts[i].fieldno;
                                int reg;
                                /*
                                 * OP_SCopy copies value in
@@ -1267,11 +1264,10 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         /* The parser context */
                                 * needed for proper UNIQUE
                                 * constraint handling.
                                 */
-                               if (fieldno == pTab->iPKey)
+                               if ((int) fieldno == pTab->iPKey)
                                        reg = regNewData;
                                else
                                        reg = fieldno + regNewData + 1;
-                               assert(fieldno >= 0);
                                sqlite3VdbeAddOp2(v, OP_SCopy, reg, regIdx + i);
                                VdbeComment((v, "%s",
                                            def->fields[fieldno].name));
@@ -1283,9 +1279,12 @@ sqlite3GenerateConstraintChecks(Parse * pParse,          /* The parser context */
                if (IsPrimaryKeyIndex(pIdx)) {
                        /* If PK is marked as INTEGER, use it as strict type,
                         * not as affinity. Emit code for type checking */
-                       if (nIdxCol == 1) {
-                               reg_pk = regNewData + 1 + pIdx->aiColumn[0];
-                               if (pTab->zColAff[pIdx->aiColumn[0]] ==
+                       if (part_count == 1) {
+                               uint32_t fieldno =
+                                       pIdx->def->key_def->parts[0].fieldno;
+                               reg_pk = regNewData + 1 + fieldno;
+
+                               if (pTab->zColAff[fieldno] ==
                                    AFFINITY_INTEGER) {
                                        int skip_if_null = sqlite3VdbeMakeLabel(v);
                                        if ((pTab->tabFlags & TF_Autoincrement) != 0) {
@@ -1303,7 +1302,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           /* The parser context */
 
                        sqlite3VdbeAddOp3(v, OP_MakeRecord, regNewData + 1,
                                          def->field_count, aRegIdx[ix]);
-                       VdbeComment((v, "for %s", pIdx->zName));
+                       VdbeComment((v, "for %s", pIdx->def->name));
                }
 
                /* In an UPDATE operation, if this index is the PRIMARY KEY
@@ -1390,24 +1389,22 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         /* The parser context */
                if (uniqueByteCodeNeeded) {
                        sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
                                             addrUniqueOk, regIdx,
-                                            index_column_count(pIdx));
+                                            pIdx->def->key_def->part_count);
                }
                VdbeCoverage(v);
 
+               const uint32_t pk_part_count = pPk->def->key_def->part_count;
                /* Generate code to handle collisions */
-               regR =
-                   (pIdx == pPk) ? regIdx : sqlite3GetTempRange(pParse,
-                                                                nPkField);
+               regR = pIdx == pPk ? regIdx :
+                      sqlite3GetTempRange(pParse, pk_part_count);
                if (isUpdate || on_error == ON_CONFLICT_ACTION_REPLACE) {
                        int x;
-                       int nPkCol = index_column_count(pPk);
                        /* Extract the PRIMARY KEY from the end of the index entry and
                         * store it in registers regR..regR+nPk-1
                         */
                        if (pIdx != pPk) {
-                               for (i = 0; i < nPkCol; i++) {
-                                       assert(pPk->aiColumn[i] >= 0);
-                                       x = pPk->aiColumn[i];
+                               for (uint32_t i = 0; i < pk_part_count; i++) {
+                                       x = pPk->def->key_def->parts[i].fieldno;
                                        sqlite3VdbeAddOp3(v, OP_Column,
                                                          iThisCur, x, regR + i);
                                        VdbeComment((v, "%s.%s", def->name,
@@ -1422,22 +1419,25 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         /* The parser context */
                                 * of the matched index row are different from the original PRIMARY
                                 * KEY values of this row before the update.
                                 */
-                               int addrJump =
-                                       sqlite3VdbeCurrentAddr(v) + nPkCol;
+                               int addrJump = sqlite3VdbeCurrentAddr(v) +
+                                              pk_part_count;
                                int op = OP_Ne;
-                               int regCmp = (IsPrimaryKeyIndex(pIdx) ?
-                                             regIdx : regR);
-
-                               for (i = 0; i < nPkCol; i++) {
-                                       uint32_t id;
-                                       char *p4 = (char *)sql_index_collation(pPk, i, &id);
-                                       x = pPk->aiColumn[i];
-                                       assert(x >= 0);
-                                       if (i == (nPkCol - 1)) {
+                               int regCmp = IsPrimaryKeyIndex(pIdx) ?
+                                            regIdx : regR;
+                               struct key_part *part =
+                                       pPk->def->key_def->parts;
+                               for (uint32_t i = 0; i < pk_part_count;
+                                    ++i, ++part) {
+                                       char *p4 = (char *) part->coll;
+                                       x = part->fieldno;
+                                       if (pPk->tnum==0)
+                                               x = -1;
+                                       if (i == (pk_part_count - 1)) {
                                                addrJump = addrUniqueOk;
                                                op = OP_Eq;
                                        }
-                                       sqlite3VdbeAddOp4(v, op, regOldData + 1 + x,
+                                       sqlite3VdbeAddOp4(v, op,
+                                                         regOldData + 1 + x,
                                                          addrJump, regCmp + i,
                                                          p4, P4_COLLSEQ);
                                        sqlite3VdbeChangeP5(v, SQLITE_NOTNULL);
@@ -1480,7 +1480,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           /* The parser context */
                                                              NULL, NULL);
                        }
                        sql_generate_row_delete(pParse, pTab, trigger,
-                                               iDataCur, regR, nPkField, false,
+                                               iDataCur, regR, pk_part_count,
+                                               false,
                                                ON_CONFLICT_ACTION_REPLACE,
                                                pIdx == pPk ? ONEPASS_SINGLE :
                                                ONEPASS_OFF, -1);
@@ -1490,7 +1491,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           /* The parser context */
                }
                sqlite3VdbeResolveLabel(v, addrUniqueOk);
                if (regR != regIdx)
-                       sqlite3ReleaseTempRange(pParse, regR, nPkField);
+                       sqlite3ReleaseTempRange(pParse, regR, pk_part_count);
        }
 
        *pbMayReplace = seenReplace;
@@ -1608,8 +1609,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,        /* Parsing context */
                    IsPrimaryKeyIndex(pIdx) ||          /* Condition 2 */
                    sqlite3FkReferences(pTab) ||        /* Condition 3 */
                    /* Condition 4 */
-                   (index_is_unique(pIdx) && pIdx->onError !=
-                    ON_CONFLICT_ACTION_DEFAULT &&
+                   (pIdx->def->opts.is_unique &&
+                    pIdx->onError != ON_CONFLICT_ACTION_DEFAULT &&
                     /* Condition 4.1 */
                     pIdx->onError != ON_CONFLICT_ACTION_ABORT) ||
                     /* Condition 4.2 */
@@ -1627,7 +1628,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,        /* Parsing context */
                                                  space_ptr_reg);
                                sql_vdbe_set_p4_key_def(pParse, pIdx);
                                sqlite3VdbeChangeP5(v, p5);
-                               VdbeComment((v, "%s", pIdx->zName));
+                               VdbeComment((v, "%s", pIdx->def->name));
                        }
                }
        }
@@ -1661,30 +1662,25 @@ int sqlite3_xferopt_count;
 static int
 xferCompatibleIndex(Index * pDest, Index * pSrc)
 {
-       uint32_t i;
        assert(pDest && pSrc);
        assert(pDest->pTable != pSrc->pTable);
-       uint32_t nDestCol = index_column_count(pDest);
-       uint32_t nSrcCol = index_column_count(pSrc);
-       if (nDestCol != nSrcCol) {
-               return 0;       /* Different number of columns */
-       }
+       uint32_t dest_idx_part_count = pDest->def->key_def->part_count;
+       uint32_t src_idx_part_count = pSrc->def->key_def->part_count;
+       if (dest_idx_part_count != src_idx_part_count)
+               return 0;
        if (pDest->onError != pSrc->onError) {
                return 0;       /* Different conflict resolution strategies */
        }
-       for (i = 0; i < nSrcCol; i++) {
-               if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
+       struct key_part *src_part = pSrc->def->key_def->parts;
+       struct key_part *dest_part = pDest->def->key_def->parts;
+       for (uint32_t i = 0; i < src_idx_part_count;
+            ++i, ++src_part, ++dest_part) {
+               if (src_part->fieldno != dest_part->fieldno)
                        return 0;       /* Different columns indexed */
-               }
-               if (sql_index_column_sort_order(pSrc, i) !=
-                   sql_index_column_sort_order(pDest, i)) {
+               if (src_part->sort_order != dest_part->sort_order)
                        return 0;       /* Different sort orders */
-               }
-               uint32_t id;
-               if (sql_index_collation(pSrc, i, &id) !=
-                   sql_index_collation(pDest, i, &id)) {
+               if (src_part->coll != dest_part->coll)
                        return 0;       /* Different collating sequences */
-               }
        }
        if (sqlite3ExprCompare(pSrc->pPartIdxWhere, pDest->pPartIdxWhere, -1)) {
                return 0;       /* Different WHERE clauses */
@@ -1856,16 +1852,15 @@ xferOptimization(Parse * pParse,        /* Parser context */
                }
        }
        for (pDestIdx = pDest->pIndex; pDestIdx; pDestIdx = pDestIdx->pNext) {
-               if (index_is_unique(pDestIdx)) {
+               if (pDestIdx->def->opts.is_unique)
                        destHasUniqueIdx = 1;
-               }
                for (pSrcIdx = pSrc->pIndex; pSrcIdx; pSrcIdx = pSrcIdx->pNext) {
                        if (xferCompatibleIndex(pDestIdx, pSrcIdx))
                                break;
                }
-               if (pSrcIdx == 0) {
-                       return 0;       /* pDestIdx has no corresponding index in pSrc */
-               }
+               /* pDestIdx has no corresponding index in pSrc. */
+               if (pSrcIdx == NULL)
+                       return 0;
        }
        /* Get server checks. */
        ExprList *pCheck_src = space_checks_expr_list(
@@ -1941,11 +1936,11 @@ xferOptimization(Parse * pParse,        /* Parser context */
                struct space *src_space =
                        space_by_id(SQLITE_PAGENO_TO_SPACEID(pSrcIdx->tnum));
                vdbe_emit_open_cursor(pParse, iSrc, pSrcIdx->tnum, src_space);
-               VdbeComment((v, "%s", pSrcIdx->zName));
+               VdbeComment((v, "%s", pSrcIdx->def->name));
                struct space *dest_space =
                        space_by_id(SQLITE_PAGENO_TO_SPACEID(pDestIdx->tnum));
                vdbe_emit_open_cursor(pParse, iDest, pDestIdx->tnum, dest_space);
-               VdbeComment((v, "%s", pDestIdx->zName));
+               VdbeComment((v, "%s", pDestIdx->def->name));
                addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
                VdbeCoverage(v);
                sqlite3VdbeAddOp2(v, OP_RowData, iSrc, regData);
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index be6a01c23..db0b9a482 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -369,9 +369,8 @@ sqlite3Pragma(Parse * pParse, Token * pId,  /* First part of [schema.]id field */
                        } else if (pk == NULL) {
                                k = 1;
                        } else {
-                               for (k = 1; k <= def->field_count &&
-                                    pk->aiColumn[k - 1] != (int) i; ++k) {
-                               }
+                               struct key_def *kdef = pk->def->key_def;
+                               k = key_def_find(kdef, i) - kdef->parts + 1;
                        }
                        bool is_nullable = def->fields[i].is_nullable;
                        char *expr_str = def->fields[i].default_value;
@@ -414,7 +413,7 @@ sqlite3Pragma(Parse * pParse, Token * pId,  /* First part of [schema.]id field */
                                        size_t avg_tuple_size_idx =
                                                sql_index_tuple_size(space, idx);
                                        sqlite3VdbeMultiLoad(v, 2, "sii",
-                                                            pIdx->zName,
+                                                            pIdx->def->name,
                                                             avg_tuple_size_idx,
                                                             index_field_tuple_est(pIdx, 0));
                                        sqlite3VdbeAddOp2(v, OP_ResultRow, 1,
@@ -443,11 +442,13 @@ sqlite3Pragma(Parse * pParse, Token * pId,        /* First part of [schema.]id field */
                                                 */
                                                pParse->nMem = 3;
                                        }
-                                       mx = index_column_count(pIdx);
+                                       mx = pIdx->def->key_def->part_count;
                                        assert(pParse->nMem <=
                                               pPragma->nPragCName);
-                                       for (i = 0; i < mx; i++) {
-                                               i16 cnum = pIdx->aiColumn[i];
+                                       struct key_part *part =
+                                               pIdx->def->key_def->parts;
+                                       for (i = 0; i < mx; i++, part++) {
+                                               i16 cnum = (int) part->fieldno;
                                                assert(pIdx->pTable);
                                                sqlite3VdbeMultiLoad(v, 1,
                                                                     "iis", i,
@@ -461,19 +462,18 @@ sqlite3Pragma(Parse * pParse, Token * pId,        /* First part of [schema.]id field */
                                                                     name);
                                                if (pPragma->iArg) {
                                                        const char *c_n;
-                                                       uint32_t id;
+                                                       uint32_t id =
+                                                               part->coll_id;
                                                        struct coll *coll =
-                                                               sql_index_collation(pIdx, i, &id);
+                                                               part->coll;
                                                        if (coll != NULL)
                                                                c_n = coll_by_id(id)->name;
                                                        else
                                                                c_n = "BINARY";
-                                                       enum sort_order sort_order;
-                                                       sort_order = sql_index_column_sort_order(pIdx,
-                                                                                                i);
                                                        sqlite3VdbeMultiLoad(v,
                                                                             4,
                                                                             "isi",
+                                                                            part->
                                                                             sort_order,
                                                                             c_n,
                                                                             i <
@@ -503,10 +503,8 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First part of [schema.]id field */
                                                    { "c", "u", "pk" };
                                                sqlite3VdbeMultiLoad(v, 1,
                                                                     "isisi", i,
-                                                                    pIdx->
-                                                                    zName,
-                                                                    index_is_unique
-                                                                    (pIdx),
+                                                                    pIdx->def->name,
+                                                                    pIdx->def->opts.is_unique,
                                                                     azOrigin
                                                                     [pIdx->
                                                                      idxType],
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 54f78a9de..adf10feca 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4366,7 +4366,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct SrcList_item *pFrom)
                char *zIndexedBy = pFrom->u1.zIndexedBy;
                Index *pIdx;
                for (pIdx = pTab->pIndex;
-                    pIdx && strcmp(pIdx->zName, zIndexedBy);
+                    pIdx && strcmp(pIdx->def->name, zIndexedBy);
                     pIdx = pIdx->pNext) ;
                if (!pIdx) {
                        sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index e939663b6..3a560f1db 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2051,28 +2051,6 @@ struct UnpackedRecord {
  * Each SQL index is represented in memory by an
  * instance of the following structure.
  *
- * The columns of the table that are to be indexed are described
- * by the aiColumn[] field of this structure.  For example, suppose
- * we have the following table and index:
- *
- *     CREATE TABLE Ex1(c1 int, c2 int, c3 text);
- *     CREATE INDEX Ex2 ON Ex1(c3,c1);
- *
- * In the Table structure describing Ex1, nCol==3 because there are
- * three columns in the table.  In the Index structure describing
- * Ex2, nColumn==2 since 2 of the 3 columns of Ex1 are indexed.
- * The value of aiColumn is {2, 0}.  aiColumn[0]==2 because the
- * first column to be indexed (c3) has an index of 2 in Ex1.aCol[].
- * The second column to be indexed (c1) has an index of 0 in
- * Ex1.aCol[], hence Ex2.aiColumn[1]==0.
- *
- * The Index.onError field determines whether or not the indexed columns
- * must be unique and what to do if they are not.  When Index.onError=
- * ON_CONFLICT_ACTION_NONE, it means this is not a unique index.
- * Otherwise it is a unique index and the value of Index.onError indicate
- * the which conflict resolution algorithm to employ whenever an attempt
- * is made to insert a non-unique element.
- *
  * While parsing a CREATE TABLE or CREATE INDEX statement in order to
  * generate VDBE code (as opposed to reading from Tarantool's _space
  * space as part of parsing an existing database schema), transient instances
@@ -2082,26 +2060,30 @@ struct UnpackedRecord {
  * program is executed). See convertToWithoutRowidTable() for details.
  */
 struct Index {
-       char *zName;            /* Name of this index */
-       i16 *aiColumn;          /* Which columns are used by this index.  1st is 0 */
-       LogEst *aiRowLogEst;    /* From ANALYZE: Est. rows selected by each column */
-       Table *pTable;          /* The SQL table being indexed */
-       char *zColAff;          /* String defining the affinity of each column */
-       Index *pNext;           /* The next index associated with the same table */
-       Schema *pSchema;        /* Schema containing this index */
-       /** Sorting order for each column. */
-       enum sort_order *sort_order;
-       /** Array of collation sequences for index. */
-       struct coll **coll_array;
-       /** Array of collation identifiers. */
-       uint32_t *coll_id_array;
-       Expr *pPartIdxWhere;    /* WHERE clause for partial indices */
-       int tnum;               /* DB Page containing root of this index */
-       u16 nColumn;            /* Number of columns stored in the index */
-       u8 onError;             /* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
-                                * or _NONE
-                                */
-       unsigned idxType:2;     /* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX */
+       /** From ANALYZE: Est. rows selected by each column. */
+       LogEst *aiRowLogEst;
+       /** The SQL table being indexed. */
+       Table *pTable;
+       /** String defining the affinity of each column. */
+       char *zColAff;
+       /** The next index associated with the same table. */
+       Index *pNext;
+       /** Schema containing this index. */
+       Schema *pSchema;
+       /** WHERE clause for partial indices. */
+       Expr *pPartIdxWhere;
+       /** DB Page containing root of this index. */
+       int tnum;
+       /**
+        * Conflict resolution algorithm to employ whenever an
+        * attempt is made to insert a non-unique element in
+        * unique index.
+        */
+       u8 onError;
+       /** 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX. */
+       unsigned idxType:2;
+       /** Index definition. */
+       struct index_def *def;
 };
 
 /**
@@ -3532,34 +3514,6 @@ void sqlite3AddCollateType(Parse *, Token *);
  */
 struct coll *
 sql_column_collation(struct space_def *def, uint32_t column, uint32_t *coll_id);
-/**
- * Return name of given column collation from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @param[out] coll_id Collation identifier.
- * @retval Pointer to collation.
- */
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *id);
-
-/**
- * Return key_def of provided struct Index.
- * @param idx Pointer to `struct Index` object.
- * @retval Pointer to `struct key_def`.
- */
-struct key_def*
-sql_index_key_def(struct Index *idx);
-
-/**
- * Return sort order of given column from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @retval Sort order of requested column.
- */
-enum sort_order
-sql_index_column_sort_order(Index *idx, uint32_t column);
 
 void sqlite3EndTable(Parse *, Token *, Token *, Select *);
 
@@ -3646,9 +3600,16 @@ void sqlite3SrcListShiftJoinType(SrcList *);
 void sqlite3SrcListAssignCursors(Parse *, SrcList *);
 void sqlite3IdListDelete(sqlite3 *, IdList *);
 void sqlite3SrcListDelete(sqlite3 *, SrcList *);
-Index *sqlite3AllocateIndexObject(sqlite3 *, i16, int, char **);
-bool
-index_is_unique(Index *);
+/**
+ * Allocate SQL index object with part count fields.
+ * @param db SQLite environment.
+ * @param part_count Index part_count.
+ *
+ * @retval NULL Memory error.
+ * @retval not NULL Index object.
+ */
+struct Index *
+sql_index_alloc(struct sqlite3 *db, uint32_t part_count);
 
 /**
  * Create a new index for an SQL table.  name is the name of the
@@ -3678,8 +3639,9 @@ index_is_unique(Index *);
 void
 sql_create_index(struct Parse *parse, struct Token *token,
                 struct SrcList *tbl_name, struct ExprList *col_list,
-                int on_error, struct Token *start, struct Expr *pi_where,
-                enum sort_order sort_order, bool if_not_exist, u8 idx_type);
+                enum on_conflict_action on_error, struct Token *start,
+                struct Expr *pi_where, enum sort_order sort_order,
+                bool if_not_exist, u8 idx_type);
 
 /**
  * This routine will drop an existing named index.  This routine
@@ -4531,10 +4493,6 @@ int sqlite3InvokeBusyHandler(BusyHandler *);
 int
 sql_analysis_load(struct sqlite3 *db);
 
-uint32_t
-index_column_count(const Index *);
-bool
-index_is_unique_not_null(const Index *);
 void sqlite3RegisterLikeFunctions(sqlite3 *, int);
 int sqlite3IsLikeFunction(sqlite3 *, Expr *, int *, char *);
 void sqlite3SchemaClear(sqlite3 *);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 212adbcb3..113e3ba0e 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -239,17 +239,18 @@ sqlite3Update(Parse * pParse,             /* The parser context */
         */
        for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
                int reg;
-               int nIdxCol = index_column_count(pIdx);
+               uint32_t part_count = pIdx->def->key_def->part_count;
                if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
                        reg = ++pParse->nMem;
-                       pParse->nMem += nIdxCol;
+                       pParse->nMem += part_count;
                } else {
                        reg = 0;
-                       for (i = 0; i < nIdxCol; i++) {
-                               i16 iIdxCol = pIdx->aiColumn[i];
-                               if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
+                       for (uint32_t i = 0; i < part_count; i++) {
+                               uint32_t fieldno =
+                                       pIdx->def->key_def->parts[i].fieldno;
+                               if (aXRef[fieldno] >= 0) {
                                        reg = ++pParse->nMem;
-                                       pParse->nMem += nIdxCol;
+                                       pParse->nMem += part_count;
                                        break;
                                }
                        }
@@ -299,17 +300,18 @@ sqlite3Update(Parse * pParse,             /* The parser context */
         * In this case we have to manually load columns in order to make tuple.
         */
        int iPk;        /* First of nPk memory cells holding PRIMARY KEY value */
-       i16 nPk;        /* Number of components of the PRIMARY KEY */
+       /* Number of components of the PRIMARY KEY.  */
+       uint32_t pk_part_count;
        int addrOpen;   /* Address of the OpenEphemeral instruction */
 
        if (is_view) {
-               nPk = nKey;
+               pk_part_count = nKey;
        } else {
                assert(pPk != 0);
-               nPk = index_column_count(pPk);
+               pk_part_count = pPk->def->key_def->part_count;
        }
        iPk = pParse->nMem + 1;
-       pParse->nMem += nPk;
+       pParse->nMem += pk_part_count;
        regKey = ++pParse->nMem;
        iEph = pParse->nTab++;
        sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
@@ -318,7 +320,8 @@ sqlite3Update(Parse * pParse,               /* The parser context */
                addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
                                             nKey);
        } else {
-               addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
+               addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
+                                            pk_part_count);
                sql_vdbe_set_p4_key_def(pParse, pPk);
        }
 
@@ -328,27 +331,27 @@ sqlite3Update(Parse * pParse,             /* The parser context */
                goto update_cleanup;
        okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
        if (is_view) {
-               for (i = 0; i < nPk; i++) {
+               for (i = 0; i < (int) pk_part_count; i++) {
                        sqlite3VdbeAddOp3(v, OP_Column, iDataCur, i, iPk + i);
                }
        } else {
-               for (i = 0; i < nPk; i++) {
-                       assert(pPk->aiColumn[i] >= 0);
+               for (i = 0; i < (int) pk_part_count; i++) {
                        sqlite3ExprCodeGetColumnOfTable(v, def, iDataCur,
-                                                       pPk->aiColumn[i],
+                                                       pPk->def->key_def->
+                                                               parts[i].fieldno,
                                                        iPk + i);
                }
        }
 
        if (okOnePass) {
                sqlite3VdbeChangeToNoop(v, addrOpen);
-               nKey = nPk;
+               nKey = pk_part_count;
                regKey = iPk;
        } else {
                const char *zAff = is_view ? 0 :
                                   sqlite3IndexAffinityStr(pParse->db, pPk);
-               sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
-                                         zAff, nPk);
+               sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, pk_part_count,
+                                 regKey, zAff, pk_part_count);
                sqlite3VdbeAddOp2(v, OP_IdxInsert, iEph, regKey);
                /* Set flag to save memory allocating one by malloc. */
                sqlite3VdbeChangeP5(v, 1);
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index a8bd1e582..e50a23b3d 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -1150,7 +1150,7 @@ sql_vdbe_set_p4_key_def(struct Parse *parse, struct Index *idx)
        struct Vdbe *v = parse->pVdbe;
        assert(v != NULL);
        assert(idx != NULL);
-       struct key_def *def = key_def_dup(sql_index_key_def(idx));
+       struct key_def *def = key_def_dup(idx->def->key_def);
        if (def == NULL)
                sqlite3OomFault(parse->db);
        else
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 2ce90747d..d0e16bafb 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1087,15 +1087,15 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
                        Index *pIdx = p->pIdx;       /* Index being probed */
                        int nByte;      /* Bytes of space to allocate */
                        int i;  /* Counter variable */
-                       int nCol = index_column_count(pIdx);
+                       int part_count = pIdx->def->key_def->part_count;
 
-                       nByte = sizeof(Mem) * nCol +
+                       nByte = sizeof(Mem) * part_count +
                                ROUND8(sizeof(UnpackedRecord));
                        pRec =
                            (UnpackedRecord *) sqlite3DbMallocZero(db, nByte);
                        if (pRec == NULL)
                                return NULL;
-                       pRec->key_def = key_def_dup(sql_index_key_def(pIdx));
+                       pRec->key_def = key_def_dup(pIdx->def->key_def);
                        if (pRec->key_def == NULL) {
                                sqlite3DbFree(db, pRec);
                                sqlite3OomFault(db);
@@ -1103,7 +1103,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
                        }
                        pRec->aMem = (Mem *)((char *) pRec +
                                             ROUND8(sizeof(UnpackedRecord)));
-                       for (i = 0; i < nCol; i++) {
+                       for (i = 0; i < (int) part_count; i++) {
                                pRec->aMem[i].flags = MEM_Null;
                                pRec->aMem[i].db = db;
                        }
@@ -1621,15 +1621,12 @@ sql_stat4_column(struct sqlite3 *db, const char *record, uint32_t col_num,
 void
 sqlite3Stat4ProbeFree(UnpackedRecord * pRec)
 {
-       if (pRec) {
-               int i;
-               int nCol = pRec->key_def->part_count;
-               Mem *aMem = pRec->aMem;
-               sqlite3 *db = aMem[0].db;
-               for (i = 0; i < nCol; i++) {
+       if (pRec != NULL) {
+               int part_count = pRec->key_def->part_count;
+               struct Mem *aMem = pRec->aMem;
+               for (int i = 0; i < part_count; i++)
                        sqlite3VdbeMemRelease(&aMem[i]);
-               }
-               sqlite3DbFree(db, pRec);
+               sqlite3DbFree(aMem[0].db, pRec);
        }
 }
 
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index 85143ed20..4824c87f8 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -372,13 +372,19 @@ whereScanInit(WhereScan * pScan,  /* The WhereScan object being initialized */
        pScan->is_column_seen = false;
        if (pIdx) {
                int j = iColumn;
-               iColumn = pIdx->aiColumn[j];
+               iColumn = pIdx->def->key_def->parts[j].fieldno;
+               /*
+                * pIdx->tnum == 0 means that pIdx is a fake
+                * integer primary key index.
+                */
+               if (pIdx->tnum == 0)
+                       iColumn = -1;
+
                if (iColumn >= 0) {
                        char affinity =
                                pIdx->pTable->def->fields[iColumn].affinity;
                        pScan->idxaff = affinity;
-                       uint32_t id;
-                       pScan->coll = sql_index_collation(pIdx, j, &id);
+                       pScan->coll = pIdx->def->key_def->parts[j].coll;
                        pScan->is_column_seen = true;
                }
        }
@@ -541,47 +547,24 @@ findIndexCol(Parse * pParse,      /* Parse context */
             Index * pIdx,      /* Index to match column of */
             int iCol)          /* Column of index to match */
 {
+       struct key_part *part_to_match = &pIdx->def->key_def->parts[iCol];
        for (int i = 0; i < pList->nExpr; i++) {
                Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
-               if (p->op == TK_COLUMN &&
-                   p->iColumn == pIdx->aiColumn[iCol] &&
-                   p->iTable == iBase) {
+               if (p->op == TK_COLUMN && p->iTable == iBase &&
+                   p->iColumn == (int) part_to_match->fieldno) {
                        bool is_found;
                        uint32_t id;
                        struct coll *coll = sql_expr_coll(pParse,
                                                          pList->a[i].pExpr,
                                                          &is_found, &id);
-                       if (is_found &&
-                           coll == sql_index_collation(pIdx, iCol, &id)) {
+                       if (is_found && coll == part_to_match->coll)
                                return i;
-                       }
                }
        }
 
        return -1;
 }
 
-/*
- * Return TRUE if the iCol-th column of index pIdx is NOT NULL
- */
-static int
-indexColumnNotNull(Index * pIdx, int iCol)
-{
-       int j;
-       assert(pIdx != 0);
-       assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
-       j = pIdx->aiColumn[iCol];
-       if (j >= 0) {
-               return !pIdx->pTable->def->fields[j].is_nullable;
-       } else if (j == (-1)) {
-               return 1;
-       } else {
-               assert(j == (-2));
-               return 0;       /* Assume an indexed expression can always yield a NULL */
-
-       }
-}
-
 /*
  * Return true if the DISTINCT expression-list passed as the third argument
  * is redundant.
@@ -633,9 +616,9 @@ isDistinctRedundant(Parse * pParse,         /* Parsing context */
         *      contain a "col=X" term are subject to a NOT NULL constraint.
         */
        for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-               if (!index_is_unique(pIdx))
+               if (!pIdx->def->opts.is_unique)
                        continue;
-               int col_count = index_column_count(pIdx);
+               int col_count = pIdx->def->key_def->part_count;
                for (i = 0; i < col_count; i++) {
                        if (0 ==
                            sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
@@ -643,11 +626,12 @@ isDistinctRedundant(Parse * pParse,               /* Parsing context */
                                if (findIndexCol
                                    (pParse, pDistinct, iBase, pIdx, i) < 0)
                                        break;
-                               if (indexColumnNotNull(pIdx, i) == 0)
+                               uint32_t j = pIdx->def->key_def->parts[i].fieldno;
+                               if (pIdx->pTable->def->fields[j].is_nullable)
                                        break;
                        }
                }
-               if (i == (int)index_column_count(pIdx)) {
+               if (i == (int) pIdx->def->key_def->part_count) {
                        /* This index implies that the DISTINCT qualifier is redundant. */
                        return 1;
                }
@@ -835,8 +819,7 @@ constructAutomaticIndex(Parse * pParse,                     /* The parsing context */
        }
 
        /* Construct the Index object to describe this index */
-       pIdx =
-           sqlite3AllocateIndexObject(pParse->db, nKeyCol + 1, 0, &zNotUsed);
+       pIdx = sql_index_alloc(pParse->db, nKeyCol + 1);
        if (pIdx == 0)
                goto end_auto_index_create;
        pLoop->pIndex = pIdx;
@@ -1184,7 +1167,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
 char
 sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
 {
-       assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
+       assert(iCol >= 0 && iCol < (int) pIdx->def->key_def->part_count);
        if (!pIdx->zColAff) {
                if (sqlite3IndexAffinityStr(db, pIdx) == 0)
                        return AFFINITY_BLOB;
@@ -1246,13 +1229,12 @@ whereRangeSkipScanEst(Parse * pParse,           /* Parsing & code generating context */
        int nUpper = index->def->opts.stat->sample_count + 1;
        int rc = SQLITE_OK;
        u8 aff = sqlite3IndexColumnAffinity(db, p, nEq);
-       uint32_t id;
 
        sqlite3_value *p1 = 0;  /* Value extracted from pLower */
        sqlite3_value *p2 = 0;  /* Value extracted from pUpper */
        sqlite3_value *pVal = 0;        /* Value extracted from record */
 
-       struct coll *pColl = sql_index_collation(p, nEq, &id);
+       struct coll *coll = p->def->key_def->parts[nEq].coll;
        if (pLower) {
                rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
                                               aff, &p1);
@@ -1273,12 +1255,12 @@ whereRangeSkipScanEst(Parse * pParse,           /* Parsing & code generating context */
                        rc = sql_stat4_column(db, samples[i].sample_key, nEq,
                                              &pVal);
                        if (rc == SQLITE_OK && p1) {
-                               int res = sqlite3MemCompare(p1, pVal, pColl);
+                               int res = sqlite3MemCompare(p1, pVal, coll);
                                if (res >= 0)
                                        nLower++;
                        }
                        if (rc == SQLITE_OK && p2) {
-                               int res = sqlite3MemCompare(p2, pVal, pColl);
+                               int res = sqlite3MemCompare(p2, pVal, coll);
                                if (res >= 0)
                                        nUpper++;
                        }
@@ -1448,7 +1430,7 @@ whereRangeScanEst(Parse * pParse, /* Parsing & code generating context */
                               || (pLower->eOperator & (WO_GT | WO_GE)) != 0);
                        assert(pUpper == 0
                               || (pUpper->eOperator & (WO_LT | WO_LE)) != 0);
-                       if (sql_index_column_sort_order(p, nEq) !=
+                       if (p->def->key_def->parts[nEq].sort_order !=
                            SORT_ORDER_ASC) {
                                /* The roles of pLower and pUpper are swapped for a DESC index */
                                SWAP(pLower, pUpper);
@@ -1598,7 +1580,7 @@ whereEqualScanEst(Parse * pParse, /* Parsing & code generating context */
        int bOk;
 
        assert(nEq >= 1);
-       assert(nEq <= (int)index_column_count(p));
+       assert(nEq <= (int) p->def->key_def->part_count);
        assert(pBuilder->nRecValid < nEq);
 
        /* If values are not available for all fields of the index to the left
@@ -1619,7 +1601,7 @@ whereEqualScanEst(Parse * pParse, /* Parsing & code generating context */
 
        whereKeyStats(pParse, p, pRec, 0, a);
        WHERETRACE(0x10, ("equality scan regions %s(%d): %d\n",
-                         p->zName, nEq - 1, (int)a[1]));
+                         p->def->name, nEq - 1, (int)a[1]));
        *pnRow = a[1];
 
        return rc;
@@ -1751,7 +1733,7 @@ whereLoopPrint(WhereLoop * p, WhereClause * pWC)
                           pItem->zAlias ? pItem->zAlias : pTab->def->name);
 #endif
        const char *zName;
-       if (p->pIndex && (zName = p->pIndex->zName) != 0) {
+       if (p->pIndex != NULL && (zName = p->pIndex->def->name) != NULL) {
                if (strncmp(zName, "sqlite_autoindex_", 17) == 0) {
                        int i = sqlite3Strlen30(zName) - 1;
                        while (zName[i] != '_')
@@ -2314,7 +2296,7 @@ whereRangeVectorLen(Parse * pParse,       /* Parsing context */
        int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
        int i;
 
-       nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
+       nCmp = MIN(nCmp, (int)(pIdx->def->key_def->part_count - nEq));
        for (i = 1; i < nCmp; i++) {
                /* Test if comparison i of pTerm is compatible with column (i+nEq)
                 * of the index. If not, exit the loop.
@@ -2335,13 +2317,11 @@ whereRangeVectorLen(Parse * pParse,     /* Parsing context */
                 * order of the index column is the same as the sort order of the
                 * leftmost index column.
                 */
-               if (pLhs->op != TK_COLUMN
-                   || pLhs->iTable != iCur
-                   || pLhs->iColumn != pIdx->aiColumn[i + nEq]
-                   || sql_index_column_sort_order(pIdx, i + nEq) !=
-                      sql_index_column_sort_order(pIdx, nEq)) {
+               struct key_part *parts = pIdx->def->key_def->parts;
+               if (pLhs->op != TK_COLUMN || pLhs->iTable != iCur ||
+                   pLhs->iColumn != (int)parts[i + nEq].fieldno ||
+                   parts[i + nEq].sort_order != parts[nEq].sort_order)
                        break;
-               }
 
                aff = sqlite3CompareAffinity(pRhs, sqlite3ExprAffinity(pLhs));
                idxaff =
@@ -2353,7 +2333,7 @@ whereRangeVectorLen(Parse * pParse,       /* Parsing context */
                pColl = sql_binary_compare_coll_seq(pParse, pLhs, pRhs, &id);
                if (pColl == 0)
                        break;
-               if (sql_index_collation(pIdx, i + nEq, &id) != pColl)
+               if (pIdx->def->key_def->parts[i + nEq].coll != pColl)
                        break;
        }
        return i;
@@ -2396,13 +2376,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,     /* The WhereLoop factory */
        LogEst rSize;           /* Number of rows in the table */
        LogEst rLogSize;        /* Logarithm of table size */
        WhereTerm *pTop = 0, *pBtm = 0; /* Top and bottom range constraints */
-       uint32_t nProbeCol = index_column_count(pProbe);
+       uint32_t probe_part_count = pProbe->def->key_def->part_count;
 
        pNew = pBuilder->pNew;
        if (db->mallocFailed)
                return SQLITE_NOMEM_BKPT;
        WHERETRACE(0x800, ("BEGIN addBtreeIdx(%s), nEq=%d\n",
-                          pProbe->zName, pNew->nEq));
+                          pProbe->def->name, pNew->nEq));
 
        assert((pNew->wsFlags & WHERE_TOP_LIMIT) == 0);
        if (pNew->wsFlags & WHERE_BTM_LIMIT) {
@@ -2431,7 +2411,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       /* The WhereLoop factory */
                stat = &surrogate_stat;
        if (stat->is_unordered)
                opMask &= ~(WO_GT | WO_GE | WO_LT | WO_LE);
-       assert(pNew->nEq < nProbeCol);
+       assert(pNew->nEq < probe_part_count);
 
        saved_nEq = pNew->nEq;
        saved_nBtm = pNew->nBtm;
@@ -2452,10 +2432,14 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,     /* The WhereLoop factory */
                LogEst nOutUnadjusted;  /* nOut before IN() and WHERE adjustments */
                int nIn = 0;
                int nRecValid = pBuilder->nRecValid;
-               if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0)
-                   && indexColumnNotNull(pProbe, saved_nEq)
-                   ) {
-                       continue;       /* ignore IS [NOT] NULL constraints on NOT NULL columns */
+               uint32_t j = pProbe->def->key_def->parts[saved_nEq].fieldno;
+               if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0) &&
+                   !pProbe->pTable->def->fields[j].is_nullable) {
+                       /*
+                        * Ignore IS [NOT] NULL constraints on NOT
+                        * NULL columns.
+                        */
+                       continue;
                }
                if (pTerm->prereqRight & pNew->maskSelf)
                        continue;
@@ -2523,14 +2507,16 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,     /* The WhereLoop factory */
                                                         */
                        }
                } else if (eOp & WO_EQ) {
-                       int iCol = pProbe->aiColumn[saved_nEq];
+                       int iCol = pProbe->def->key_def->parts[saved_nEq].fieldno;
                        pNew->wsFlags |= WHERE_COLUMN_EQ;
                        assert(saved_nEq == pNew->nEq);
-                       if ((iCol > 0 && nInMul == 0
-                               && saved_nEq == nProbeCol - 1)
-                           ) {
-                               if (iCol >= 0 &&
-                                   !index_is_unique_not_null(pProbe)) {
+                       if (iCol > 0 && nInMul == 0 &&
+                           saved_nEq == probe_part_count - 1) {
+                               bool index_is_unique_not_null =
+                                       pProbe->def->key_def->is_nullable &&
+                                       pProbe->def->opts.is_unique;
+                               if (pProbe->tnum != 0 &&
+                                   !index_is_unique_not_null) {
                                        pNew->wsFlags |= WHERE_UNQ_WANTED;
                                } else {
                                        pNew->wsFlags |= WHERE_ONEROW;
@@ -2592,8 +2578,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       /* The WhereLoop factory */
                        assert(eOp & (WO_ISNULL | WO_EQ | WO_IN));
 
                        assert(pNew->nOut == saved_nOut);
-                       if (pTerm->truthProb <= 0
-                           && pProbe->aiColumn[saved_nEq] >= 0) {
+                       if (pTerm->truthProb <= 0 && pProbe->tnum != 0 ) {
                                assert((eOp & WO_IN) || nIn == 0);
                                testcase(eOp & WO_IN);
                                pNew->nOut += pTerm->truthProb;
@@ -2695,8 +2680,8 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       /* The WhereLoop factory */
                        pNew->nOut = nOutUnadjusted;
                }
 
-               if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0
-                   && pNew->nEq < nProbeCol) {
+               if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0 &&
+                   pNew->nEq < probe_part_count) {
                        whereLoopAddBtreeIndex(pBuilder, pSrc, pProbe,
                                               nInMul + nIn);
                }
@@ -2724,7 +2709,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       /* The WhereLoop factory */
         * more expensive.
         */
        assert(42 == sqlite3LogEst(18));
-       if (saved_nEq == saved_nSkip && saved_nEq + 1U < nProbeCol &&
+       if (saved_nEq == saved_nSkip && saved_nEq + 1U < probe_part_count &&
            stat->skip_scan_enabled == true &&
            /* TUNING: Minimum for skip-scan */
            index_field_tuple_est(pProbe, saved_nEq + 1) >= 42 &&
@@ -2749,7 +2734,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       /* The WhereLoop factory */
        }
 
        WHERETRACE(0x800, ("END addBtreeIdx(%s), nEq=%d, rc=%d\n",
-                          pProbe->zName, saved_nEq, rc));
+                          pProbe->def->name, saved_nEq, rc));
        return rc;
 }
 
@@ -2792,7 +2777,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
 {
        ExprList *pOB;
        int ii, jj;
-       int nIdxCol = index_column_count(pIndex);
+       int part_count = pIndex->def->key_def->part_count;
        if (index_is_unordered(pIndex))
                return 0;
        if ((pOB = pBuilder->pWInfo->pOrderBy) == 0)
@@ -2802,8 +2787,9 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
                if (pExpr->op == TK_COLUMN && pExpr->iTable == iCursor) {
                        if (pExpr->iColumn < 0)
                                return 1;
-                       for (jj = 0; jj < nIdxCol; jj++) {
-                               if (pExpr->iColumn == pIndex->aiColumn[jj])
+                       for (jj = 0; jj < part_count; jj++) {
+                               if (pExpr->iColumn == (int)
+                                   pIndex->def->key_def->parts[jj].fieldno)
                                        return 1;
                        }
                }
@@ -2882,7 +2868,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,    /* WHERE clause information */
        Index *pProbe;          /* An index we are evaluating */
        Index sPk;              /* A fake index object for the primary key */
        LogEst aiRowEstPk[2];   /* The aiRowLogEst[] value for the sPk index */
-       i16 aiColumnPk = -1;    /* The aColumn[] value for the sPk index */
        SrcList *pTabList;      /* The FROM clause */
        struct SrcList_item *pSrc;      /* The FROM clause btree term to add */
        WhereLoop *pNew;        /* Template WhereLoop object */
@@ -2913,11 +2898,32 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,  /* WHERE clause information */
                 */
                Index *pFirst;  /* First of real indices on the table */
                memset(&sPk, 0, sizeof(Index));
-               sPk.nColumn = 1;
-               sPk.aiColumn = &aiColumnPk;
                sPk.aiRowLogEst = aiRowEstPk;
                sPk.onError = ON_CONFLICT_ACTION_REPLACE;
                sPk.pTable = pTab;
+
+               struct key_def *key_def = key_def_new(1);
+               if (key_def == NULL) {
+                       pWInfo->pParse->nErr++;
+                       pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+                       return SQL_TARANTOOL_ERROR;
+               }
+
+               key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
+                                ON_CONFLICT_ACTION_ABORT,
+                                NULL, COLL_NONE, SORT_ORDER_ASC);
+
+               sPk.def = index_def_new(pTab->def->id, 0, "primary",
+                                       sizeof("primary") - 1, TREE,
+                                       &index_opts_default, key_def, NULL);
+               key_def_delete(key_def);
+
+               if (sPk.def == NULL) {
+                       pWInfo->pParse->nErr++;
+                       pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+                       return SQL_TARANTOOL_ERROR;
+               }
+
                aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
                aiRowEstPk[1] = 0;
                pFirst = pSrc->pTab->pIndex;
@@ -3058,6 +3064,8 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,    /* WHERE clause information */
                if (pSrc->pIBIndex)
                        break;
        }
+       if (rc != SQLITE_OK && sPk.def != NULL)
+               index_def_delete(sPk.def);
        return rc;
 }
 
@@ -3392,8 +3400,8 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,     /* The WHERE clause */
                                   index_is_unordered(pIndex)) {
                                return 0;
                        } else {
-                               nColumn = index_column_count(pIndex);
-                               isOrderDistinct = index_is_unique(pIndex);
+                               nColumn = pIndex->def->key_def->part_count;
+                               isOrderDistinct = pIndex->def->opts.is_unique;
                        }
 
                        /* Loop through all columns of the index and deal with the ones
@@ -3454,9 +3462,10 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    /* The WHERE clause */
                                 * (revIdx) for the j-th column of the index.
                                 */
                                if (pIndex != NULL) {
-                                       iColumn = pIndex->aiColumn[j];
-                                       revIdx = sql_index_column_sort_order(pIndex,
-                                                                            j);
+                                       struct key_def *def =
+                                               pIndex->def->key_def;
+                                       iColumn = def->parts[j].fieldno;
+                                       revIdx = def->parts[j].sort_order;
                                        if (iColumn == pIndex->pTable->iPKey)
                                                iColumn = -1;
                                } else {
@@ -3506,8 +3515,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,     /* The WHERE clause */
                                                                      pOrderBy->a[i].pExpr,
                                                                      &is_found, &id);
                                                struct coll *idx_coll =
-                                                       sql_index_collation(pIndex,
-                                                                           j, &id);
+                                                       pIndex->def->key_def->parts[j].coll;
                                                if (is_found &&
                                                    coll != idx_coll)
                                                        continue;
@@ -4777,7 +4785,7 @@ sqlite3WhereBegin(Parse * pParse, /* The parser context */
                                        sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);  /* Hint to COMDB2 */
                                }
                                if (pIx != NULL)
-                                       VdbeComment((v, "%s", pIx->zName));
+                                       VdbeComment((v, "%s", pIx->def->name));
                                else
                                        VdbeComment((v, "%s", idx_def->name));
 #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
@@ -4910,7 +4918,7 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
                if (pLevel->addrSkip) {
                        sqlite3VdbeGoto(v, pLevel->addrSkip);
                        VdbeComment((v, "next skip-scan on %s",
-                                    pLoop->pIndex->zName));
+                                    pLoop->pIndex->def->name));
                        sqlite3VdbeJumpHere(v, pLevel->addrSkip);
                        sqlite3VdbeJumpHere(v, pLevel->addrSkip - 2);
                }
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index c35c25ac4..1976583fa 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -48,7 +48,7 @@
 static const char *
 explainIndexColumnName(Index * pIdx, int i)
 {
-       i = pIdx->aiColumn[i];
+       i = pIdx->def->key_def->parts[i].fieldno;
        return pIdx->pTable->def->fields[i].name;
 }
 
@@ -243,7 +243,7 @@ sqlite3WhereExplainOneScan(Parse * pParse,  /* Parse context */
                        if (zFmt) {
                                sqlite3StrAccumAppend(&str, " USING ", 7);
                                if (pIdx != NULL)
-                                       sqlite3XPrintf(&str, zFmt, pIdx->zName);
+                                       sqlite3XPrintf(&str, zFmt, pIdx->def->name);
                                else if (idx_def != NULL)
                                        sqlite3XPrintf(&str, zFmt, idx_def->name);
                                else
@@ -488,7 +488,7 @@ codeEqualityTerm(Parse * pParse,    /* The parsing context */
                int *aiMap = 0;
 
                if (pLoop->pIndex != 0 &&
-                   sql_index_column_sort_order(pLoop->pIndex, iEq)) {
+                   pLoop->pIndex->def->key_def->parts[iEq].sort_order) {
                        testcase(iEq == 0);
                        testcase(bRev);
                        bRev = !bRev;
@@ -736,7 +736,7 @@ codeAllEqualityTerms(Parse * pParse,        /* Parsing context */
                sqlite3VdbeAddOp1(v, (bRev ? OP_Last : OP_Rewind), iIdxCur);
                VdbeCoverageIf(v, bRev == 0);
                VdbeCoverageIf(v, bRev != 0);
-               VdbeComment((v, "begin skip-scan on %s", pIdx->zName));
+               VdbeComment((v, "begin skip-scan on %s", pIdx->def->name));
                j = sqlite3VdbeAddOp0(v, OP_Goto);
                pLevel->addrSkip =
                    sqlite3VdbeAddOp4Int(v, (bRev ? OP_SeekLT : OP_SeekGT),
@@ -746,7 +746,8 @@ codeAllEqualityTerms(Parse * pParse,        /* Parsing context */
                sqlite3VdbeJumpHere(v, j);
                for (j = 0; j < nSkip; j++) {
                        sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                                         pIdx->aiColumn[j], regBase + j);
+                                         pIdx->def->key_def->parts[j].fieldno,
+                                         regBase + j);
                        VdbeComment((v, "%s", explainIndexColumnName(pIdx, j)));
                }
        }
@@ -1037,14 +1038,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                assert(pWInfo->pOrderBy == 0
                       || pWInfo->pOrderBy->nExpr == 1
                       || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
-               int nIdxCol;
+               uint32_t part_count;
                if (pIdx != NULL)
-                       nIdxCol = index_column_count(pIdx);
+                       part_count = pIdx->def->key_def->part_count;
                else
-                       nIdxCol = idx_def->key_def->part_count;
-               if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
-                   && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
-                       j = pIdx->aiColumn[nEq];
+                       part_count = idx_def->key_def->part_count;
+               if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0 &&
+                   pWInfo->nOBSat > 0 && part_count > nEq) {
+                       j = pIdx->def->key_def->parts[nEq].fieldno;
                        /* Allow seek for column with `NOT NULL` == false attribute.
                         * If a column may contain NULL-s, the comparator installed
                         * by Tarantool is prepared to seek using a NULL value.
@@ -1055,8 +1056,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,  /* Complete information about t
                         * FYI: entries in an index are ordered as follows:
                         *      NULL, ... NULL, min_value, ...
                         */
-                       if (j >= 0 &&
-                           pIdx->pTable->def->fields[j].is_nullable) {
+                       if (pIdx->pTable->def->fields[j].is_nullable) {
                                assert(pLoop->nSkip == 0);
                                bSeekPastNull = 1;
                                nExtraReg = 1;
@@ -1093,16 +1093,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                                testcase(pIdx->aSortOrder[nEq] ==
                                         SORT_ORDER_DESC);
                                assert((bRev & ~1) == 0);
+                               struct key_def *def = pIdx->def->key_def;
                                pLevel->iLikeRepCntr <<= 1;
                                pLevel->iLikeRepCntr |=
-                                       bRev ^ (sql_index_column_sort_order(pIdx, nEq) ==
+                                       bRev ^ (def->parts[nEq].sort_order ==
                                                SORT_ORDER_DESC);
                        }
 #endif
                        if (pRangeStart == 0) {
-                               j = pIdx->aiColumn[nEq];
-                               if (j >= 0 &&
-                                   pIdx->pTable->def->fields[j].is_nullable)
+                               j = pIdx->def->key_def->parts[nEq].fieldno;
+                               if (pIdx->pTable->def->fields[j].is_nullable)
                                        bSeekPastNull = 1;
                        }
                }
@@ -1113,10 +1113,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo, /* Complete information about t
                 * a forward order scan on a descending index, interchange the
                 * start and end terms (pRangeStart and pRangeEnd).
                 */
-               if ((nEq < nIdxCol &&
-                    bRev == (sql_index_column_sort_order(pIdx, nEq) ==
-                             SORT_ORDER_ASC)) ||
-                   (bRev && nIdxCol == nEq)) {
+               if ((nEq < part_count &&
+                    bRev == (pIdx->def->key_def->parts[nEq].sort_order ==
+                             SORT_ORDER_ASC)) || (bRev && part_count == nEq)) {
                        SWAP(pRangeEnd, pRangeStart);
                        SWAP(bSeekPastNull, bStopAtNull);
                        SWAP(nBtm, nTop);
@@ -1196,16 +1195,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                        }
                } else {
                        pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-                       affinity =
-                               pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
+                       uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
+                       affinity = pIdx->pTable->def->fields[fieldno].affinity;
                }
 
-               int nPkCol;
+               uint32_t pk_part_count;
                if (pk != NULL)
-                       nPkCol = index_column_count(pk);
+                       pk_part_count = pk->def->key_def->part_count;
                else
-                       nPkCol = idx_pk->key_def->part_count;
-               if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
+                       pk_part_count = idx_pk->key_def->part_count;
+               if (pk_part_count == 1 && affinity == AFFINITY_INTEGER) {
                        /* Right now INTEGER PRIMARY KEY is the only option to
                         * get Tarantool's INTEGER column type. Need special handling
                         * here: try to loosely convert FLOAT to INT. If RHS type
@@ -1213,8 +1212,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,  /* Complete information about t
                         */
                        int limit = pRangeStart == NULL ? nEq : nEq + 1;
                        for (int i = 0; i < limit; i++) {
-                               if ((pIdx != NULL && pIdx->aiColumn[i] ==
-                                    pk->aiColumn[0]) ||
+                               if ((pIdx != NULL &&
+                                    pIdx->def->key_def->parts[i].fieldno ==
+                                    pk->def->key_def->parts[0].fieldno) ||
                                    (idx_pk != NULL &&
                                     idx_def->key_def->parts[i].fieldno ==
                                     idx_pk->key_def->parts[0].fieldno)) {
@@ -1326,17 +1326,17 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                        /* pIdx is a covering index.  No need to access the main table. */
                }  else if (iCur != iIdxCur) {
                        Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-                       int nPkCol = index_column_count(pPk);
-                       int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
-                       for (j = 0; j < nPkCol; j++) {
-                               k = pPk->aiColumn[j];
+                       int pk_part_count = pPk->def->key_def->part_count;
+                       int iKeyReg = sqlite3GetTempRange(pParse, pk_part_count);
+                       for (j = 0; j < pk_part_count; j++) {
+                               k = pPk->def->key_def->parts[j].fieldno;
                                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
                                                  iKeyReg + j);
                        }
                        sqlite3VdbeAddOp4Int(v, OP_NotFound, iCur, addrCont,
-                                            iKeyReg, nPkCol);
+                                            iKeyReg, pk_part_count);
                        VdbeCoverage(v);
-                       sqlite3ReleaseTempRange(pParse, iKeyReg, nPkCol);
+                       sqlite3ReleaseTempRange(pParse, iKeyReg, pk_part_count);
                }
 
                /* Record the instruction used to terminate the loop. */
@@ -1434,10 +1434,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                 */
                if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
                        Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-                       int nPkCol = index_column_count(pPk);
+                       int pk_part_count = pPk->def->key_def->part_count;
                        regRowset = pParse->nTab++;
                        sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
-                                         regRowset, nPkCol);
+                                         regRowset, pk_part_count);
                        sql_vdbe_set_p4_key_def(pParse, pPk);
                        regPk = ++pParse->nMem;
                }
@@ -1538,16 +1538,23 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                                                int iSet =
                                                    ((ii == pOrWc->nTerm - 1) ? -1 : ii);
                                                Index *pPk = sqlite3PrimaryKeyIndex (pTab);
-                                               int nPk = index_column_count(pPk);
-                                               int iPk;
+                                               struct key_def *def =
+                                                       pPk->def->key_def;
 
                                                /* Read the PK into an array of temp registers. */
-                                               r = sqlite3GetTempRange(pParse, nPk);
-                                               for (iPk = 0; iPk < nPk; iPk++) {
-                                                       int iCol = pPk->aiColumn[iPk];
+                                               r = sqlite3GetTempRange(pParse,
+                                                                       def->part_count);
+                                               for (uint32_t iPk = 0;
+                                                    iPk < def->part_count;
+                                                    iPk++) {
+                                                       uint32_t fieldno =
+                                                               def->parts[iPk].
+                                                               fieldno;
                                                        sqlite3ExprCodeGetColumnToReg
-                                                               (pParse, pTab->def,
-                                                                iCol, iCur,
+                                                               (pParse,
+                                                                pTab->def,
+                                                                fieldno,
+                                                                iCur,
                                                                 r + iPk);
                                                }
 
@@ -1567,20 +1574,21 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        /* Complete information about t
                                                        jmp1 = sqlite3VdbeAddOp4Int
                                                                (v, OP_Found,
                                                                 regRowset, 0,
-                                                                r, nPk);
+                                                                r,
+                                                                def->part_count);
                                                        VdbeCoverage(v);
                                                }
                                                if (iSet >= 0) {
                                                        sqlite3VdbeAddOp3
                                                                (v, OP_MakeRecord,
-                                                                r, nPk, regPk);
+                                                                r, def->part_count, regPk);
                                                        sqlite3VdbeAddOp2
                                                                (v, OP_IdxInsert,
                                                                 regRowset, regPk);
                                                }
 
                                                /* Release the array of temp registers */
-                                               sqlite3ReleaseTempRange(pParse, r, nPk);
+                                               sqlite3ReleaseTempRange(pParse, r, def->part_count);
                                        }
 
                                        /* Invoke the main loop body as a subroutine */
diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation1.test.lua
similarity index 100%
rename from test/sql-tap/collation.test.lua
rename to test/sql-tap/collation1.test.lua
diff --git a/test/sql-tap/collation2.test.lua b/test/sql-tap/collation2.test.lua
new file mode 100755
index 000000000..eff8f536d
--- /dev/null
+++ b/test/sql-tap/collation2.test.lua
@@ -0,0 +1,21 @@
+#!/usr/bin/env tarantool
+test = require("sqltester")
+test:plan(3)
+
+test:do_catchsql_test(
+        "collation-2.1",
+        'CREATE TABLE test1 (a int, b int, c int, PRIMARY KEY (a, a, a, b, c))',
+        nil)
+
+test:do_catchsql_test(
+        "collation-2.2",
+        'CREATE TABLE test2 (a int, b int, c int, PRIMARY KEY (a, a, a, b, b, a, c))',
+        nil)
+
+test:do_catchsql_test(
+        "collation-2.3",
+        'CREATE TABLE test3 (a int, b int, c int, PRIMARY KEY (a, a COLLATE foo, b, c))',
+        {1, "Collation 'FOO' does not exist"})
+
+test:finish_test()
diff --git a/test/sql-tap/colname.test.lua b/test/sql-tap/colname.test.lua
index c53a1e885..ddc06eea7 100755
--- a/test/sql-tap/colname.test.lua
+++ b/test/sql-tap/colname.test.lua
@@ -643,13 +643,13 @@ test:do_catchsql_test(
     "colname-11.2",
     [[CREATE TABLE t1(a, b, c, d, e, 
       PRIMARY KEY(a), UNIQUE('b' COLLATE "unicode_ci" DESC));]],
-    {1, "/functional indexes aren't supported in the current version/"})
+    {1, "/Tarantool does not support functional indexes/"})
 
 test:execsql("create table table1(a primary key, b, c)")
 
 test:do_catchsql_test(
     "colname-11.3",
     [[ CREATE INDEX t1c ON table1('c'); ]],
-    {1, "/functional indexes aren't supported in the current version/"})
+    {1, "/Tarantool does not support functional indexes/"})
 
 test:finish_test()
diff --git a/test/sql-tap/identifier_case.test.lua b/test/sql-tap/identifier_case.test.lua
index 5e7573ac4..ed9553c6b 100755
--- a/test/sql-tap/identifier_case.test.lua
+++ b/test/sql-tap/identifier_case.test.lua
@@ -206,8 +206,8 @@ data = {
     { 3,  [["binary"]], {0}},
     { 4,  [["bInaRy"]], {0}},
     { 5,  [["unicode"]], {0}},
-    { 6,  [[ unicode ]], {1,"no such collation sequence: UNICODE"}},
-    { 7,  [["UNICODE"]], {1,"no such collation sequence: UNICODE"}}
+    { 6,  [[ unicode ]], {1,"Collation 'UNICODE' does not exist"}},
+    { 7,  [["UNICODE"]], {1,"Collation 'UNICODE' does not exist"}}
 }
 
 test:do_catchsql_test(
diff --git a/test/sql/message-func-indexes.result b/test/sql/message-func-indexes.result
index 4bf1cab49..dbcb12f03 100644
--- a/test/sql/message-func-indexes.result
+++ b/test/sql/message-func-indexes.result
@@ -12,25 +12,25 @@ box.sql.execute("CREATE TABLE t2(object INTEGER PRIMARY KEY, price INTEGER, coun
 -- should return certain message.
 box.sql.execute("CREATE INDEX i1 ON t1(a+1)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 box.sql.execute("CREATE INDEX i2 ON t1(a)")
 ---
 ...
 box.sql.execute("CREATE INDEX i3 ON t2(price + 100)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 box.sql.execute("CREATE INDEX i4 ON t2(price)")
 ---
 ...
 box.sql.execute("CREATE INDEX i5 ON t2(count + 1)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 box.sql.execute("CREATE INDEX i6 ON t2(count * price)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 -- Cleaning up.
 box.sql.execute("DROP TABLE t1")
-- 

</pre>
    <pre>
  

</pre>
  </body>
</html>