[tarantool-patches] Re: [PATCH v5] sql: add index_def to struct Index

Kirill Shcherbatov kshcherbatov at tarantool.org
Wed Jun 27 20:57:25 MSK 2018


LGTM.
Vlad, please take a look. 
Kirill Y. asked to merge this patch somewhat quicker.

On 27.06.2018 20:46, Ivan Koptelov wrote:
> Thanks for the review! The new version of the patch is at the end of the mail.
>> Hi! Thank you for your patch. I'd like to say that personally I hate nitpicks like "same as 17" (changing a variable name to Tarantool code style in minor refactoring changes - you'll understand what I mean a bit later) and strongly recommend asking Vlad whether this nitpick is really necessary to fix.
>> As for me, I've refactored code this way so many times... So, I've written the "Same as 17" comments, but I don't really like them. Keep that in mind. You could use the auto-renaming feature of your IDE for this purpose.
>>
>> The biggest problem now is writing to a NULL pointer with memcpy on region_alloc failure in append_string_part.
>>
>> Here we go: 1. Take a look at the commit history: you have picked up an extra commit 'sql: remove zName and nColumn from SQL' on rebase.
>> As you can see, it doesn't contain many changes, and they are wrong (compilation error).
>> +++ b/src/box/sql/trigger.c
>> @@ -873,6 +873,8 @@ codeRowTrigger(Parse * pParse,      /* Current parse context */
>>          pSubParse->pToplevel = pTop;
>>          pSubParse->eTriggerOp = pTrigger->op;
>>          pSubParse->nQueryLoop = pParse->nQueryLoop;
>> +       struct region *region = &fiber()->gc;
>> +       pSubParse->region_initial_size = region_used(region);
>>
>> Use interactive rebase to drop this commit.
> I don't remember, why it was needed. Rebased onto 2.0.
>> 2. I still have compilation error on gcc build:
>> /home/kir/tarantool/src/box/sql/fkey.c: In function ‘fkScanChildren’:
>> /home/kir/tarantool/src/box/sql/fkey.c:653:47: error: signed and unsigned type in conditional expression [-Werror=sign-compare]
>>            pIdx->def->key_def->parts[i].fieldno : -1;
>>                                                 ^
>> /home/kir/tarantool/src/box/sql/fkey.c: At top level:
>> cc1: error: unrecognized command line option ‘-Wno-format-truncation’ [-Werror]
>> cc1: all warnings being treated as errors
>> src/box/sql/CMakeFiles/sql.dir/build.make:374: recipe for target 'src/box/sql/CMakeFiles/sql.dir/fkey.c.o' failed
>> make[2]: *** [src/box/sql/CMakeFiles/sql.dir/fkey.c.o] Error 1
>> make[2]: *** Waiting for unfinished jobs....
>>
>> You need to cast fieldno to int.
> Fixed.
>>> Here is the patch:
>>> --
>>> sql: add index_def to Index
>>>
>>> Now every sqlite struct Index is created with tnt struct
>>> index_def inside. This allows us to use tnt index_def
>>> in work with sqlite indexes in the same manner as with
>>> tnt index and is a step to remove sqlite Index with
>>> tnt index.
>>> Fields coll_array, coll_id_array, aiColumn, sort_order
>>> and zName are removed from Index. All usages of these
>>> fields are changed to usages of the corresponding index_def
>>> fields.
>>> index_is_unique(), sql_index_collation() and
>>> index_column_count() are removed and their calls are replaced
>>> with the corresponding index_def fields.
>>>
>>> Closes: #3369
>>> ---
>>> Branch:
>>> https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where
>>> Issue: https://github.com/tarantool/tarantool/issues/3369
>>>    src/box/sql.c           |  54 +++---
>>>    src/box/sql/analyze.c   |  40 ++--
>>>    src/box/sql/build.c     | 488
>>> ++++++++++++++++++++++++------------------------
>>>    src/box/sql/delete.c    |  10 +-
>>>    src/box/sql/expr.c      |  48 ++---
>>>    src/box/sql/fkey.c      |  48 ++---
>>>    src/box/sql/insert.c    |  76 ++++----
>>>    src/box/sql/pragma.c    |  28 +--
>>>    src/box/sql/select.c    |   2 +-
>>>    src/box/sql/sqliteInt.h |  74 ++------
>>>    src/box/sql/trigger.c   |   2 -
>>>    src/box/sql/update.c    |  10 +-
>>>    src/box/sql/vdbeaux.c   |   2 +-
>>>    src/box/sql/vdbemem.c   |   4 +-
>>>    src/box/sql/where.c     | 151 +++++++--------
>>>    src/box/sql/wherecode.c |  51 ++---
>>>    16 files changed, 517 insertions(+), 571 deletions(-)
>>>
>>> diff --git a/src/box/sql.c b/src/box/sql.c
>>> index 82f3d6d52..a24812c65 100644
>>> --- a/src/box/sql.c
>>> +++ b/src/box/sql.c
>>> @@ -1452,8 +1452,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable,
>>> void *buf)
>>>
>>>        /* If table's PK is single column which is INTEGER, then
>>>         * treat it as strict type, not affinity.  */
>>> -    if (pk_idx && pk_idx->nColumn == 1) {
>>> -        int pk = pk_idx->aiColumn[0];
>>> +    if (pk_idx != NULL && pk_idx->def->key_def->part_count == 1) {
>>> +        int pk = pk_idx->def->key_def->parts[0].fieldno;
>>>            if (def->fields[pk].type == FIELD_TYPE_INTEGER)
>>>                pk_forced_int = pk;
>>>        }
>>> @@ -1564,20 +1564,19 @@ tarantoolSqlite3MakeTableOpts(Table *pTable,
>>> const char *zSql, char *buf)
>>>     */
>>>    int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
>>>    {
>>> -    struct space_def *def = pIndex->pTable->def;
>>> -    assert(def != NULL);
>>> +    struct field_def *fields = pIndex->pTable->def->fields;
>>> +    struct key_def *key_def = pIndex->def->key_def;
>>>        const struct Enc *enc = get_enc(buf);
>>> -    struct SqliteIndex *primary_index;
>>> -    char *base = buf, *p;
>>> -    int pk_forced_int = -1;
>>> -
>>> -    primary_index = sqlite3PrimaryKeyIndex(pIndex->pTable);
>>> +    char *base = buf;
>>> +    uint32_t pk_forced_int = UINT32_MAX;
>>> +    struct SqliteIndex *primary_index =
>>> +        sqlite3PrimaryKeyIndex(pIndex->pTable);
>>>
>>>        /* If table's PK is single column which is INTEGER, then
>>>         * treat it as strict type, not affinity.  */
>>> -    if (primary_index->nColumn == 1) {
>>> -        int pk = primary_index->aiColumn[0];
>>> -        if (def->fields[pk].type == FIELD_TYPE_INTEGER)
>>> +    if (primary_index->def->key_def->part_count == 1) {
>>> +        int pk = primary_index->def->key_def->parts[0].fieldno;
>>> +        if (fields[pk].type == FIELD_TYPE_INTEGER)
>>>                pk_forced_int = pk;
>>>        }
>>>
>>> @@ -1587,46 +1586,45 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex
>>> *pIndex, void *buf)
>>>         * primary key columns. Query planner depends on this particular
>>>         * data layout.
>>>         */
>>> -    int i, n = pIndex->nColumn;
>>> -
>>> -    p = enc->encode_array(base, n);
>>> -    for (i = 0; i < n; i++) {
>>> -        int col = pIndex->aiColumn[i];
>>> -        assert(def->fields[col].is_nullable ==
>>> - action_is_nullable(def->fields[col].nullable_action));
>>> +    struct key_part *part = key_def->parts;
>>> +    char *p = enc->encode_array(base, key_def->part_count);
>>> +    for (uint32_t i = 0; i < key_def->part_count; ++i, ++part) {
>>> +        uint32_t col = part->fieldno;
>>> +        assert(fields[col].is_nullable ==
>>> +               action_is_nullable(fields[col].nullable_action));
>>>            const char *t;
>>>            if (pk_forced_int == col) {
>>>                t = "integer";
>>>            } else {
>>> -            enum affinity_type affinity = def->fields[col].affinity;
>>> -            t = convertSqliteAffinity(affinity,
>>> -                          def->fields[col].is_nullable);
>>> +            t = convertSqliteAffinity(fields[col].affinity,
>>> +                          fields[col].is_nullable);
>>>            }
>>>            /* do not decode default collation */
>>> -        uint32_t cid = pIndex->coll_id_array[i];
>>> +        uint32_t cid = part->coll_id;
>>>            p = enc->encode_map(p, cid == COLL_NONE ? 5 : 6);
>>>            p = enc->encode_str(p, "type", sizeof("type")-1);
>>>            p = enc->encode_str(p, t, strlen(t));
>>>            p = enc->encode_str(p, "field", sizeof("field")-1);
>>>            p = enc->encode_uint(p, col);
>>>            if (cid != COLL_NONE) {
>>> -            p = enc->encode_str(p, "collation", sizeof("collation")-1);
>>> +            p = enc->encode_str(p, "collation",
>>> +                        sizeof("collation") - 1);
>>>                p = enc->encode_uint(p, cid);
>>>            }
>>>            p = enc->encode_str(p, "is_nullable", 11);
>>> -        p = enc->encode_bool(p, def->fields[col].is_nullable);
>>> +        p = enc->encode_bool(p, fields[col].is_nullable);
>>>            p = enc->encode_str(p, "nullable_action", 15);
>>>            const char *action_str =
>>> - on_conflict_action_strs[def->fields[col].nullable_action];
>>> +            on_conflict_action_strs[fields[col].nullable_action];
>>>            p = enc->encode_str(p, action_str, strlen(action_str));
>>>
>>>            p = enc->encode_str(p, "sort_order", 10);
>>> -        enum sort_order sort_order = pIndex->sort_order[i];
>>> +        enum sort_order sort_order = part->sort_order;
>>>            assert(sort_order < sort_order_MAX);
>>>            const char *sort_order_str = sort_order_strs[sort_order];
>>>            p = enc->encode_str(p, sort_order_str, strlen(sort_order_str));
>>>        }
>>> -    return (int)(p - base);
>>> +    return p - base;
>>>    }
>>>
>>>    /*
>>> diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
>>> index 5f73f026e..ca699ecd9 100644
>>> --- a/src/box/sql/analyze.c
>>> +++ b/src/box/sql/analyze.c
>>> @@ -849,7 +849,6 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>>>            int addrRewind;    /* Address of "OP_Rewind iIdxCur" */
>>>            int addrNextRow;    /* Address of "next_row:" */
>>>            const char *zIdxName;    /* Name of the index */
>>> -        int nColTest;    /* Number of columns to test for changes */
>>>
>>>            if (pOnlyIdx && pOnlyIdx != pIdx)
>>>                continue;
>>> @@ -860,9 +859,9 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>>>            if (IsPrimaryKeyIndex(pIdx)) {
>>>                zIdxName = pTab->def->name;
>>>            } else {
>>> -            zIdxName = pIdx->zName;
>>> +            zIdxName = pIdx->def->name;
>>>            }
>> 3. Oh, extra braces here. Hate such diffs too, dude...
>>   > -        nColTest = index_column_count(pIdx);
> Fxd.
>>> +        int nColTest = pIdx->def->key_def->part_count;
>>>
>>>            /* Populate the register containing the index name. */
>>>            sqlite3VdbeLoadString(v, regIdxname, zIdxName);
>>> @@ -917,7 +916,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>>>            sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
>>>                      space_ptr_reg);
>>>            sql_vdbe_set_p4_key_def(pParse, pIdx);
>>> -        VdbeComment((v, "%s", pIdx->zName));
>>> +        VdbeComment((v, "%s", pIdx->def->name));
>>>
>>>            /* Invoke the stat_init() function. The arguments are:
>>>             *
>>> @@ -969,7 +968,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>>>                 */
>>>                sqlite3VdbeAddOp0(v, OP_Goto);
>>>                addrNextRow = sqlite3VdbeCurrentAddr(v);
>>> -            if (nColTest == 1 && index_is_unique(pIdx)) {
>>> +            if (nColTest == 1 && pIdx->def->opts.is_unique) {
>>>                    /* For a single-column UNIQUE index, once we have
>>> found a non-NULL
>>>                     * row, we know that all the rest will be distinct, so
>>> skip
>>>                     * subsequent distinctness tests.
>>> @@ -978,13 +977,12 @@ analyzeOneTable(Parse * pParse,    /* Parser
>>> context */
>>>                              endDistinctTest);
>>>                    VdbeCoverage(v);
>>>                }
>>> -            for (i = 0; i < nColTest; i++) {
>>> -                uint32_t id;
>>> -                struct coll *coll =
>>> -                    sql_index_collation(pIdx, i, &id);
>>> +            struct key_part *part = pIdx->def->key_def->parts;
>>> +            for (i = 0; i < nColTest; ++i, ++part) {
>>> +                struct coll *coll = part->coll;
>>>                    sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
>>>                    sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
>>> -                          pIdx->aiColumn[i], regTemp);
>>> +                          part->fieldno, regTemp);
>>>                    aGotoChng[i] =
>>>                        sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
>>>                                  regPrev + i, (char *)coll,
>>> @@ -1006,7 +1004,8 @@ analyzeOneTable(Parse * pParse,    /* Parser
>>> context */
>>>                for (i = 0; i < nColTest; i++) {
>>>                    sqlite3VdbeJumpHere(v, aGotoChng[i]);
>>>                    sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
>>> -                          pIdx->aiColumn[i],
>>> +                          pIdx->def->key_def->
>>> +                              parts[i].fieldno,
>>>                              regPrev + i);
>>>                }
>>>                sqlite3VdbeResolveLabel(v, endDistinctTest);
>>> @@ -1022,15 +1021,14 @@ analyzeOneTable(Parse * pParse,    /* Parser
>>> context */
>>>             */
>>>            assert(regKey == (regStat4 + 2));
>>>            Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
>>> -        int j, k, regKeyStat;
>>> -        int nPkColumn = (int)index_column_count(pPk);
>>> -        regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
>>> -        for (j = 0; j < nPkColumn; j++) {
>>> -            k = pPk->aiColumn[j];
>>> -            assert(k >= 0 && k < (int)pTab->def->field_count);
>>> -            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
>>> -            VdbeComment((v, "%s",
>>> - pTab->def->fields[pPk->aiColumn[j]].name));
>>> +        int nPkColumn = (int) pPk->def->key_def->part_count;
>>> +        int regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
>> 4. Perhaps Vlad would insist on Tarantool-style names for new variables... Please change.
> Discussed with Vlad verbally, fixed where necessary.
>>> +        for (int j = 0; j < nPkColumn; ++j) {
>>> +            int k = pPk->def->key_def->parts[j].fieldno;
>>> +            assert(k >= 0 && k < (int) pTab->def->field_count);
>>> +            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
>>> +                      regKeyStat + j);
>>> +            VdbeComment((v, "%s", pTab->def->fields[k].name));
>>>            }
>>>            sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
>>>                      nPkColumn, regKey);
>>> @@ -1146,7 +1144,7 @@ analyzeTable(Parse * pParse, Table * pTab, Index *
>>> pOnlyIdx)
>>>        iStatCur = pParse->nTab;
>>>        pParse->nTab += 3;
>>>        if (pOnlyIdx) {
>>> -        openStatTable(pParse, iStatCur, pOnlyIdx->zName, "idx");
>>> +        openStatTable(pParse, iStatCur, pOnlyIdx->def->name, "idx");
>>>        } else {
>>>            openStatTable(pParse, iStatCur, pTab->def->name, "tbl");
>>>        }
>>> diff --git a/src/box/sql/build.c b/src/box/sql/build.c
>>> index 592c9a6fa..84e481de3 100644
>>> --- a/src/box/sql/build.c
>>> +++ b/src/box/sql/build.c
>>> @@ -241,6 +241,8 @@ static void
>>>    freeIndex(sqlite3 * db, Index * p)
>>>    {
>>>        sql_expr_delete(db, p->pPartIdxWhere, false);
>>> +     if (p->def != NULL)
>> 5. Extra space before tab.
> Fxd.
>>
>>> +        index_def_delete(p->def);
>>>        sqlite3DbFree(db, p->zColAff);
>>>        sqlite3DbFree(db, p);
>>>    }
>>> @@ -259,7 +261,8 @@ sqlite3UnlinkAndDeleteIndex(sqlite3 * db, Index *
>>> pIndex)
>>>
>>>        struct session *user_session = current_session();
>>>
>>> -    pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash, pIndex->zName, 0);
>>> +    pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash,
>>> +                   pIndex->def->name, 0);
>>>        if (ALWAYS(pIndex)) {
>>>            if (pIndex->pTable->pIndex == pIndex) {
>>>                pIndex->pTable->pIndex = pIndex->pNext;
>>> @@ -376,7 +379,7 @@ deleteTable(sqlite3 * db, Table * pTable)
>>>            pNext = pIndex->pNext;
>>>            assert(pIndex->pSchema == pTable->pSchema);
>>>            if ((db == 0 || db->pnBytesFreed == 0)) {
>>> -            char *zName = pIndex->zName;
>>> +            char *zName = pIndex->def->name;
>>>                TESTONLY(Index *
>>>                     pOld =) sqlite3HashInsert(&pTable->idxHash,
>>>                                   zName, 0);
>>> @@ -1041,7 +1044,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>>>        Table *p = pParse->pNewTable;
>>>        if (p == NULL)
>>>            return;
>>> -    int i = p->def->field_count - 1;
>>> +    uint32_t i = p->def->field_count - 1;
>>>        sqlite3 *db = pParse->db;
>>>        char *zColl = sqlite3NameFromToken(db, pToken);
>>>        if (!zColl)
>>> @@ -1049,22 +1052,20 @@ sqlite3AddCollateType(Parse * pParse, Token *
>>> pToken)
>>>        uint32_t *id = &p->def->fields[i].coll_id;
>>>        p->aCol[i].coll = sql_get_coll_seq(pParse, zColl, id);
>>>        if (p->aCol[i].coll != NULL) {
>>> -        Index *pIdx;
>>>            /* If the column is declared as "<name> PRIMARY KEY COLLATE
>>> <type>",
>>>             * then an index may have been created on this column before the
>>>             * collation type was added. Correct this if it is the case.
>>>             */
>>> -        for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
>>> -            assert(pIdx->nColumn == 1);
>>> -            if (pIdx->aiColumn[0] == i) {
>>> -                id = &pIdx->coll_id_array[0];
>>> -                pIdx->coll_array[0] =
>>> +        for (struct Index *pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
>>> +            assert(pIdx->def->key_def->part_count == 1);
>>> +            if (pIdx->def->key_def->parts[0].fieldno == i) {
>>> +                pIdx->def->key_def->parts[0].coll_id = *id;
>>> +                pIdx->def->key_def->parts[0].coll =
>>>                        sql_column_collation(p->def, i, id);
>>>                }
>>>            }
>>> -    } else {
>>> -        sqlite3DbFree(db, zColl);
>>>        }
>>> +    sqlite3DbFree(db, zColl);
>>>    }
>>>
>>>    struct coll *
>>> @@ -1094,66 +1095,6 @@ sql_column_collation(struct space_def *def,
>>> uint32_t column, uint32_t *coll_id)
>>>        return space->format->fields[column].coll;
>>>    }
>>>
>>> -struct key_def*
>>> -sql_index_key_def(struct Index *idx)
>>> -{
>>> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
>>> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
>>> -    struct space *space = space_by_id(space_id);
>>> -    assert(space != NULL);
>>> -    struct index *index = space_index(space, index_id);
>>> -    assert(index != NULL && index->def != NULL);
>>> -    return index->def->key_def;
>>> -}
>>> -
>>> -struct coll *
>>> -sql_index_collation(Index *idx, uint32_t column, uint32_t *coll_id)
>>> -{
>>> -    assert(idx != NULL);
>>> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
>>> -    struct space *space = space_by_id(space_id);
>>> -
>>> -    assert(column < idx->nColumn);
>>> -    /*
>>> -     * If space is still under construction, or it is
>>> -     * an ephemeral space, then fetch collation from
>>> -     * SQL internal structure.
>>> -     */
>>> -    if (space == NULL) {
>>> -        assert(column < idx->nColumn);
>>> -        *coll_id = idx->coll_id_array[column];
>>> -        return idx->coll_array[column];
>>> -    }
>>> -
>>> -    struct key_def *key_def = sql_index_key_def(idx);
>>> -    assert(key_def != NULL && key_def->part_count >= column);
>>> -    *coll_id = key_def->parts[column].coll_id;
>>> -    return key_def->parts[column].coll;
>>> -}
>>> -
>>> -enum sort_order
>>> -sql_index_column_sort_order(Index *idx, uint32_t column)
>>> -{
>>> -    assert(idx != NULL);
>>> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
>>> -    struct space *space = space_by_id(space_id);
>>> -
>>> -    assert(column < idx->nColumn);
>>> -    /*
>>> -     * If space is still under construction, or it is
>>> -     * an ephemeral space, then fetch collation from
>>> -     * SQL internal structure.
>>> -     */
>>> -    if (space == NULL) {
>>> -        assert(column < idx->nColumn);
>>> -        return idx->sort_order[column];
>>> -    }
>>> -
>>> -    struct key_def *key_def = sql_index_key_def(idx);
>>> -    assert(key_def != NULL && key_def->part_count >= column);
>>> -    return key_def->parts[column].sort_order;
>>> -}
>>> -
>>>    struct ExprList *
>>>    space_checks_expr_list(uint32_t space_id)
>>>    {
>>> @@ -1337,14 +1278,16 @@ createTableStmt(sqlite3 * db, Table * p)
>>>        return zStmt;
>>>    }
>>>
>>> -/* Return true if value x is found any of the first nCol entries of aiCol[]
>>> - */
>>>    static int
>>> -hasColumn(const i16 * aiCol, int nCol, int x)
>>> +hasColumn(const struct key_part *key_parts, int nCol, uint32_t fieldno)
>> 6. Please find a better name for this function and write a small new doxygen-style comment.
>>   >   {
> Done. Also changed return type to 'bool'
>>> -    while (nCol-- > 0)
>>> -        if (x == *(aiCol++))
>>> +    int i = 0;
>>> +    while (i < nCol) {
>>> +        if (fieldno == key_parts->fieldno)
>>>                return 1;
>>> +        key_parts++;
>>> +        i++;
>>> +    }
>>>        return 0;
>>>    }
>>>
>>> @@ -1364,13 +1307,12 @@ static void
>>>    convertToWithoutRowidTable(Parse * pParse, Table * pTab)
>>>    {
>>>        Index *pPk;
>>> -    int i, j;
>>>        sqlite3 *db = pParse->db;
>>>
>>>        /* Mark every PRIMARY KEY column as NOT NULL (except for imposter
>>> tables)
>>>         */
>>>        if (!db->init.imposterTable) {
>>> -        for (i = 0; i < (int)pTab->def->field_count; i++) {
>>> +        for (uint32_t i = 0; i < pTab->def->field_count; i++) {
>>>                if (pTab->aCol[i].is_primkey) {
>>>                    pTab->def->fields[i].nullable_action
>>>                        = ON_CONFLICT_ACTION_ABORT;
>>> @@ -1408,14 +1350,28 @@ convertToWithoutRowidTable(Parse * pParse, Table
>>> * pTab)
>>>             * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY
>>> KEY(a,b,c,d)".  Later
>>>             * code assumes the PRIMARY KEY contains no repeated columns.
>>>             */
>>> -        for (i = j = 1; i < pPk->nColumn; i++) {
>>> -            if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
>>> -                pPk->nColumn--;
>>> -            } else {
>>> -                pPk->aiColumn[j++] = pPk->aiColumn[i];
>>> +
>>> +        struct key_part *parts = pPk->def->key_def->parts;
>>> +        uint32_t part_count = pPk->def->key_def->part_count;
>>> +        uint32_t new_part_count = part_count;
>>> +
>>> +        for (uint32_t i = 1; i < part_count; i++) {
>>> +            if (hasColumn(parts, i, parts[i].fieldno)){
>> 7. Please compare directly with 0 via the != operator.
> Not actual.
>>
>>> +                new_part_count--;
>>> +                bool is_found = false;
>>> +                for (uint32_t j = i + 1; j < part_count; j++){
>>> +                    if (!(hasColumn(parts, j,
>> 8. same
> Not actual, function returns bool now
>>
>>> +                            parts[j].fieldno))) {
>>> +                        parts[i] = parts[j];
>>> +                        is_found = true;
>>> +                        break;
>>> +                    }
>>> +                }
>>> +                if (!(is_found))
>> 9. extra braces?
> Fixed.
>>
>>> +                    break;
>>>                }
>>>            }
>>> -        pPk->nColumn = j;
>>> +        pPk->def->key_def->part_count = new_part_count;
>>>        }
>>>        assert(pPk != 0);
>>>    }
>>> @@ -1497,7 +1453,7 @@ createIndex(Parse * pParse, Index * pIndex, int
>>> iSpaceId, int iIndexId,
>>>        }
>>>        sqlite3VdbeAddOp4(v,
>>>                  OP_String8, 0, iFirstCol + 2, 0,
>>> -              sqlite3DbStrDup(pParse->db, pIndex->zName),
>>> +              sqlite3DbStrDup(pParse->db, pIndex->def->name),
>>>                  P4_DYNAMIC);
>>>        sqlite3VdbeAddOp4(v, OP_String8, 0, iFirstCol + 3, 0, "tree",
>>>                  P4_STATIC);
>>> @@ -1534,7 +1490,7 @@ makeIndexSchemaRecord(Parse * pParse,
>>>
>>>        sqlite3VdbeAddOp4(v,
>>>                  OP_String8, 0, iFirstCol, 0,
>>> -              sqlite3DbStrDup(pParse->db, pIndex->zName),
>>> +              sqlite3DbStrDup(pParse->db, pIndex->def->name),
>>>                  P4_DYNAMIC);
>>>
>>>        if (pParse->pNewTable) {
>>> @@ -2463,15 +2419,16 @@ sqlite3RefillIndex(Parse * pParse, Index *
>>> pIndex, int memRootPage)
>>>        } else {
>>>            tnum = pIndex->tnum;
>>>        }
>>> -    struct key_def *def = key_def_dup(sql_index_key_def(pIndex));
>>> +    struct key_def *def = key_def_dup(pIndex->def->key_def);
>>>        if (def == NULL) {
>>>            sqlite3OomFault(db);
>>>            return;
>>>        }
>>>        /* Open the sorter cursor if we are to use one. */
>>>        iSorter = pParse->nTab++;
>>> -    sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
>>> -              (char *)def, P4_KEYDEF);
>>> +    sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0,
>>> +              pIndex->def->key_def->part_count, (char *)def,
>>> +              P4_KEYDEF);
>>>
>>>        /* Open the table. Loop through all rows of the table, inserting index
>>>         * records into the sorter.
>>> @@ -2504,7 +2461,8 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex,
>>> int memRootPage)
>>>            sqlite3VdbeGoto(v, j2);
>>>            addr2 = sqlite3VdbeCurrentAddr(v);
>>>            sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
>>> -                     regRecord, pIndex->nColumn);
>>> +                     regRecord,
>>> +                     pIndex->def->key_def->part_count);
>>>            VdbeCoverage(v);
>>>            parser_emit_unique_constraint(pParse, ON_CONFLICT_ACTION_ABORT,
>>>                              pIndex);
>>> @@ -2542,24 +2500,13 @@ sqlite3AllocateIndexObject(sqlite3 * db,    /*
>>> Database connection */
>>>        int nByte;        /* Bytes of space for Index object + arrays */
>>>
>>>        nByte = ROUND8(sizeof(Index)) +            /* Index structure   */
>>> -        ROUND8(sizeof(struct coll *) * nCol) +  /* Index.coll_array  */
>>> -        ROUND8(sizeof(uint32_t) * nCol) +       /* Index.coll_id_array*/
>>> -        ROUND8(sizeof(LogEst) * (nCol + 1) +    /* Index.aiRowLogEst */
>>> -           sizeof(i16) * nCol +            /* Index.aiColumn */
>>> -           sizeof(enum sort_order) * nCol); /* Index.sort_order */
>>> +        ROUND8(sizeof(LogEst) * (nCol + 1));    /* Index.aiRowLogEst */
>>>        p = sqlite3DbMallocZero(db, nByte + nExtra);
>>>        if (p) {
>>>            char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
>>> -        p->coll_array = (struct coll **)pExtra;
>>> -        pExtra += ROUND8(sizeof(struct coll **) * nCol);
>>> -        p->coll_id_array = (uint32_t *) pExtra;
>>> -        pExtra += ROUND8(sizeof(uint32_t) * nCol);
>>>            p->aiRowLogEst = (LogEst *) pExtra;
>>>            pExtra += sizeof(LogEst) * (nCol + 1);
>>> -        p->aiColumn = (i16 *) pExtra;
>>>            pExtra += sizeof(i16) * nCol;
>>> -        p->sort_order = (enum sort_order *) pExtra;
>>> -        p->nColumn = nCol;
>>>            *ppExtra = ((char *)p) + nByte;
>>>        }
>>>        return p;
>>> @@ -2648,18 +2595,136 @@ addIndexToTable(Index * pIndex, Table * pTab)
>>>        }
>>>    }
>>>
>>> -bool
>>> -index_is_unique(Index *idx)
>>> +static void
>>> +append_string_part(struct region *r, const char *str,
>>> +          size_t *total_sql_size, Parse *parse)
>>>    {
>>> -    assert(idx != NULL);
>>> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
>>> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
>>> -    struct space *space = space_by_id(space_id);
>>> -    assert(space != NULL);
>>> -    struct index *tnt_index = space_index(space, index_id);
>>> -    assert(tnt_index != NULL);
>> 10. please, calculate size_t strlen and reuse it.
>>
>>> +    char * str_part = region_alloc(r, strlen(str));
>>> +    if (str_part == NULL){
>>> +        diag_set(OutOfMemory, strlen(str),
>>> +             "region_alloc", "str_part");
>>> +        parse->rc = SQL_TARANTOOL_ERROR;
>>> +        parse->nErr++;
>>> +    }
>> 11. You'll write to NULL ptr on region_alloc failure.
>> 12. I also don't like the function interface: struct Parse already contains the region that you specify as the first argument.
>> You'd better change its signature to take only struct Parse as the first argument, or to return -1 on failure (later, rc |= append_string_part... rc |= append_string_part.. if (rc != 0) ..)
>> 13. I also suggest writing a good doxygen-style comment.
>>
> Fixed, added comment.
>>> +    memcpy(str_part, str, strlen(str));
>>> +    *total_sql_size += strlen(str);
>>> +}
>>> +
>>> +static void
>>> +set_index_def(Parse *parse, Index *index, Table *table, uint32_t iid,
>>> +          const char *name, uint32_t name_len, int on_error,
>>> +          struct ExprList *expr_list, u8 idx_type)
>>> +{
>>> +    struct space_def *space_def = table->def;
>>> +    struct index_opts opts;
>>> +    index_opts_create(&opts);
>>> +    opts.is_unique = on_error != ON_CONFLICT_ACTION_NONE;
>>> +
>>> +    struct key_def *key_def = key_def_new(expr_list->nExpr);
>>> +    if (key_def == NULL) {
>>> +        parse->rc = SQL_TARANTOOL_ERROR;
>>> +        parse->nErr++;
>>> +        goto cleanup;
>>> +    }
>>> +
>>> +    /*
>>> +     * Build initial parts of SQL statement.
>>> +     */
>> 14. Please, make it a single line.
> Done.
>>
>>> +
>>> +    struct region *r = &parse->region;
>>> +    size_t total_sql_size = 0;
>>> +
>>> +    if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>>> +        append_string_part(r, "CREATE INDEX ", &total_sql_size,
>>> +                   parse);
>>> +        append_string_part(r, name, &total_sql_size, parse);
>>> +        append_string_part(r, " ON ", &total_sql_size, parse);
>>> +        append_string_part(r, space_def->name, &total_sql_size,
>>> +                   parse);
>>> +        append_string_part(r, " (", &total_sql_size, parse);
>>> +    }
>>> +
>>> +    for (int i = 0; i < expr_list->nExpr; i++) {
>>> +        Expr *expr = expr_list->a[i].pExpr;
>>> +        sql_resolve_self_reference(parse, table, NC_IdxExpr, expr, 0);
>>> +        if (parse->nErr > 0)
>>> +            goto cleanup;
>>> +
>>> +        Expr *column_expr = sqlite3ExprSkipCollate(expr);
>>> +        if (column_expr->op != TK_COLUMN) {
>>> +            sqlite3ErrorMsg(parse,
>>> +                    "functional indexes aren't supported "
>>> +                    "in the current version");
>>> +            goto cleanup;
>>> +        }
>>> +
>>> +        uint32_t fieldno = column_expr->iColumn;
>>> +        uint32_t coll_id;
>>> +        struct coll *coll;
>>> +        if (expr->op == TK_COLLATE) {
>>> +            coll = sql_get_coll_seq(parse, expr->u.zToken,
>>> +                        &coll_id);
>>> +
>>> +            if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>>> +                append_string_part(r, name,
>>> +                           &total_sql_size, parse);
>>> +                append_string_part(r, " COLLATE ",
>>> +                           &total_sql_size, parse);
>>> +                const char *coll_name = expr->u.zToken;
>>> +                append_string_part(r, coll_name,
>>> +                           &total_sql_size, parse);
>>> +                append_string_part(r, ", ",
>>> +                           &total_sql_size, parse);
>>> +            }
>>> +        } else {
>>> +            coll = sql_column_collation(space_def, fieldno,
>>> +                            &coll_id);
>>> +            if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>>> +                append_string_part(r, name,
>>> +                           &total_sql_size, parse);
>>> +                append_string_part(r, ", ",
>>> +                           &total_sql_size, parse);
>>> +            }
>>> +        }
>>>
>>> -    return tnt_index->def->opts.is_unique;
>>> +        /*
>>> +        * Tarantool: DESC indexes are not supported so far.
>>> +        * See gh-3016.
>>> +        */
>>> +        key_def_set_part(key_def, i, fieldno,
>>> +                 space_def->fields[fieldno].type,
>>> +                 space_def->fields[fieldno].nullable_action,
>>> +                 coll, coll_id, SORT_ORDER_ASC);
>>> +    }
>>> +
>>> +    if (parse->nErr > 0) {
>>> +        index->def = NULL;
>>> +        goto cleanup;
>>> +    }
>>> +
>>> +    if (idx_type == SQLITE_IDXTYPE_APPDEF) {
>>> +        memcpy(region_alloc(r, 1), "\0", 1);
>>> +        total_sql_size += 1;
>>> +        opts.sql = region_join(r, total_sql_size);
>>> +
>>> +        /*
>>> +         * fix last ", " with ")\0" to finish the statement.
>> 14. Please, start with a capital letter. And the comment is also out of the comment-type margin.
> Fxd.
>>> +         */
>>> +        opts.sql[total_sql_size - 3] = ')';
>>> +        opts.sql[total_sql_size - 2] = '\0';
>> 15. Why don't you memcpy(&opts.sql[total_sql_size - 3], ")\0", 2); ? And why do you need two null-terminators? Could the string be shorter?
> Sorry, fixed it all.
>>> +    }
>>> +
>>> +    struct key_def *pk_key_def;
>>> +    if (idx_type == SQLITE_IDXTYPE_APPDEF)
>>> +        pk_key_def = table->pIndex->def->key_def;
>>> +    else
>>> +        pk_key_def = NULL;
>>> +
>>> +    index->def = index_def_new(space_def->id, iid, name, name_len,
>>> +                   TREE, &opts, key_def, pk_key_def);
>>> +    cleanup:
>>> +        if (key_def != NULL)
>>> +            key_def_delete(key_def);
>> 16. Invalid indent. cleanup should be at the beginning of the line.
> Fxd.
>>
>>>    }
>>>
>>>    void
>>> @@ -2668,16 +2733,14 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>             int on_error, struct Token *start, struct Expr *where,
>>>             enum sort_order sort_order, bool if_not_exist, u8 idx_type)
>>>    {
>>> -    Table *pTab = 0;    /* Table to be indexed */
>>> -    Index *pIndex = 0;    /* The index to be created */
>>> -    char *zName = 0;    /* Name of the index */
>>> -    int nName;        /* Number of characters in zName */
>>> -    int i, j;
>>> +    Table *pTab = NULL;    /* Table to be indexed */
>>> +    Index *pIndex = NULL;    /* The index to be created */
>>> +    char *name = NULL;    /* Name of the index */
>>> +    int name_len;        /* Number of characters in zName */
>> 17. Please, write new code in Tarantool style. I mean snake_case for variable names, and comments placed on the preceding line and terminated with a dot:  /* Table to be indexed. */
>> struct Table *table = NULL;
>>    >       DbFixer sFix;        /* For assigning database names to pTable */
> Fixed.
>>>        sqlite3 *db = parse->db;
>>> -    struct ExprList_item *col_listItem;    /* For looping over col_list */
>>>        int nExtra = 0;        /* Space allocated for zExtra[] */
>>> -    char *zExtra = 0;    /* Extra space after the Index object */
>>> +    char *zExtra = NULL;    /* Extra space after the Index object */
>>>        struct session *user_session = current_session();
>>>
>>>        if (db->mallocFailed || parse->nErr > 0) {
>>> @@ -2749,24 +2812,24 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>         * our own name.
>>>         */
>>>        if (token) {
>>> -        zName = sqlite3NameFromToken(db, token);
>>> -        if (zName == 0)
>>> +        name = sqlite3NameFromToken(db, token);
>>> +        if (name == NULL)
>>>                goto exit_create_index;
>>>            assert(token->z != 0);
>>>            if (!db->init.busy) {
>>> -            if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
>>> +            if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
>>>                    NULL) {
>>>                    sqlite3ErrorMsg(parse,
>>>                            "there is already a table named %s",
>>> -                        zName);
>>> +                        name);
>>>                    goto exit_create_index;
>>>                }
>>>            }
>>> -        if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
>>> +        if (sqlite3HashFind(&pTab->idxHash, name) != NULL) {
>>>                if (!if_not_exist) {
>>>                    sqlite3ErrorMsg(parse,
>>>                            "index %s.%s already exists",
>>> -                        pTab->def->name, zName);
>>> +                        pTab->def->name, name);
>>>                } else {
>>>                    assert(!db->init.busy);
>>>                }
>>> @@ -2778,10 +2841,9 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>            for (pLoop = pTab->pIndex, n = 1; pLoop;
>>>                 pLoop = pLoop->pNext, n++) {
>>>            }
>>> -        zName =
>>> -            sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", pTab->def->name,
>>> -                   n);
>>> -        if (zName == 0) {
>>> +        name = sqlite3MPrintf(db, "sqlite_autoindex_%s_%d",
>>> +                      pTab->def->name, n);
>>> +        if (name == NULL) {
>>>                goto exit_create_index;
>>>            }
>> 18. Extra braces.
> Fixed.
>>>        }
>>> @@ -2807,31 +2869,27 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>            sqlite3ExprListCheckLength(parse, col_list, "index");
>>>        }
>>>
>>> -    /* Figure out how many bytes of space are required to store explicitly
>>> -     * specified collation sequence names.
>>> -     */
>>> -    for (i = 0; i < col_list->nExpr; i++) {
>>> -        Expr *pExpr = col_list->a[i].pExpr;
>>> -        assert(pExpr != 0);
>>> -        if (pExpr->op == TK_COLLATE) {
>>> -            nExtra += (1 + sqlite3Strlen30(pExpr->u.zToken));
>>> -        }
>>> -    }
>>> -
>>>        /*
>>>         * Allocate the index structure.
>>>         */
>>> -    nName = sqlite3Strlen30(zName);
>>> +    name_len = sqlite3Strlen30(name);
>>> +
>>> +    if (name_len > BOX_NAME_MAX) {
>>> +        sqlite3ErrorMsg(parse,
>>> +                "%s.%s exceeds indexes' names length limit",
>>> +                pTab->def->name, name);
>>> +        goto exit_create_index;
>>> +    }
>>> +
>>> +    if (sqlite3CheckIdentifierName(parse, name) != SQLITE_OK)
>>> +        goto exit_create_index;
>>> +
>>>        pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
>>> -                        nName + nExtra + 1, &zExtra);
>>> +                        name_len + nExtra + 1, &zExtra);
>>>        if (db->mallocFailed) {
>>>            goto exit_create_index;
>>>        }
>>>        assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
>>> -    assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
>>> -    pIndex->zName = zExtra;
>>> -    zExtra += nName + 1;
>>> -    memcpy(pIndex->zName, zName, nName + 1);
>>>        pIndex->pTable = pTab;
>>>        pIndex->onError = (u8) on_error;
>>>        /*
>>> @@ -2846,7 +2904,6 @@ sql_create_index(struct Parse *parse, struct Token
>>> *token,
>>>            pIndex->idxType = idx_type;
>>>        }
>>>        pIndex->pSchema = db->pSchema;
>>> -    pIndex->nColumn = col_list->nExpr;
>>>        /* Tarantool have access to each column by any index */
>>>        if (where) {
>>>            sql_resolve_self_reference(parse, pTab, NC_PartIdx, where,
>>> @@ -2855,59 +2912,27 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>            where = NULL;
>>>        }
>>>
>>> -    /* Analyze the list of expressions that form the terms of the index and
>>> -     * report any errors.  In the common case where the expression is
>>> exactly
>>> -     * a table column, store that column in aiColumn[].
>>> -     *
>>> +    /*
>>>         * TODO: Issue a warning if two or more columns of the index are
>>> identical.
>>>         * TODO: Issue a warning if the table primary key is used as part
>>> of the
>>>         * index key.
>>>         */
>>> -    for (i = 0, col_listItem = col_list->a; i < col_list->nExpr;
>>> -         i++, col_listItem++) {
>>> -        Expr *pCExpr;    /* The i-th index expression */
>>> -        sql_resolve_self_reference(parse, pTab, NC_IdxExpr,
>>> -                       col_listItem->pExpr, NULL);
>>> -        if (parse->nErr > 0)
>>> -            goto exit_create_index;
>>> -        pCExpr = sqlite3ExprSkipCollate(col_listItem->pExpr);
>>> -        if (pCExpr->op != TK_COLUMN) {
>>> -            sqlite3ErrorMsg(parse,
>>> -                    "functional indexes aren't supported "
>>> -                    "in the current version");
>>> -            goto exit_create_index;
>>> -        } else {
>>> -            j = pCExpr->iColumn;
>>> -            assert(j <= 0x7fff);
>>> -            if (j < 0) {
>>> -                j = pTab->iPKey;
>>> -            }
>>> -            pIndex->aiColumn[i] = (i16) j;
>>> -        }
>>> -        struct coll *coll;
>>> -        uint32_t id;
>>> -        if (col_listItem->pExpr->op == TK_COLLATE) {
>>> -            const char *coll_name = col_listItem->pExpr->u.zToken;
>>> -            coll = sql_get_coll_seq(parse, coll_name, &id);
>>>
>>> -            if (coll == NULL &&
>>> -                sqlite3StrICmp(coll_name, "binary") != 0) {
>>> -                goto exit_create_index;
>>> -            }
>>> -        } else if (j >= 0) {
>>> -            coll = sql_column_collation(pTab->def, j, &id);
>>> -        } else {
>>> -            id = COLL_NONE;
>>> -            coll = NULL;
>>> -        }
>>> -        pIndex->coll_array[i] = coll;
>>> -        pIndex->coll_id_array[i] = id;
>>> +    uint32_t max_iid = 0;
>>> +    for (Index *index = pTab->pIndex; index; index = index->pNext) {
>>> +        max_iid = max_iid > index->def->iid ?
>>> +              max_iid :
>>> +              index->def->iid + 1;
>> 19. Not like this...
> Fixed.
>>
>>> +    }
>>>
>>> -        /* Tarantool: DESC indexes are not supported so far.
>>> -         * See gh-3016.
>>> -         */
>>> -        pIndex->sort_order[i] = SORT_ORDER_ASC;
>>> +    set_index_def(parse, pIndex, pTab, max_iid, name, name_len, on_error,
>>> +              col_list, idx_type);
>>> +
>>> +    if (pIndex->def == NULL ||
>>> +        !index_def_is_valid(pIndex->def, pTab->def->name)) {
>>> +        goto exit_create_index;
>>>        }
>> 20. Extra braces.
> Fixed.
>>
>>> +
>>>        if (pTab == parse->pNewTable) {
>>>            /* This routine has been called to create an automatic index as a
>>>             * result of a PRIMARY KEY or UNIQUE clause on a column
>>> definition, or
>>> @@ -2932,25 +2957,27 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>             */
>>>            Index *pIdx;
>>>            for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
>>> -            int k;
>>> +            uint32_t k;
>>>                assert(IsUniqueIndex(pIdx));
>>>                assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
>>>                assert(IsUniqueIndex(pIndex));
>>>
>>> -            if (pIdx->nColumn != pIndex->nColumn)
>>> +            if (pIdx->def->key_def->part_count !=
>>> +                pIndex->def->key_def->part_count) {
>>>                    continue;
>> 21. same
> Fixed.
>>> -            for (k = 0; k < pIdx->nColumn; k++) {
>>> -                assert(pIdx->aiColumn[k] >= 0);
>>> -                if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
>>> +            }
>>> +            for (k = 0; k < pIdx->def->key_def->part_count; k++) {
>>> +                if (pIdx->def->key_def->parts[k].fieldno !=
>>> + pIndex->def->key_def->parts[k].fieldno) {
>>>                        break;
>>> +                }
>> 22. same
> Fixed.
>>
>>>                    struct coll *coll1, *coll2;
>>> -                uint32_t id;
>>> -                coll1 = sql_index_collation(pIdx, k, &id);
>>> -                coll2 = sql_index_collation(pIndex, k, &id);
>>> +                coll1 = pIdx->def->key_def->parts[k].coll;
>>> +                coll2 = pIndex->def->key_def->parts[k].coll;
>>>                    if (coll1 != coll2)
>>>                        break;
>>>                }
>>> -            if (k == pIdx->nColumn) {
>>> +            if (k == pIdx->def->key_def->part_count) {
>>>                    if (pIdx->onError != pIndex->onError) {
>>>                        /* This constraint creates the same index as a
>>> previous
>>>                         * constraint specified somewhere in the CREATE
>>> TABLE statement.
>>> @@ -2984,7 +3011,7 @@ sql_create_index(struct Parse *parse, struct Token
>>> *token,
>>>        assert(parse->nErr == 0);
>>>        if (db->init.busy) {
>>>            Index *p;
>>> -        p = sqlite3HashInsert(&pTab->idxHash, pIndex->zName, pIndex);
>>> +        p = sqlite3HashInsert(&pTab->idxHash, pIndex->def->name, pIndex);
>>>            if (p) {
>>>                assert(p == pIndex);    /* Malloc must have failed */
>>>                sqlite3OomFault(db);
>>> @@ -3082,44 +3109,7 @@ sql_create_index(struct Parse *parse, struct
>>> Token *token,
>>>        sql_expr_delete(db, where, false);
>>>        sql_expr_list_delete(db, col_list);
>>>        sqlite3SrcListDelete(db, tbl_name);
>>> -    sqlite3DbFree(db, zName);
>>> -}
>>> -
>>> -/**
>>> - * Return number of columns in given index.
>>> - * If space is ephemeral, use internal
>>> - * SQL structure to fetch the value.
>>> - */
>>> -uint32_t
>>> -index_column_count(const Index *idx)
>>> -{
>>> -    assert(idx != NULL);
>>> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
>>> -    struct space *space = space_by_id(space_id);
>>> -    /* It is impossible to find an ephemeral space by id. */
>>> -    if (space == NULL)
>>> -        return idx->nColumn;
>>> -
>>> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
>>> -    struct index *index = space_index(space, index_id);
>>> -    assert(index != NULL);
>>> -    return index->def->key_def->part_count;
>>> -}
>>> -
>>> -/** Return true if given index is unique and not nullable. */
>>> -bool
>>> -index_is_unique_not_null(const Index *idx)
>>> -{
>>> -    assert(idx != NULL);
>>> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
>>> -    struct space *space = space_by_id(space_id);
>>> -    assert(space != NULL);
>>> -
>>> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
>>> -    struct index *index = space_index(space, index_id);
>>> -    assert(index != NULL);
>>> -    return (index->def->opts.is_unique &&
>>> -        !index->def->key_def->is_nullable);
>>> +    sqlite3DbFree(db, name);
>>>    }
>>>
>>>    void
>>> @@ -3745,9 +3735,9 @@ parser_emit_unique_constraint(struct Parse *parser,
>>>        const struct space_def *def = index->pTable->def;
>>>        StrAccum err_accum;
>>>        sqlite3StrAccumInit(&err_accum, parser->db, 0, 0, 200);
>>> -    for (int j = 0; j < index->nColumn; ++j) {
>>> -        assert(index->aiColumn[j] >= 0);
>>> -        const char *col_name = def->fields[index->aiColumn[j]].name;
>>> +    struct key_part *part = index->def->key_def->parts;
>>> +    for (uint32_t j = 0; j < index->def->key_def->part_count; ++j,
>>> part++) {
>>> +        const char *col_name = def->fields[part->fieldno].name;
>>>            if (j != 0)
>>>                sqlite3StrAccumAppend(&err_accum, ", ", 2);
>>>            sqlite3XPrintf(&err_accum, "%s.%s", def->name, col_name);
>>> @@ -3768,11 +3758,11 @@ static bool
>>>    collationMatch(struct coll *coll, struct Index *index)
>>>    {
>>>        assert(coll != NULL);
>>> -    for (int i = 0; i < index->nColumn; i++) {
>>> -        uint32_t id;
>>> -        struct coll *idx_coll = sql_index_collation(index, i, &id);
>>> -        assert(idx_coll != 0 || index->aiColumn[i] < 0);
>>> -        if (index->aiColumn[i] >= 0 && coll == idx_coll)
>>> +    struct key_part *part = index->def->key_def->parts;
>>> +    for (uint32_t i = 0; i < index->def->key_def->part_count; i++,
>>> part++) {
>>> +        struct coll *idx_coll = part->coll;
>>> +        assert(idx_coll != NULL);
>>> +        if (coll == idx_coll)
>>>                return true;
>>>        }
>>>        return false;
>>> diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
>>> index 8b13f6077..931a15a60 100644
>>> --- a/src/box/sql/delete.c
>>> +++ b/src/box/sql/delete.c
>>> @@ -269,11 +269,12 @@ sql_table_delete_from(struct Parse *parse, struct
>>> SrcList *tab_list,
>>>
>>>            /* Extract the primary key for the current row */
>>>            if (!is_view) {
>>> -            for (int i = 0; i < pk_len; i++) {
>>> +            struct key_part *part = pk_def->parts;
>>> +            for (int i = 0; i < pk_len; i++, part++) {
>>>                    struct space_def *def = space->def;
>>>                    sqlite3ExprCodeGetColumnOfTable(v, def,
>>>                                    tab_cursor,
>>> -                                pk_def->parts[i].fieldno,
>>> +                                part->fieldno,
>>>                                    reg_pk + i);
>>>                }
>>>            } else {
>>> @@ -569,13 +570,14 @@ sql_generate_index_key(struct Parse *parse, struct
>>> Index *index, int cursor,
>>>                *part_idx_label = 0;
>>>            }
>>>        }
>>> -    int col_cnt = index_column_count(index);
>>> +    int col_cnt = index->def->key_def->part_count;
>>>        int reg_base = sqlite3GetTempRange(parse, col_cnt);
>>>        if (prev != NULL && (reg_base != reg_prev ||
>>>                     prev->pPartIdxWhere != NULL))
>>>            prev = NULL;
>>>        for (int j = 0; j < col_cnt; j++) {
>>> -        if (prev != NULL && prev->aiColumn[j] == index->aiColumn[j]) {
>>> +        if (prev != NULL && prev->def->key_def->parts[j].fieldno ==
>>> + index->def->key_def->parts[j].fieldno) {
>> 23. Extra whitespace in tab area (visible in git diff).
> Fixed.
>>>                /*
>>>                 * This column was already computed by the
>>>                 * previous index.
>>> diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
>>> index f03c7a3cd..b752084d4 100644
>>> --- a/src/box/sql/expr.c
>>> +++ b/src/box/sql/expr.c
>>> @@ -2423,20 +2423,24 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing
>>> context */
>>>                     pIdx = pIdx->pNext) {
>>>                    Bitmask colUsed; /* Columns of the index used */
>>>                    Bitmask mCol;    /* Mask for the current column */
>>> -                if (pIdx->nColumn < nExpr)
>>> +                uint32_t part_count = pIdx->def->key_def->
>>> +                    part_count;
>>> +                struct key_part *parts = pIdx->def->key_def->
>>> +                    parts;
>>> +                if ((int)part_count < nExpr)
>>>                        continue;
>>>                    /* Maximum nColumn is BMS-2, not BMS-1, so that we can
>>> compute
>>>                     * BITMASK(nExpr) without overflowing
>>>                     */
>>> -                testcase(pIdx->nColumn == BMS - 2);
>>> -                testcase(pIdx->nColumn == BMS - 1);
>>> -                if (pIdx->nColumn >= BMS - 1)
>>> +                testcase(part_count == BMS - 2);
>>> +                testcase(>part_count == BMS - 1);
>> 24. What is the testcase? Why is ">part_count" a valid construction? Drop them all.
> Fxd.
>>
>>> +                if (part_count >= BMS - 1)
>>>                        continue;
>>>                    if (mustBeUnique) {
>>> -                    if (pIdx->nColumn > nExpr
>>> -                        || (pIdx->nColumn > nExpr
>>> -                        && !index_is_unique(pIdx))) {
>>> -                            continue;    /* This index is not unique
>>> over the IN RHS columns */
>>> +                    if ((int)part_count > nExpr
>>> +                        || !pIdx->def->opts.is_unique) {
>>> +                        /* This index is not unique over the IN RHS
>>> columns */
>>> +                        continue;
>>>                        }
>>>                    }
>>>
>>> @@ -2450,12 +2454,13 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing
>>> context */
>>>                        int j;
>>>
>>>                        for (j = 0; j < nExpr; j++) {
>>> -                        if (pIdx->aiColumn[j] !=
>>> -                            pRhs->iColumn) {
>>> +                        if ((int) parts[j].fieldno
>>> +                            != pRhs->iColumn) {
>>>                                continue;
>>>                            }
>>> -                        struct coll *idx_coll;
>>> -                        idx_coll = sql_index_collation(pIdx, j, &id);
>>> +
>>> +                        struct coll *idx_coll =
>>> +                                 parts[j].coll;
>>>                            if (pReq != NULL &&
>>>                                pReq != idx_coll) {
>>>                                continue;
>>> @@ -2484,18 +2489,17 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing
>>> context */
>>>                                  0, 0, 0,
>>>                                  sqlite3MPrintf(db,
>>>                                  "USING INDEX %s FOR IN-OPERATOR",
>>> -                              pIdx->zName),
>>> +                              pIdx->def->name),
>>>                                  P4_DYNAMIC);
>>>                        struct space *space =
>>> space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
>>>                        vdbe_emit_open_cursor(pParse, iTab,
>>>                                      pIdx->tnum, space);
>>> -                    VdbeComment((v, "%s", pIdx->zName));
>>> +                    VdbeComment((v, "%s", pIdx->def->name));
>>>                        assert(IN_INDEX_INDEX_DESC ==
>>>                               IN_INDEX_INDEX_ASC + 1);
>>>                        eType = IN_INDEX_INDEX_ASC +
>>> -                        sql_index_column_sort_order(pIdx,
>>> -                                        0);
>>> +                        parts[0].sort_order;
>>>
>>>                        if (prRhsHasNull) {
>>>    #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
>>> @@ -2517,7 +2521,7 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing
>>> context */
>>>                                /* Tarantool: Check for null is performed
>>> on first key of the index.  */
>>>                                sqlite3SetHasNullFlag(v,
>>>                                              iTab,
>>> -                                          pIdx->aiColumn[0],
>>> +                                          parts[0].fieldno,
>>>                                              *prRhsHasNull);
>>>                            }
>>>                        }
>>> @@ -3148,12 +3152,12 @@ sqlite3ExprCodeIN(Parse * pParse,    /* Parsing
>>> and code generating context */
>>>            struct Index *pk = sqlite3PrimaryKeyIndex(tab);
>>>            assert(pk);
>>>
>>> +        uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
>>>            enum affinity_type affinity =
>>> -            tab->def->fields[pk->aiColumn[0]].affinity;
>>> -        if (pk->nColumn == 1
>>> +            tab->def->fields[fieldno].affinity;
>>> +        if (pk->def->key_def->part_count == 1
>>>                && affinity == AFFINITY_INTEGER
>>> -            && pk->aiColumn[0] < nVector) {
>>> -            int reg_pk = rLhs + pk->aiColumn[0];
>>> +            && (int) fieldno < nVector) { int reg_pk = rLhs + (int)fieldno;
>>>                sqlite3VdbeAddOp2(v, OP_MustBeInt, reg_pk, destIfFalse);
>>>            }
>>>        }
>>> @@ -3485,7 +3489,7 @@ sqlite3ExprCodeLoadIndexColumn(Parse * pParse,
>>> /* The parsing context */
>>>                       int regOut    /* Store the index column value in
>>> this register */
>>>        )
>>>    {
>>> -    i16 iTabCol = pIdx->aiColumn[iIdxCol];
>>> +    i16 iTabCol = pIdx->def->key_def->parts[iIdxCol].fieldno;
>>>        sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
>>>                        iTabCur, iTabCol, regOut);
>>>    }
>>> diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
>>> index e3fff37fe..c5fec3161 100644
>>> --- a/src/box/sql/fkey.c
>>> +++ b/src/box/sql/fkey.c
>>> @@ -257,8 +257,8 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse
>>> context to store any error in */
>>>        }
>>>
>>>        for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
>>> -        int nIdxCol = index_column_count(pIdx);
>>> -        if (nIdxCol == nCol && index_is_unique(pIdx)
>>> +        int nIdxCol = pIdx->def->key_def->part_count;
>> 25. same as 17
> Fxd.
>>
>>> +        if (nIdxCol == nCol && pIdx->def->opts.is_unique
>>>                && pIdx->pPartIdxWhere == 0) {
>>>                /* pIdx is a UNIQUE index (or a PRIMARY KEY) and has the
>>> right number
>>>                 * of columns. If each indexed column corresponds to a
>>> foreign key
>>> @@ -287,8 +287,10 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse
>>> context to store any error in */
>>>                     * the default collation sequences for each column.
>>>                     */
>>>                    int i, j;
>>> -                for (i = 0; i < nCol; i++) {
>>> -                    i16 iCol = pIdx->aiColumn[i];    /* Index of column
>>> in parent tbl */
>> 26. Move comment to prev. line, terminate it with dot.
> Fixed.
>>
>>> +                struct key_part *part =
>>> +                    pIdx->def->key_def->parts;
>>> +                for (i = 0; i < nCol; i++, part++) {
>>> +                    i16 iCol = (int) part->fieldno;    /* Index of
>>> column in parent tbl */
>>>                        char *zIdxCol;    /* Name of indexed column */
>>>
>>>                        if (iCol < 0)
>>> @@ -303,9 +305,7 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse
>>> context to store any error in */
>>>                        def_coll = sql_column_collation(pParent->def,
>>>                                        iCol,
>>>                                        &id);
>>> -                    struct coll *coll =
>>> -                        sql_index_collation(pIdx, i,
>>> -                                    &id);
>>> +                    struct coll *coll = part->coll;
>>>                        if (def_coll != coll)
>>>                            break;
>>>
>>> @@ -465,13 +465,15 @@ fkLookupParent(Parse * pParse,    /* Parse context */
>>>                    for (i = 0; i < nCol; i++) {
>>>                        int iChild = aiCol[i] + 1 + regData;
>>>                        int iParent =
>>> -                        pIdx->aiColumn[i] + 1 + regData;
>>> -                    assert(pIdx->aiColumn[i] >= 0);
>>> +                        (int) pIdx->def->key_def->parts[i].fieldno
>>> +                        + 1 + regData;
>>>                        assert(aiCol[i] != pTab->iPKey);
>>> -                    if (pIdx->aiColumn[i] == pTab->iPKey) {
>>> +                    if ((int)pIdx->def->key_def->
>>> +                        parts[i].fieldno == pTab->iPKey) {
>>>                            /* The parent key is a composite key that
>>> includes the IPK column */
>>>                            iParent = regData;
>>>                        }
>>> +
>>>                        sqlite3VdbeAddOp3(v, OP_Ne, iChild,
>>>                                  iJump, iParent);
>>>                        VdbeCoverage(v);
>>> @@ -623,7 +625,7 @@ fkScanChildren(Parse * pParse,    /* Parse context */
>>>        Vdbe *v = sqlite3GetVdbe(pParse);
>>>
>>>        assert(pIdx == 0 || pIdx->pTable == pTab);
>>> -    assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
>>> +    assert(pIdx == 0 || (int) pIdx->def->key_def->part_count ==
>>> pFKey->nCol);
>> 27. pIdx == NULL
> Fxd.
>>
>>>        assert(pIdx != 0);
>>>
>>>        if (nIncr < 0) {
>>> @@ -647,7 +649,8 @@ fkScanChildren(Parse * pParse,    /* Parse context */
>>>            i16 iCol;    /* Index of column in child table */
>>>            const char *zCol;    /* Name of column in child table */
>>>
>>> -        iCol = pIdx ? pIdx->aiColumn[i] : -1;
>>> +        iCol = pIdx != NULL ?
>>> +               pIdx->def->key_def->parts[i].fieldno : -1;
>>>            pLeft = exprTableRegister(pParse, pTab, regData, iCol);
>>>            iCol = aiCol ? aiCol[i] : pFKey->aCol[0].iFrom;
>>>            assert(iCol >= 0);
>>> @@ -672,10 +675,9 @@ fkScanChildren(Parse * pParse,    /* Parse context */
>>>            Expr *pEq, *pAll = 0;
>>>            Index *pPk = sqlite3PrimaryKeyIndex(pTab);
>>>            assert(pIdx != 0);
>>> -        int col_count = index_column_count(pPk);
>>> +        int col_count = pPk->def->key_def->part_count;
>>>            for (i = 0; i < col_count; i++) {
>>> -            i16 iCol = pIdx->aiColumn[i];
>>> -            assert(iCol >= 0);
>>> +            i16 iCol = (int) pIdx->def->key_def->parts[i].fieldno;
>> 28. same as 17.
> Fixed.
>>>                pLeft = exprTableRegister(pParse, pTab, regData, iCol);
>>>                pRight =
>>>                    exprTableColumn(db, pTab->def,
>>> @@ -982,7 +984,6 @@ sqlite3FkCheck(Parse * pParse,    /* Parse context */
>>>                if (aiCol[i] == pTab->iPKey) {
>>>                    aiCol[i] = -1;
>>>                }
>>> -            assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
>>>            }
>>>
>>>            pParse->nTab++;
>>> @@ -1116,10 +1117,10 @@ sqlite3FkOldmask(Parse * pParse,    /* Parse
>>> context */
>>>                Index *pIdx = 0;
>>>                sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
>>>                if (pIdx) {
>>> -                int nIdxCol = index_column_count(pIdx);
>>> +                int nIdxCol = pIdx->def->key_def->part_count;
>> 29. same as 17.
> Fixed.
>>>                    for (i = 0; i < nIdxCol; i++) {
>>> -                    assert(pIdx->aiColumn[i] >= 0);
>>> -                    mask |= COLUMN_MASK(pIdx->aiColumn[i]);
>>> +                    mask |= COLUMN_MASK(pIdx->def->
>>> +                        key_def->parts[i].fieldno);
>>>                    }
>>>                }
>>>            }
>>> @@ -1254,11 +1255,12 @@ fkActionTrigger(Parse * pParse,    /* Parse
>>> context */
>>>                       || (pTab->iPKey >= 0
>>>                       && pTab->iPKey <
>>>                          (int)pTab->def->field_count));
>>> -            assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
>>> +
>>> +            uint32_t fieldno = pIdx != NULL ?
>>> + pIdx->def->key_def->parts[i].fieldno :
>>> +                       (uint32_t)pTab->iPKey;
>>>                sqlite3TokenInit(&tToCol,
>>> -                     pTab->def->fields[pIdx ? pIdx->
>>> -                            aiColumn[i] : pTab->iPKey].
>>> -                     name);
>>> +                     pTab->def->fields[fieldno].name);
>>>                sqlite3TokenInit(&tFromCol,
>>>                         pFKey->pFrom->def->fields[
>>>                            iFromCol].name);
>>> diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
>>> index 70555c3ec..b535763e9 100644
>>> --- a/src/box/sql/insert.c
>>> +++ b/src/box/sql/insert.c
>>> @@ -90,14 +90,14 @@ sqlite3IndexAffinityStr(sqlite3 *db, Index *index)
>>>         * sqliteDeleteIndex() when the Index structure itself is
>>>         * cleaned up.
>>>         */
>>> -    int column_count = index_column_count(index);
>>> +    int column_count = index->def->key_def->part_count;
>>>        index->zColAff = (char *) sqlite3DbMallocRaw(0, column_count + 1);
>>>        if (index->zColAff == NULL) {
>>>            sqlite3OomFault(db);
>>>            return NULL;
>>>        }
>>>        for (int n = 0; n < column_count; n++) {
>>> -        uint16_t x = index->aiColumn[n];
>>> +        uint16_t x = index->def->key_def->parts[n].fieldno;
>>>            index->zColAff[n] = index->pTable->def->fields[x].affinity;
>>>        }
>>>        index->zColAff[column_count] = 0;
>>> @@ -647,7 +647,7 @@ sqlite3Insert(Parse * pParse,    /* Parser context */
>>>                 pIdx = pIdx->pNext, i++) {
>>>                assert(pIdx);
>>>                aRegIdx[i] = ++pParse->nMem;
>>> -            pParse->nMem += index_column_count(pIdx);
>>> +            pParse->nMem += pIdx->def->key_def->part_count;
>>>            }
>>>        }
>>>
>>> @@ -1089,7 +1089,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,
>>>       /* The parser context */
>>>        nCol = def->field_count;
>>>
>>>        pPk = sqlite3PrimaryKeyIndex(pTab);
>>> -    nPkField = index_column_count(pPk);
>>> +    nPkField = pPk->def->key_def->part_count;
>>>
>>>        /* Record that this module has started */
>>>        VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
>>> @@ -1253,10 +1253,10 @@ sqlite3GenerateConstraintChecks(Parse *
>>> pParse,        /* The parser context */
>>>             * the insert or update.  Store that record in the aRegIdx[ix]
>>> register
>>>             */
>>>            regIdx = aRegIdx[ix] + 1;
>>> -        int nIdxCol = (int) index_column_count(pIdx);
>>> +        int nIdxCol = (int) pIdx->def->key_def->part_count;
>> 30. same as 17
> Fixed.
>>
>>>            if (uniqueByteCodeNeeded) {
>>>                for (i = 0; i < nIdxCol; ++i) {
>>> -                int fieldno = pIdx->aiColumn[i];
>>> +                int fieldno = pIdx->def->key_def->parts[i].fieldno;
>>>                    int reg;
>>>                    /*
>>>                     * OP_SCopy copies value in
>>> @@ -1284,8 +1284,12 @@ sqlite3GenerateConstraintChecks(Parse *
>>> pParse,        /* The parser context */
>>>                /* If PK is marked as INTEGER, use it as strict type,
>>>                 * not as affinity. Emit code for type checking */
>>>                if (nIdxCol == 1) {
>>> -                reg_pk = regNewData + 1 + pIdx->aiColumn[0];
>>> -                if (pTab->zColAff[pIdx->aiColumn[0]] ==
>>> +                reg_pk = regNewData + 1 +
>>> + pIdx->def->key_def->parts[0].fieldno;
>>> +
>>> +                int fieldno = (int) pIdx->def->key_def->
>>> +                    parts[0].fieldno;
>>> +                if (pTab->zColAff[fieldno] ==
>>>                        AFFINITY_INTEGER) {
>>>                        int skip_if_null = sqlite3VdbeMakeLabel(v);
>>>                        if ((pTab->tabFlags & TF_Autoincrement) != 0) {
>>> @@ -1303,7 +1307,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,
>>>       /* The parser context */
>>>
>>>                sqlite3VdbeAddOp3(v, OP_MakeRecord, regNewData + 1,
>>>                          def->field_count, aRegIdx[ix]);
>>> -            VdbeComment((v, "for %s", pIdx->zName));
>>> +            VdbeComment((v, "for %s", pIdx->def->name));
>>>            }
>>>
>>>            /* In an UPDATE operation, if this index is the PRIMARY KEY
>>> @@ -1391,7 +1395,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,
>>>       /* The parser context */
>>>            if (uniqueByteCodeNeeded) {
>>>                sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
>>>                             addrUniqueOk, regIdx,
>>> -                         index_column_count(pIdx));
>>> + pIdx->def->key_def->part_count);
>>>            }
>>>            VdbeCoverage(v);
>>>
>>> @@ -1401,14 +1405,13 @@ sqlite3GenerateConstraintChecks(Parse *
>>> pParse,        /* The parser context */
>>>                                     nPkField);
>>>            if (isUpdate || on_error == ON_CONFLICT_ACTION_REPLACE) {
>>>                int x;
>>> -            int nPkCol = index_column_count(pPk);
>>> +            int nPkCol = pPk->def->key_def->part_count;
>> 31. same as 17
> Fixed.
>>
>>>                /* Extract the PRIMARY KEY from the end of the index entry and
>>>                 * store it in registers regR..regR+nPk-1
>>>                 */
>>>                if (pIdx != pPk) {
>>>                    for (i = 0; i < nPkCol; i++) {
>>> -                    assert(pPk->aiColumn[i] >= 0);
>>> -                    x = pPk->aiColumn[i];
>>> +                    x = pPk->def->key_def->parts[i].fieldno;
>>>                        sqlite3VdbeAddOp3(v, OP_Column,
>>>                                  iThisCur, x, regR + i);
>>>                        VdbeComment((v, "%s.%s", def->name,
>>> @@ -1430,10 +1433,10 @@ sqlite3GenerateConstraintChecks(Parse *
>>> pParse,        /* The parser context */
>>>                              regIdx : regR);
>>>
>>>                    for (i = 0; i < nPkCol; i++) {
>>> -                    uint32_t id;
>>> -                    char *p4 = (char *)sql_index_collation(pPk, i, &id);
>>> -                    x = pPk->aiColumn[i];
>>> -                    assert(x >= 0);
>>> +                    char *p4 = (char *) pPk->def->key_def->parts[i].coll;
>>> +                    x = pPk->def->key_def->parts[i].fieldno;
>>> +                    if (pPk->tnum==0)
>>> +                        x = -1;
>>>                        if (i == (nPkCol - 1)) {
>>>                            addrJump = addrUniqueOk;
>>>                            op = OP_Eq;
>>> @@ -1610,8 +1613,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,    /*
>>> Parsing context */
>>>                IsPrimaryKeyIndex(pIdx) ||        /* Condition 2 */
>>>                sqlite3FkReferences(pTab) ||    /* Condition 3 */
>>>                /* Condition 4 */
>>> -            (index_is_unique(pIdx) && pIdx->onError !=
>>> -             ON_CONFLICT_ACTION_DEFAULT &&
>>> +            (pIdx->def->opts.is_unique &&
>>> +             pIdx->onError != ON_CONFLICT_ACTION_DEFAULT &&
>>>                 /* Condition 4.1 */
>>>                 pIdx->onError != ON_CONFLICT_ACTION_ABORT) ||
>>>                 /* Condition 4.2 */
>>> @@ -1629,7 +1632,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,    /*
>>> Parsing context */
>>>                              space_ptr_reg);
>>>                    sql_vdbe_set_p4_key_def(pParse, pIdx);
>>>                    sqlite3VdbeChangeP5(v, p5);
>>> -                VdbeComment((v, "%s", pIdx->zName));
>>> +                VdbeComment((v, "%s", pIdx->def->name));
>>>                }
>>>            }
>>>        }
>>> @@ -1666,27 +1669,23 @@ xferCompatibleIndex(Index * pDest, Index * pSrc)
>>>        uint32_t i;
>>>        assert(pDest && pSrc);
>>>        assert(pDest->pTable != pSrc->pTable);
>>> -    uint32_t nDestCol = index_column_count(pDest);
>>> -    uint32_t nSrcCol = index_column_count(pSrc);
>>> +    uint32_t nDestCol = pDest->def->key_def->part_count;
>>> +    uint32_t nSrcCol = pSrc->def->key_def->part_count;
>> 32. same as 17
> Fixed.
>>
>>>        if (nDestCol != nSrcCol) {
>>>            return 0;    /* Different number of columns */
>>>        }
>>>        if (pDest->onError != pSrc->onError) {
>>>            return 0;    /* Different conflict resolution strategies */
>>>        }
>>> -    for (i = 0; i < nSrcCol; i++) {
>>> -        if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
>>> +    struct key_part *src_part = pSrc->def->key_def->parts;
>>> +    struct key_part *dest_part = pDest->def->key_def->parts;
>>> +    for (i = 0; i < nSrcCol; i++, src_part++, dest_part++) {
>>> +        if (src_part->fieldno != dest_part->fieldno)
>>>                return 0;    /* Different columns indexed */
>>> -        }
>>> -        if (sql_index_column_sort_order(pSrc, i) !=
>>> -            sql_index_column_sort_order(pDest, i)) {
>>> +        if (src_part->sort_order != dest_part->sort_order)
>>>                return 0;    /* Different sort orders */
>>> -        }
>>> -        uint32_t id;
>>> -        if (sql_index_collation(pSrc, i, &id) !=
>>> -            sql_index_collation(pDest, i, &id)) {
>>> +        if (src_part->coll != dest_part->coll)
>>>                return 0;    /* Different collating sequences */
>>> -        }
>>>        }
>>>        if (sqlite3ExprCompare(pSrc->pPartIdxWhere, pDest->pPartIdxWhere,
>>> -1)) {
>>>            return 0;    /* Different WHERE clauses */
>>> @@ -1858,16 +1857,15 @@ xferOptimization(Parse * pParse,    /* Parser
>>> context */
>>>            }
>>>        }
>>>        for (pDestIdx = pDest->pIndex; pDestIdx; pDestIdx = pDestIdx->pNext) {
>>> -        if (index_is_unique(pDestIdx)) {
>>> +        if (pDestIdx->def->opts.is_unique)
>>>                destHasUniqueIdx = 1;
>>> -        }
>>>            for (pSrcIdx = pSrc->pIndex; pSrcIdx; pSrcIdx = pSrcIdx->pNext) {
>>>                if (xferCompatibleIndex(pDestIdx, pSrcIdx))
>>>                    break;
>>>            }
>>> -        if (pSrcIdx == 0) {
>>> -            return 0;    /* pDestIdx has no corresponding index in pSrc */
>>> -        }
>>> +        /* pDestIdx has no corresponding index in pSrc */
>>> +        if (pSrcIdx == 0)
>> 33. Terminate comment with dot, pSrcIdx == NULL
> Fixed.
>>> +            return 0;
>>>        }
>>>        /* Get server checks. */
>>>        ExprList *pCheck_src = space_checks_expr_list(
>>> @@ -1943,12 +1941,12 @@ xferOptimization(Parse * pParse,    /* Parser
>>> context */
>>>            struct space *src_space =
>>> space_by_id(SQLITE_PAGENO_TO_SPACEID(pSrcIdx->tnum));
>>>            vdbe_emit_open_cursor(pParse, iSrc, pSrcIdx->tnum, src_space);
>>> -        VdbeComment((v, "%s", pSrcIdx->zName));
>>> +        VdbeComment((v, "%s", pSrcIdx->def->name));
>>>            struct space *dest_space =
>>> space_by_id(SQLITE_PAGENO_TO_SPACEID(pDestIdx->tnum));
>>>            vdbe_emit_open_cursor(pParse, iDest, pDestIdx->tnum, dest_space);
>>>            sqlite3VdbeChangeP5(v, OPFLAG_BULKCSR);
>>> -        VdbeComment((v, "%s", pDestIdx->zName));
>>> +        VdbeComment((v, "%s", pDestIdx->def->name));
>>>            addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
>>>            VdbeCoverage(v);
>>>            sqlite3VdbeAddOp2(v, OP_RowData, iSrc, regData);
>>> diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
>>> index 5fb29c75c..7067a5ab1 100644
>>> --- a/src/box/sql/pragma.c
>>> +++ b/src/box/sql/pragma.c
>>> @@ -370,7 +370,8 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First
>>> part of [schema.]id field */
>>>                    k = 1;
>>>                } else {
>>>                    for (k = 1; k <= def->field_count &&
>>> -                     pk->aiColumn[k - 1] != (int) i; ++k) {
>>> +                     pk->def->key_def->parts[k - 1].fieldno
>>> +                     != i; ++k) {
>>>                    }
>>>                }
>>>                bool is_nullable = def->fields[i].is_nullable;
>>> @@ -414,7 +415,7 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First
>>> part of [schema.]id field */
>>>                        size_t avg_tuple_size_idx =
>>>                            sql_index_tuple_size(space, idx);
>>>                        sqlite3VdbeMultiLoad(v, 2, "sii",
>>> -                                 pIdx->zName,
>>> +                                 pIdx->def->name,
>>>                                     avg_tuple_size_idx,
>>>                                     index_field_tuple_est(pIdx, 0));
>>>                        sqlite3VdbeAddOp2(v, OP_ResultRow, 1,
>>> @@ -443,11 +444,13 @@ sqlite3Pragma(Parse * pParse, Token * pId,    /*
>>> First part of [schema.]id field */
>>>                             */
>>>                            pParse->nMem = 3;
>>>                        }
>>> -                    mx = index_column_count(pIdx);
>>> +                    mx = pIdx->def->key_def->part_count;
>>>                        assert(pParse->nMem <=
>>>                               pPragma->nPragCName);
>>> -                    for (i = 0; i < mx; i++) {
>>> -                        i16 cnum = pIdx->aiColumn[i];
>>> +                    struct key_part *part =
>>> +                        pIdx->def->key_def->parts;
>>> +                    for (i = 0; i < mx; i++, part++) {
>>> +                        i16 cnum = (int) part->fieldno;
>>>                            assert(pIdx->pTable);
>>>                            sqlite3VdbeMultiLoad(v, 1,
>>>                                         "iis", i,
>>> @@ -461,19 +464,18 @@ sqlite3Pragma(Parse * pParse, Token * pId,    /*
>>> First part of [schema.]id field */
>>>                                         name);
>>>                            if (pPragma->iArg) {
>>>                                const char *c_n;
>>> -                            uint32_t id;
>>> +                            uint32_t id =
>>> +                                part->coll_id;
>>>                                struct coll *coll =
>>> -                                sql_index_collation(pIdx, i, &id);
>>> +                                part->coll;
>>>                                if (coll != NULL)
>>>                                    c_n = coll_by_id(id)->name;
>>>                                else
>>>                                    c_n = "BINARY";
>>> -                            enum sort_order sort_order;
>>> -                            sort_order = sql_index_column_sort_order(pIdx,
>>> -                                                 i);
>>>                                sqlite3VdbeMultiLoad(v,
>>>                                             4,
>>>                                             "isi",
>>> +                                         part->
>>>                                             sort_order,
>>>                                             c_n,
>>>                                             i <
>>> @@ -503,10 +505,8 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First
>>> part of [schema.]id field */
>>>                                { "c", "u", "pk" };
>>>                            sqlite3VdbeMultiLoad(v, 1,
>>>                                         "isisi", i,
>>> -                                     pIdx->
>>> -                                     zName,
>>> -                                     index_is_unique
>>> -                                     (pIdx),
>>> +                                     pIdx->def->name,
>>> + pIdx->def->opts.is_unique,
>>>                                         azOrigin
>>>                                         [pIdx->
>>>                                          idxType],
>>> diff --git a/src/box/sql/select.c b/src/box/sql/select.c
>>> index 368bcd6f0..c7c186d9d 100644
>>> --- a/src/box/sql/select.c
>>> +++ b/src/box/sql/select.c
>>> @@ -4367,7 +4367,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct
>>> SrcList_item *pFrom)
>>>            char *zIndexedBy = pFrom->u1.zIndexedBy;
>>>            Index *pIdx;
>>>            for (pIdx = pTab->pIndex;
>>> -             pIdx && strcmp(pIdx->zName, zIndexedBy);
>>> +             pIdx && strcmp(pIdx->def->name, zIndexedBy);
>>>                 pIdx = pIdx->pNext) ;
>>>            if (!pIdx) {
>>>                sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
>>> diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
>>> index 47360fa5b..f696591fd 100644
>>> --- a/src/box/sql/sqliteInt.h
>>> +++ b/src/box/sql/sqliteInt.h
>>> @@ -2070,27 +2070,17 @@ struct UnpackedRecord {
>>>     * Each SQL index is represented in memory by an
>>>     * instance of the following structure.
>>>     *
>>> - * The columns of the table that are to be indexed are described
>>> - * by the aiColumn[] field of this structure.  For example, suppose
>>> - * we have the following table and index:
>>> - *
>>> - *     CREATE TABLE Ex1(c1 int, c2 int, c3 text);
>>> - *     CREATE INDEX Ex2 ON Ex1(c3,c1);
>>> - *
>>> - * In the Table structure describing Ex1, nCol==3 because there are
>>> - * three columns in the table.  In the Index structure describing
>>> - * Ex2, nColumn==2 since 2 of the 3 columns of Ex1 are indexed.
>>> - * The value of aiColumn is {2, 0}.  aiColumn[0]==2 because the
>>> - * first column to be indexed (c3) has an index of 2 in Ex1.aCol[].
>>> - * The second column to be indexed (c1) has an index of 0 in
>>> - * Ex1.aCol[], hence Ex2.aiColumn[1]==0.
>>> - *
>>> - * The Index.onError field determines whether or not the indexed columns
>>> - * must be unique and what to do if they are not.  When Index.onError=
>>> - * ON_CONFLICT_ACTION_NONE, it means this is not a unique index.
>>> - * Otherwise it is a unique index and the value of Index.onError indicate
>>> - * the which conflict resolution algorithm to employ whenever an attempt
>>> - * is made to insert a non-unique element.
>>> + * Indexes name, corresponding space_id, type (in tarantool
>>> + * sense - HASH, TREE, etc) are stored in index definition - in
>>> + * Index.def.
>>> + * SQL statement which created the index and 'is_unique' flag are
>>> + * stored in Index.def.opts. Information about index parts (part
>>> + * count, corresponding space fields' numbers, parts' collations
>>> + * and sort orders, etc) are stored in Index.def.key_def.parts
>>> + *
>>> + * Index.onError indicate the which conflict resolution algorithm
>>> + * to employ whenever an attempt is made to insert a non-unique
>>> + * element in unique index.
>>>     *
>>>     * While parsing a CREATE TABLE or CREATE INDEX statement in order to
>>>     * generate VDBE code (as opposed to reading from Tarantool's _space
>>> @@ -2101,26 +2091,18 @@ struct UnpackedRecord {
>>>     * program is executed). See convertToWithoutRowidTable() for details.
>>>     */
>>>    struct Index {
>>> -    char *zName;        /* Name of this index */
>>> -    i16 *aiColumn;        /* Which columns are used by this index.  1st
>>> is 0 */
>>>        LogEst *aiRowLogEst;    /* From ANALYZE: Est. rows selected by
>>> each column */
>>>        Table *pTable;        /* The SQL table being indexed */
>>>        char *zColAff;        /* String defining the affinity of each
>>> column */
>>>        Index *pNext;        /* The next index associated with the same
>>> table */
>>>        Schema *pSchema;    /* Schema containing this index */
>>> -    /** Sorting order for each column. */
>>> -    enum sort_order *sort_order;
>>> -    /** Array of collation sequences for index. */
>>> -    struct coll **coll_array;
>>> -    /** Array of collation identifiers. */
>>> -    uint32_t *coll_id_array;
>>>        Expr *pPartIdxWhere;    /* WHERE clause for partial indices */
>>>        int tnum;        /* DB Page containing root of this index */
>>> -    u16 nColumn;        /* Number of columns stored in the index */
>>>        u8 onError;        /* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
>>>                     * or _NONE
>>>                     */
>>>        unsigned idxType:2;    /* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE
>>> INDEX */
>>> +    struct index_def *def;
>> 34.
>>>>>> + struct index_def *def;
>>>>>> };
>> 10. Will you keep an informative tarantool-style comment here?
>>> Ok, added comment.
>> I still can't see a comment for index_def field in Index struct in your patch.
>> /** Smth like this. */
> All the information about index_def is in the comment before the struct Index
> definition.
> I am not sure that anything must be added before the 'index_def' line.
>>>    };
>>>
>>>    /**
>>> @@ -3546,34 +3528,6 @@ void sqlite3AddCollateType(Parse *, Token *);
>>>     */
>>>    struct coll *
>>>    sql_column_collation(struct space_def *def, uint32_t column, uint32_t
>>> *coll_id);
>>> -/**
>>> - * Return name of given column collation from index.
>>> - *
>>> - * @param idx Index which is used to fetch column.
>>> - * @param column Number of column.
>>> - * @param[out] coll_id Collation identifier.
>>> - * @retval Pointer to collation.
>>> - */
>>> -struct coll *
>>> -sql_index_collation(Index *idx, uint32_t column, uint32_t *id);
>>> -
>>> -/**
>>> - * Return key_def of provided struct Index.
>>> - * @param idx Pointer to `struct Index` object.
>>> - * @retval Pointer to `struct key_def`.
>>> - */
>>> -struct key_def*
>>> -sql_index_key_def(struct Index *idx);
>>> -
>>> -/**
>>> - * Return sort order of given column from index.
>>> - *
>>> - * @param idx Index which is used to fetch column.
>>> - * @param column Number of column.
>>> - * @retval Sort order of requested column.
>>> - */
>>> -enum sort_order
>>> -sql_index_column_sort_order(Index *idx, uint32_t column);
>>>
>>>    void sqlite3EndTable(Parse *, Token *, Token *, Select *);
>>>
>>> @@ -3661,8 +3615,6 @@ void sqlite3SrcListAssignCursors(Parse *, SrcList *);
>>>    void sqlite3IdListDelete(sqlite3 *, IdList *);
>>>    void sqlite3SrcListDelete(sqlite3 *, SrcList *);
>>>    Index *sqlite3AllocateIndexObject(sqlite3 *, i16, int, char **);
>>> -bool
>>> -index_is_unique(Index *);
>>>
>>>    /**
>>>     * Create a new index for an SQL table.  name is the name of the
>>> @@ -4381,8 +4333,6 @@ int sqlite3InvokeBusyHandler(BusyHandler *);
>>>    int
>>>    sql_analysis_load(struct sqlite3 *db);
>>>
>>> -uint32_t
>>> -index_column_count(const Index *);
>>>    bool
>>>    index_is_unique_not_null(const Index *);
>>>    void sqlite3RegisterLikeFunctions(sqlite3 *, int);
>>> diff --git a/src/box/sql/trigger.c b/src/box/sql/trigger.c
>>> index 042226cde..ad8e2438f 100644
>>> --- a/src/box/sql/trigger.c
>>> +++ b/src/box/sql/trigger.c
>>> @@ -873,8 +873,6 @@ codeRowTrigger(Parse * pParse,    /* Current parse
>>> context */
>>>        pSubParse->pToplevel = pTop;
>>>        pSubParse->eTriggerOp = pTrigger->op;
>>>        pSubParse->nQueryLoop = pParse->nQueryLoop;
>>> -    struct region *region = &fiber()->gc;
>>> -    pSubParse->region_initial_size = region_used(region);
>>>
>>>        v = sqlite3GetVdbe(pSubParse);
>>>        if (v) {
>>> diff --git a/src/box/sql/update.c b/src/box/sql/update.c
>>> index 10385eb78..fc479fb05 100644
>>> --- a/src/box/sql/update.c
>>> +++ b/src/box/sql/update.c
>>> @@ -238,14 +238,14 @@ sqlite3Update(Parse * pParse,        /* The parser
>>> context */
>>>         */
>>>        for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
>>>            int reg;
>>> -        int nIdxCol = index_column_count(pIdx);
>>> +        int nIdxCol = pIdx->def->key_def->part_count;
>> 35. Same as 17
> Fxd.
>>
>>>            if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
>>>                reg = ++pParse->nMem;
>>>                pParse->nMem += nIdxCol;
>>>            } else {
>>>                reg = 0;
>>>                for (i = 0; i < nIdxCol; i++) {
>>> -                i16 iIdxCol = pIdx->aiColumn[i];
>>> +                i16 iIdxCol = pIdx->def->key_def->parts[i].fieldno;
>> 36. Same as 17
> Fxd.
>>
>>>                    if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
>>>                        reg = ++pParse->nMem;
>>>                        pParse->nMem += nIdxCol;
>>> @@ -307,7 +307,7 @@ sqlite3Update(Parse * pParse,        /* The parser
>>> context */
>>>            nPk = nKey;
>>>        } else {
>>>            assert(pPk != 0);
>>> -        nPk = index_column_count(pPk);
>>> +        nPk = pPk->def->key_def->part_count;
>> 37. Same as 17
> Fxd.
>>>        }
>>>        iPk = pParse->nMem + 1;
>>>        pParse->nMem += nPk;
>>> @@ -334,9 +334,9 @@ sqlite3Update(Parse * pParse,        /* The parser
>>> context */
>>>            }
>>>        } else {
>>>            for (i = 0; i < nPk; i++) {
>>> -            assert(pPk->aiColumn[i] >= 0);
>>>                sqlite3ExprCodeGetColumnOfTable(v, def, iDataCur,
>>> -                            pPk->aiColumn[i],
>>> +                            pPk->def->key_def->
>>> +                                parts[i].fieldno,
>>>                                iPk + i);
>>>            }
>>>        }
>>> diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
>>> index 679bd0bc1..520b309d9 100644
>>> --- a/src/box/sql/vdbeaux.c
>>> +++ b/src/box/sql/vdbeaux.c
>>> @@ -1165,7 +1165,7 @@ sql_vdbe_set_p4_key_def(struct Parse *parse,
>>> struct Index *idx)
>>>        struct Vdbe *v = parse->pVdbe;
>>>        assert(v != NULL);
>>>        assert(idx != NULL);
>>> -    struct key_def *def = key_def_dup(sql_index_key_def(idx));
>>> +    struct key_def *def = key_def_dup(idx->def->key_def);
>>>        if (def == NULL)
>>>            sqlite3OomFault(parse->db);
>>>        else
>>> diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
>>> index f408b7701..51b5d516e 100644
>>> --- a/src/box/sql/vdbemem.c
>>> +++ b/src/box/sql/vdbemem.c
>>> @@ -1087,7 +1087,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
>>>                Index *pIdx = p->pIdx;    /* Index being probed */
>>>                int nByte;    /* Bytes of space to allocate */
>>>                int i;    /* Counter variable */
>>> -            int nCol = index_column_count(pIdx);
>>> +            int nCol = pIdx->def->key_def->part_count;
>> 38. Same as 17..
> Fxd.
>>>                nByte = sizeof(Mem) * nCol +
>>>                    ROUND8(sizeof(UnpackedRecord));
>>> @@ -1095,7 +1095,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
>>>                    (UnpackedRecord *) sqlite3DbMallocZero(db, nByte);
>>>                if (pRec == NULL)
>>>                    return NULL;
>>> -            pRec->key_def = key_def_dup(sql_index_key_def(pIdx));
>>> +            pRec->key_def = key_def_dup(pIdx->def->key_def);
>>>                if (pRec->key_def == NULL) {
>>>                    sqlite3DbFree(db, pRec);
>>>                    sqlite3OomFault(db);
>>> diff --git a/src/box/sql/where.c b/src/box/sql/where.c
>>> index e6c34f34a..3f95c4243 100644
>>> --- a/src/box/sql/where.c
>>> +++ b/src/box/sql/where.c
>>> @@ -372,13 +372,19 @@ whereScanInit(WhereScan * pScan,    /* The
>>> WhereScan object being initialized */
>>>        pScan->is_column_seen = false;
>>>        if (pIdx) {
>>>            int j = iColumn;
>>> -        iColumn = pIdx->aiColumn[j];
>>> +        iColumn = pIdx->def->key_def->parts[j].fieldno;
>>> +        /*
>>> +         * pIdx->tnum == 0 means that pIdx is a fake
>>> +         * integer primary key index
>>> +         */
>>> +        if (pIdx->tnum == 0)
>>> +            iColumn = -1;
>>> +
>>>            if (iColumn >= 0) {
>>>                char affinity =
>>> pIdx->pTable->def->fields[iColumn].affinity;
>>>                pScan->idxaff = affinity;
>>> -            uint32_t id;
>>> -            pScan->coll = sql_index_collation(pIdx, j, &id);
>>> +            pScan->coll = pIdx->def->key_def->parts[j].coll;
>>>                pScan->is_column_seen = true;
>>>            }
>>>        }
>>> @@ -541,47 +547,24 @@ findIndexCol(Parse * pParse,    /* Parse context */
>>>             Index * pIdx,    /* Index to match column of */
>>>             int iCol)        /* Column of index to match */
>>>    {
>>> +    struct key_part *part_to_match = &pIdx->def->key_def->parts[iCol];
>>>        for (int i = 0; i < pList->nExpr; i++) {
>>>            Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
>>> -        if (p->op == TK_COLUMN &&
>>> -            p->iColumn == pIdx->aiColumn[iCol] &&
>>> -            p->iTable == iBase) {
>>> +        if (p->op == TK_COLUMN && p->iTable == iBase &&
>>> +            p->iColumn == (int) part_to_match->fieldno) {
>>>                bool is_found;
>>>                uint32_t id;
>>>                struct coll *coll = sql_expr_coll(pParse,
>>>                                  pList->a[i].pExpr,
>>>                                  &is_found, &id);
>>> -            if (is_found &&
>>> -                coll == sql_index_collation(pIdx, iCol, &id)) {
>>> +            if (is_found && coll == part_to_match->coll)
>>>                    return i;
>>> -            }
>>>            }
>>>        }
>>>
>>>        return -1;
>>>    }
>>>
>>> -/*
>>> - * Return TRUE if the iCol-th column of index pIdx is NOT NULL
>>> - */
>>> -static int
>>> -indexColumnNotNull(Index * pIdx, int iCol)
>>> -{
>>> -    int j;
>>> -    assert(pIdx != 0);
>>> -    assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
>>> -    j = pIdx->aiColumn[iCol];
>>> -    if (j >= 0) {
>>> -        return !pIdx->pTable->def->fields[j].is_nullable;
>>> -    } else if (j == (-1)) {
>>> -        return 1;
>>> -    } else {
>>> -        assert(j == (-2));
>>> -        return 0;    /* Assume an indexed expression can always yield a
>>> NULL */
>>> -
>>> -    }
>>> -}
>>> -
>>>    /*
>>>     * Return true if the DISTINCT expression-list passed as the third
>>> argument
>>>     * is redundant.
>>> @@ -633,9 +616,9 @@ isDistinctRedundant(Parse * pParse,        /*
>>> Parsing context */
>>>         *      contain a "col=X" term are subject to a NOT NULL constraint.
>>>         */
>>>        for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
>>> -        if (!index_is_unique(pIdx))
>>> +        if (!pIdx->def->opts.is_unique)
>>>                continue;
>>> -        int col_count = index_column_count(pIdx);
>>> +        int col_count = pIdx->def->key_def->part_count;
>>>            for (i = 0; i < col_count; i++) {
>>>                if (0 ==
>>>                    sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
>>> @@ -643,11 +626,12 @@ isDistinctRedundant(Parse * pParse, /* Parsing
>>> context */
>>>                    if (findIndexCol
>>>                        (pParse, pDistinct, iBase, pIdx, i) < 0)
>>>                        break;
>>> -                if (indexColumnNotNull(pIdx, i) == 0)
>>> +                uint32_t j = pIdx->def->key_def->parts[i].fieldno;
>>> +                if (pIdx->pTable->def->fields[j].is_nullable)
>>>                        break;
>>>                }
>>>            }
>>> -        if (i == (int)index_column_count(pIdx)) {
>>> +        if (i == (int) pIdx->def->key_def->part_count) {
>>>                /* This index implies that the DISTINCT qualifier is
>>> redundant. */
>>>                return 1;
>>>            }
>>> @@ -1184,7 +1168,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
>>>    char
>>>    sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
>>>    {
>>> -    assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
>>> +    assert(iCol >= 0 && iCol < (int) pIdx->def->key_def->part_count);
>>>        if (!pIdx->zColAff) {
>>>            if (sqlite3IndexAffinityStr(db, pIdx) == 0)
>>>                return AFFINITY_BLOB;
>>> @@ -1246,13 +1230,12 @@ whereRangeSkipScanEst(Parse * pParse,     /*
>>> Parsing & code generating context */
>>>        int nUpper = index->def->opts.stat->sample_count + 1;
>>>        int rc = SQLITE_OK;
>>>        u8 aff = sqlite3IndexColumnAffinity(db, p, nEq);
>>> -    uint32_t id;
>>>
>>>        sqlite3_value *p1 = 0;    /* Value extracted from pLower */
>>>        sqlite3_value *p2 = 0;    /* Value extracted from pUpper */
>>>        sqlite3_value *pVal = 0;    /* Value extracted from record */
>>>
>>> -    struct coll *pColl = sql_index_collation(p, nEq, &id);
>>> +    struct coll *pColl = p->def->key_def->parts[nEq].coll;
>> 39. Same as 17
> Fxd.
>>>        if (pLower) {
>>>            rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
>>>                               aff, &p1);
>>> @@ -1448,7 +1431,7 @@ whereRangeScanEst(Parse * pParse,    /* Parsing &
>>> code generating context */
>>>                       || (pLower->eOperator & (WO_GT | WO_GE)) != 0);
>>>                assert(pUpper == 0
>>>                       || (pUpper->eOperator & (WO_LT | WO_LE)) != 0);
>>> -            if (sql_index_column_sort_order(p, nEq) !=
>>> +            if (p->def->key_def->parts[nEq].sort_order !=
>>>                    SORT_ORDER_ASC) {
>>>                    /* The roles of pLower and pUpper are swapped for a
>>> DESC index */
>>>                    SWAP(pLower, pUpper);
>>> @@ -1598,7 +1581,7 @@ whereEqualScanEst(Parse * pParse,    /* Parsing &
>>> code generating context */
>>>        int bOk;
>>>
>>>        assert(nEq >= 1);
>>> -    assert(nEq <= (int)index_column_count(p));
>>> +    assert(nEq <= (int) p->def->key_def->part_count);
>>>        assert(pBuilder->nRecValid < nEq);
>>>
>>>        /* If values are not available for all fields of the index to the left
>>> @@ -1619,7 +1602,7 @@ whereEqualScanEst(Parse * pParse,    /* Parsing &
>>> code generating context */
>>>
>>>        whereKeyStats(pParse, p, pRec, 0, a);
>>>        WHERETRACE(0x10, ("equality scan regions %s(%d): %d\n",
>>> -              p->zName, nEq - 1, (int)a[1]));
>>> +              p->def->name, nEq - 1, (int)a[1]));
>>>        *pnRow = a[1];
>>>
>>>        return rc;
>>> @@ -1751,7 +1734,7 @@ whereLoopPrint(WhereLoop * p, WhereClause * pWC)
>>>                   pItem->zAlias ? pItem->zAlias : pTab->def->name);
>>>    #endif
>>>        const char *zName;
>>> -    if (p->pIndex && (zName = p->pIndex->zName) != 0) {
>>> +    if (p->pIndex && (zName = p->pIndex->def->name) != 0) {
>>>            if (strncmp(zName, "sqlite_autoindex_", 17) == 0) {
>>>                int i = sqlite3Strlen30(zName) - 1;
>>>                while (zName[i] != '_')
>>> @@ -2314,7 +2297,7 @@ whereRangeVectorLen(Parse * pParse,    /* Parsing
>>> context */
>>>        int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
>>>        int i;
>>>
>>> -    nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
>>> +    nCmp = MIN(nCmp, (int)(pIdx->def->key_def->part_count - nEq));
>>>        for (i = 1; i < nCmp; i++) {
>>>            /* Test if comparison i of pTerm is compatible with column (i+nEq)
>>>             * of the index. If not, exit the loop.
>>> @@ -2335,13 +2318,11 @@ whereRangeVectorLen(Parse * pParse,    /*
>>> Parsing context */
>>>             * order of the index column is the same as the sort order of the
>>>             * leftmost index column.
>>>             */
>>> -        if (pLhs->op != TK_COLUMN
>>> -            || pLhs->iTable != iCur
>>> -            || pLhs->iColumn != pIdx->aiColumn[i + nEq]
>>> -            || sql_index_column_sort_order(pIdx, i + nEq) !=
>>> -               sql_index_column_sort_order(pIdx, nEq)) {
>>> +        if (pLhs->op != TK_COLUMN || pLhs->iTable != iCur
>>> +            || pLhs->iColumn != (int)pIdx->def->key_def->parts[i +
>>> nEq].fieldno
>>> +            || pIdx->def->key_def->parts[i + nEq].sort_order !=
>>> + pIdx->def->key_def->parts[nEq].sort_order)
>>>                break;
>>> -        }
>>>
>>>            aff = sqlite3CompareAffinity(pRhs, sqlite3ExprAffinity(pLhs));
>>>            idxaff =
>>> @@ -2353,7 +2334,7 @@ whereRangeVectorLen(Parse * pParse,    /* Parsing
>>> context */
>>>            pColl = sql_binary_compare_coll_seq(pParse, pLhs, pRhs, &id);
>>>            if (pColl == 0)
>>>                break;
>>> -            if (sql_index_collation(pIdx, i + nEq, &id) != pColl)
>>> +        if (pIdx->def->key_def->parts[(i + nEq)].coll != pColl)
>>>                break;
>>>        }
>>>        return i;
>>> @@ -2396,13 +2377,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder *
>>> pBuilder,    /* The WhereLoop factory */
>>>        LogEst rSize;        /* Number of rows in the table */
>>>        LogEst rLogSize;    /* Logarithm of table size */
>>>        WhereTerm *pTop = 0, *pBtm = 0;    /* Top and bottom range
>>> constraints */
>>> -    uint32_t nProbeCol = index_column_count(pProbe);
>>> +    uint32_t nProbeCol = pProbe->def->key_def->part_count;
>>>
>>>        pNew = pBuilder->pNew;
>>>        if (db->mallocFailed)
>>>            return SQLITE_NOMEM_BKPT;
>>>        WHERETRACE(0x800, ("BEGIN addBtreeIdx(%s), nEq=%d\n",
>>> -               pProbe->zName, pNew->nEq));
>>> +               pProbe->def->name, pNew->nEq));
>>>
>>>        assert((pNew->wsFlags & WHERE_TOP_LIMIT) == 0);
>>>        if (pNew->wsFlags & WHERE_BTM_LIMIT) {
>>> @@ -2452,8 +2433,9 @@ whereLoopAddBtreeIndex(WhereLoopBuilder *
>>> pBuilder,    /* The WhereLoop factory */
>>>            LogEst nOutUnadjusted;    /* nOut before IN() and WHERE
>>> adjustments */
>>>            int nIn = 0;
>>>            int nRecValid = pBuilder->nRecValid;
>>> +        uint32_t j = pProbe->def->key_def->parts[saved_nEq].fieldno;
>>>            if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0)
>>> -            && indexColumnNotNull(pProbe, saved_nEq)
>>> +            && !pProbe->pTable->def->fields[j].is_nullable
>>>                ) {
>>>                continue;    /* ignore IS [NOT] NULL constraints on NOT
>>> NULL columns */
>>>            }
>>> @@ -2523,14 +2505,16 @@ whereLoopAddBtreeIndex(WhereLoopBuilder *
>>> pBuilder,    /* The WhereLoop factory */
>>>                                 */
>>>                }
>>>            } else if (eOp & WO_EQ) {
>>> -            int iCol = pProbe->aiColumn[saved_nEq];
>>> +            int iCol = pProbe->def->key_def->parts[saved_nEq].fieldno;
>>>                pNew->wsFlags |= WHERE_COLUMN_EQ;
>>>                assert(saved_nEq == pNew->nEq);
>>> -            if ((iCol > 0 && nInMul == 0
>>> -                && saved_nEq == nProbeCol - 1)
>>> -                ) {
>>> -                if (iCol >= 0 &&
>>> -                    !index_is_unique_not_null(pProbe)) {
>>> +            if ((iCol > 0 && nInMul == 0 &&
>>> +                 saved_nEq == nProbeCol - 1)) {
>>> +                bool index_is_unique_not_null =
>>> +                    pProbe->def->key_def->is_nullable &&
>>> +                    pProbe->def->opts.is_unique;
>>> +                if (pProbe->tnum != 0 &&
>>> +                    !index_is_unique_not_null) {
>>>                        pNew->wsFlags |= WHERE_UNQ_WANTED;
>>>                    } else {
>>>                        pNew->wsFlags |= WHERE_ONEROW;
>>> @@ -2592,8 +2576,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder *
>>> pBuilder,    /* The WhereLoop factory */
>>>                assert(eOp & (WO_ISNULL | WO_EQ | WO_IN));
>>>
>>>                assert(pNew->nOut == saved_nOut);
>>> -            if (pTerm->truthProb <= 0
>>> -                && pProbe->aiColumn[saved_nEq] >= 0) {
>>> +            if (pTerm->truthProb <= 0 && pProbe->tnum != 0 ) {
>>>                    assert((eOp & WO_IN) || nIn == 0);
>>>                    testcase(eOp & WO_IN);
>>>                    pNew->nOut += pTerm->truthProb;
>>> @@ -2749,7 +2732,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder *
>>> pBuilder,    /* The WhereLoop factory */
>>>        }
>>>
>>>        WHERETRACE(0x800, ("END addBtreeIdx(%s), nEq=%d, rc=%d\n",
>>> -               pProbe->zName, saved_nEq, rc));
>>> +               pProbe->def->name, saved_nEq, rc));
>>>        return rc;
>>>    }
>>>
>>> @@ -2792,7 +2775,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
>>>    {
>>>        ExprList *pOB;
>>>        int ii, jj;
>>> -    int nIdxCol = index_column_count(pIndex);
>>> +    int nIdxCol = pIndex->def->key_def->part_count;
>>>        if (index_is_unordered(pIndex))
>>>            return 0;
>>>        if ((pOB = pBuilder->pWInfo->pOrderBy) == 0)
>>> @@ -2803,7 +2786,8 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
>>>                if (pExpr->iColumn < 0)
>>>                    return 1;
>>>                for (jj = 0; jj < nIdxCol; jj++) {
>>> -                if (pExpr->iColumn == pIndex->aiColumn[jj])
>>> +                if (pExpr->iColumn == (int)
>>> + pIndex->def->key_def->parts[jj].fieldno)
>>>                        return 1;
>>>                }
>>>            }
>>> @@ -2882,7 +2866,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,
>>> /* WHERE clause information */
>>>        Index *pProbe;        /* An index we are evaluating */
>>>        Index sPk;        /* A fake index object for the primary key */
>>>        LogEst aiRowEstPk[2];    /* The aiRowLogEst[] value for the sPk
>>> index */
>>> -    i16 aiColumnPk = -1;    /* The aColumn[] value for the sPk index */
>>>        SrcList *pTabList;    /* The FROM clause */
>>>        struct SrcList_item *pSrc;    /* The FROM clause btree term to add */
>>>        WhereLoop *pNew;    /* Template WhereLoop object */
>>> @@ -2913,11 +2896,29 @@ whereLoopAddBtree(WhereLoopBuilder *
>>> pBuilder,    /* WHERE clause information */
>>>             */
>>>            Index *pFirst;    /* First of real indices on the table */
>>>            memset(&sPk, 0, sizeof(Index));
>>> -        sPk.nColumn = 1;
>>> -        sPk.aiColumn = &aiColumnPk;
>>>            sPk.aiRowLogEst = aiRowEstPk;
>>>            sPk.onError = ON_CONFLICT_ACTION_REPLACE;
>>>            sPk.pTable = pTab;
>>> +
>>> +        struct key_def *key_def = key_def_new(1);
>>> +        if (key_def == NULL)
>>> +            return SQLITE_ERROR;
>>> +
>>> +        key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
>>> +                 ON_CONFLICT_ACTION_ABORT,
>>> +                 NULL, COLL_NONE, SORT_ORDER_ASC);
>>> +
>>> +        struct index_opts index_opts = index_opts_default;
>>> +
>>> +        sPk.def = index_def_new(pTab->def->id, 0, "primary",
>>> +                    sizeof("primary") - 1, TREE, &index_opts,
>>> +                    key_def, NULL);
>>> +        key_def_delete(key_def);
>>> +
>>> +        if (sPk.def == NULL) {
>>> +            return SQLITE_ERROR;
>>> +        }
>> 40. Extra braces.
> Fixed.
>>> +
>>>            aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
>>>            aiRowEstPk[1] = 0;
>>>            pFirst = pSrc->pTab->pIndex;
>>> @@ -3392,8 +3393,8 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,
>>> /* The WHERE clause */
>>>                       index_is_unordered(pIndex)) {
>>>                    return 0;
>>>                } else {
>>> -                nColumn = index_column_count(pIndex);
>>> -                isOrderDistinct = index_is_unique(pIndex);
>>> +                nColumn = pIndex->def->key_def->part_count;
>>> +                isOrderDistinct = pIndex->def->opts.is_unique;
>>>                }
>>>
>>>                /* Loop through all columns of the index and deal with the
>>> ones
>>> @@ -3454,9 +3455,10 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,
>>> /* The WHERE clause */
>>>                     * (revIdx) for the j-th column of the index.
>>>                     */
>>>                    if (pIndex != NULL) {
>>> -                    iColumn = pIndex->aiColumn[j];
>>> -                    revIdx = sql_index_column_sort_order(pIndex,
>>> -                                         j);
>>> +                    iColumn = pIndex->def->key_def->
>>> +                        parts[j].fieldno;
>>> +                    revIdx = pIndex->def->key_def->
>>> +                        parts[j].sort_order;
>>>                        if (iColumn == pIndex->pTable->iPKey)
>>>                            iColumn = -1;
>>>                    } else {
>>> @@ -3506,8 +3508,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,
>>> /* The WHERE clause */
>>>                                          pOrderBy->a[i].pExpr,
>>>                                          &is_found, &id);
>>>                            struct coll *idx_coll =
>>> -                            sql_index_collation(pIndex,
>>> -                                        j, &id);
>>> + pIndex->def->key_def->parts[j].coll;
>>>                            if (is_found &&
>>>                                coll != idx_coll)
>>>                                continue;
>>> @@ -4785,7 +4786,7 @@ sqlite3WhereBegin(Parse * pParse,    /* The parser
>>> context */
>>>                        sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);    /* Hint
>>> to COMDB2 */
>>>                    }
>>>                    if (pIx != NULL)
>>> -                    VdbeComment((v, "%s", pIx->zName));
>>> +                    VdbeComment((v, "%s", pIx->def->name));
>>>                    else
>>>                        VdbeComment((v, "%s", idx_def->name));
>>>    #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
>>> @@ -4918,7 +4919,7 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
>>>            if (pLevel->addrSkip) {
>>>                sqlite3VdbeGoto(v, pLevel->addrSkip);
>>>                VdbeComment((v, "next skip-scan on %s",
>>> -                     pLoop->pIndex->zName));
>>> +                     pLoop->pIndex->def->name));
>>>                sqlite3VdbeJumpHere(v, pLevel->addrSkip);
>>>                sqlite3VdbeJumpHere(v, pLevel->addrSkip - 2);
>>>            }
>>> diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
>>> index eaab0b657..a04013835 100644
>>> --- a/src/box/sql/wherecode.c
>>> +++ b/src/box/sql/wherecode.c
>>> @@ -48,7 +48,7 @@
>>>    static const char *
>>>    explainIndexColumnName(Index * pIdx, int i)
>>>    {
>>> -    i = pIdx->aiColumn[i];
>>> +    i = pIdx->def->key_def->parts[i].fieldno;
>>>        return pIdx->pTable->def->fields[i].name;
>>>    }
>>>
>>> @@ -243,7 +243,7 @@ sqlite3WhereExplainOneScan(Parse * pParse, /* Parse
>>> context */
>>>                if (zFmt) {
>>>                    sqlite3StrAccumAppend(&str, " USING ", 7);
>>>                    if (pIdx != NULL)
>>> -                    sqlite3XPrintf(&str, zFmt, pIdx->zName);
>>> +                    sqlite3XPrintf(&str, zFmt, pIdx->def->name);
>>>                    else if (idx_def != NULL)
>>>                        sqlite3XPrintf(&str, zFmt, idx_def->name);
>>>                    else
>>> @@ -488,7 +488,7 @@ codeEqualityTerm(Parse * pParse,    /* The parsing
>>> context */
>>>            int *aiMap = 0;
>>>
>>>            if (pLoop->pIndex != 0 &&
>>> -            sql_index_column_sort_order(pLoop->pIndex, iEq)) {
>>> + pLoop->pIndex->def->key_def->parts[iEq].sort_order) {
>>>                testcase(iEq == 0);
>>>                testcase(bRev);
>>>                bRev = !bRev;
>>> @@ -736,7 +736,7 @@ codeAllEqualityTerms(Parse * pParse,    /* Parsing
>>> context */
>>>            sqlite3VdbeAddOp1(v, (bRev ? OP_Last : OP_Rewind), iIdxCur);
>>>            VdbeCoverageIf(v, bRev == 0);
>>>            VdbeCoverageIf(v, bRev != 0);
>>> -        VdbeComment((v, "begin skip-scan on %s", pIdx->zName));
>>> +        VdbeComment((v, "begin skip-scan on %s", pIdx->def->name));
>>>            j = sqlite3VdbeAddOp0(v, OP_Goto);
>>>            pLevel->addrSkip =
>>>                sqlite3VdbeAddOp4Int(v, (bRev ? OP_SeekLT : OP_SeekGT),
>>> @@ -746,7 +746,8 @@ codeAllEqualityTerms(Parse * pParse,    /* Parsing
>>> context */
>>>            sqlite3VdbeJumpHere(v, j);
>>>            for (j = 0; j < nSkip; j++) {
>>>                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
>>> -                      pIdx->aiColumn[j], regBase + j);
>>> + pIdx->def->key_def->parts[j].fieldno,
>>> +                      regBase + j);
>>>                VdbeComment((v, "%s", explainIndexColumnName(pIdx, j)));
>>>            }
>>>        }
>>> @@ -1275,12 +1276,12 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                   || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
>>>            int nIdxCol;
>>>            if (pIdx != NULL)
>>> -            nIdxCol = index_column_count(pIdx);
>>> +            nIdxCol = pIdx->def->key_def->part_count;
>>>            else
>>>                nIdxCol = idx_def->key_def->part_count;
>>>            if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
>>>                && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
>>> -            j = pIdx->aiColumn[nEq];
>>> +            j = pIdx->def->key_def->parts[nEq].fieldno;
>>>                /* Allow seek for column with `NOT NULL` == false attribute.
>>>                 * If a column may contain NULL-s, the comparator installed
>>>                 * by Tarantool is prepared to seek using a NULL value.
>>> @@ -1291,8 +1292,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                 * FYI: entries in an index are ordered as follows:
>>>                 *      NULL, ... NULL, min_value, ...
>>>                 */
>>> -            if (j >= 0 &&
>>> - pIdx->pTable->def->fields[j].is_nullable) {
>>> +            if (pIdx->pTable->def->fields[j].is_nullable) {
>>>                    assert(pLoop->nSkip == 0);
>>>                    bSeekPastNull = 1;
>>>                    nExtraReg = 1;
>>> @@ -1331,14 +1331,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                    assert((bRev & ~1) == 0);
>>>                    pLevel->iLikeRepCntr <<= 1;
>>>                    pLevel->iLikeRepCntr |=
>>> -                    bRev ^ (sql_index_column_sort_order(pIdx, nEq) ==
>>> +                    bRev ^ (pIdx->def->key_def->
>>> +                          parts[nEq].sort_order ==
>>>                            SORT_ORDER_DESC);
>>>                }
>>>    #endif
>>>                if (pRangeStart == 0) {
>>> -                j = pIdx->aiColumn[nEq];
>>> -                if (j >= 0 &&
>>> - pIdx->pTable->def->fields[j].is_nullable)
>>> +                j = pIdx->def->key_def->parts[nEq].fieldno;
>>> +                if (pIdx->pTable->def->fields[j].is_nullable)
>>>                        bSeekPastNull = 1;
>>>                }
>>>            }
>>> @@ -1350,7 +1350,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>             * start and end terms (pRangeStart and pRangeEnd).
>>>             */
>>>            if ((nEq < nIdxCol &&
>>> -             bRev == (sql_index_column_sort_order(pIdx, nEq) ==
>>> +             bRev == (pIdx->def->key_def->parts[nEq].sort_order ==
>>>                      SORT_ORDER_ASC)) ||
>>>                (bRev && nIdxCol == nEq)) {
>>>                SWAP(pRangeEnd, pRangeStart);
>>> @@ -1433,13 +1433,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                }
>>>            } else {
>>>                pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
>>> +            uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
>>>                affinity =
>>> - pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
>>> + pIdx->pTable->def->fields[fieldno].affinity;
>>>            }
>>>
>>>            int nPkCol;
>>>            if (pk != NULL)
>>> -            nPkCol = index_column_count(pk);
>>> +            nPkCol = pk->def->key_def->part_count;
>>>            else
>>>                nPkCol = idx_pk->key_def->part_count;
>>>            if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
>>> @@ -1450,8 +1451,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                 */
>>>                int limit = pRangeStart == NULL ? nEq : nEq + 1;
>>>                for (int i = 0; i < limit; i++) {
>>> -                if ((pIdx != NULL && pIdx->aiColumn[i] ==
>>> -                     pk->aiColumn[0]) ||
>>> +                if ((pIdx != NULL &&
>>> + pIdx->def->key_def->parts[i].fieldno ==
>>> +                     pk->def->key_def->parts[0].fieldno) ||
>>>                        (idx_pk != NULL &&
>>>                         idx_def->key_def->parts[i].fieldno ==
>>>                         idx_pk->key_def->parts[0].fieldno)) {
>>> @@ -1563,10 +1565,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                /* pIdx is a covering index.  No need to access the main
>>> table. */
>>>            }  else if (iCur != iIdxCur) {
>>>                Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
>>> -            int nPkCol = index_column_count(pPk);
>>> +            int nPkCol = pPk->def->key_def->part_count;
>> 41. Same as 17
> Fixed.
>>
>>>                int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
>>>                for (j = 0; j < nPkCol; j++) {
>>> -                k = pPk->aiColumn[j];
>>> +                k = pPk->def->key_def->parts[j].fieldno;
>>>                    sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
>>>                              iKeyReg + j);
>>>                }
>>> @@ -1671,7 +1673,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>             */
>>>            if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
>>>                Index *pPk = sqlite3PrimaryKeyIndex(pTab);
>>> -            int nPkCol = index_column_count(pPk);
>>> +            int nPkCol = pPk->def->key_def->part_count;
>> 42. Same as 17
> Fxd.
>>
>>>                regRowset = pParse->nTab++;
>>>                sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
>>>                          regRowset, nPkCol);
>>> @@ -1775,13 +1777,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo *
>>> pWInfo,    /* Complete information about t
>>>                            int iSet =
>>>                                ((ii == pOrWc->nTerm - 1) ? -1 : ii);
>>>                            Index *pPk = sqlite3PrimaryKeyIndex (pTab);
>>> -                        int nPk = index_column_count(pPk);
>>> +                        int nPk = pPk->def->key_def->part_count;
>>>                            int iPk;
>>>
>>>                            /* Read the PK into an array of temp registers. */
>>>                            r = sqlite3GetTempRange(pParse, nPk);
>>>                            for (iPk = 0; iPk < nPk; iPk++) {
>>> -                            int iCol = pPk->aiColumn[iPk];
>>> +                            int iCol = pPk->def->
>> 43. Same as 17
> Fixed.
>>> +                                key_def->
>>> +                                parts[iPk].
>>> +                                fieldno;
>>>                                sqlite3ExprCodeGetColumnToReg
>>>                                    (pParse, pTab->def,
>>>                                     iCol, iCur,
>>>
> Here is the patch:
> ---
> Branch:https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where 
> 
> Issue:https://github.com/tarantool/tarantool/issues/3369
> 
> sql: add index_def to Index
> 
> Now every sqlite struct Index is created with a tnt struct
> index_def inside. This allows us to work with sqlite
> indexes through tnt index_def in the same manner as with
> a tnt index, and is a step towards replacing sqlite Index
> with tnt index.
> Fields coll_array, coll_id_array, aiColumn, sort_order
> and zName are removed from Index. All usages of these
> fields are changed to use the corresponding index_def
> fields.
> index_is_unique(), sql_index_collation() and
> index_column_count() are removed; their calls are replaced
> with accesses to the corresponding index_def fields.
> 
> Closes: #3369
> ---
>   src/box/sql.c           |  54 ++--
>   src/box/sql/analyze.c   |  79 +++---
>   src/box/sql/build.c     | 642 ++++++++++++++++++++++++------------------------
>   src/box/sql/delete.c    |  10 +-
>   src/box/sql/expr.c      |  48 ++--
>   src/box/sql/fkey.c      | 113 +++++----
>   src/box/sql/insert.c    | 129 +++++-----
>   src/box/sql/pragma.c    |  28 +--
>   src/box/sql/select.c    |   2 +-
>   src/box/sql/sqliteInt.h |  76 +-----
>   src/box/sql/update.c    |  39 +--
>   src/box/sql/vdbeaux.c   |   2 +-
>   src/box/sql/vdbemem.c   |  12 +-
>   src/box/sql/where.c     | 162 ++++++------
>   src/box/sql/wherecode.c | 104 ++++----
>   15 files changed, 742 insertions(+), 758 deletions(-)
> 
> diff --git a/src/box/sql.c b/src/box/sql.c
> index 11353150e..24e37652e 100644
> --- a/src/box/sql.c
> +++ b/src/box/sql.c
> @@ -1452,8 +1452,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, 
> void *buf)
> 
>       /* If table's PK is single column which is INTEGER, then
>        * treat it as strict type, not affinity.  */
> -    if (pk_idx && pk_idx->nColumn == 1) {
> -        int pk = pk_idx->aiColumn[0];
> +    if (pk_idx != NULL && pk_idx->def->key_def->part_count == 1) {
> +        int pk = pk_idx->def->key_def->parts[0].fieldno;
>           if (def->fields[pk].type == FIELD_TYPE_INTEGER)
>               pk_forced_int = pk;
>       }
> @@ -1564,20 +1564,19 @@ tarantoolSqlite3MakeTableOpts(Table *pTable, 
> const char *zSql, char *buf)
>    */
>   int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
>   {
> -    struct space_def *def = pIndex->pTable->def;
> -    assert(def != NULL);
> +    struct field_def *fields = pIndex->pTable->def->fields;
> +    struct key_def *key_def = pIndex->def->key_def;
>       const struct Enc *enc = get_enc(buf);
> -    struct SqliteIndex *primary_index;
> -    char *base = buf, *p;
> -    int pk_forced_int = -1;
> -
> -    primary_index = sqlite3PrimaryKeyIndex(pIndex->pTable);
> +    char *base = buf;
> +    uint32_t pk_forced_int = UINT32_MAX;
> +    struct SqliteIndex *primary_index =
> +        sqlite3PrimaryKeyIndex(pIndex->pTable);
> 
>       /* If table's PK is single column which is INTEGER, then
>        * treat it as strict type, not affinity.  */
> -    if (primary_index->nColumn == 1) {
> -        int pk = primary_index->aiColumn[0];
> -        if (def->fields[pk].type == FIELD_TYPE_INTEGER)
> +    if (primary_index->def->key_def->part_count == 1) {
> +        int pk = primary_index->def->key_def->parts[0].fieldno;
> +        if (fields[pk].type == FIELD_TYPE_INTEGER)
>               pk_forced_int = pk;
>       }
> 
> @@ -1587,46 +1586,45 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex 
> *pIndex, void *buf)
>        * primary key columns. Query planner depends on this particular
>        * data layout.
>        */
> -    int i, n = pIndex->nColumn;
> -
> -    p = enc->encode_array(base, n);
> -    for (i = 0; i < n; i++) {
> -        int col = pIndex->aiColumn[i];
> -        assert(def->fields[col].is_nullable ==
> - action_is_nullable(def->fields[col].nullable_action));
> +    struct key_part *part = key_def->parts;
> +    char *p = enc->encode_array(base, key_def->part_count);
> +    for (uint32_t i = 0; i < key_def->part_count; ++i, ++part) {
> +        uint32_t col = part->fieldno;
> +        assert(fields[col].is_nullable ==
> +               action_is_nullable(fields[col].nullable_action));
>           const char *t;
>           if (pk_forced_int == col) {
>               t = "integer";
>           } else {
> -            enum affinity_type affinity = def->fields[col].affinity;
> -            t = convertSqliteAffinity(affinity,
> -                          def->fields[col].is_nullable);
> +            t = convertSqliteAffinity(fields[col].affinity,
> +                          fields[col].is_nullable);
>           }
>           /* do not decode default collation */
> -        uint32_t cid = pIndex->coll_id_array[i];
> +        uint32_t cid = part->coll_id;
>           p = enc->encode_map(p, cid == COLL_NONE ? 5 : 6);
>           p = enc->encode_str(p, "type", sizeof("type")-1);
>           p = enc->encode_str(p, t, strlen(t));
>           p = enc->encode_str(p, "field", sizeof("field")-1);
>           p = enc->encode_uint(p, col);
>           if (cid != COLL_NONE) {
> -            p = enc->encode_str(p, "collation", sizeof("collation")-1);
> +            p = enc->encode_str(p, "collation",
> +                        sizeof("collation") - 1);
>               p = enc->encode_uint(p, cid);
>           }
>           p = enc->encode_str(p, "is_nullable", 11);
> -        p = enc->encode_bool(p, def->fields[col].is_nullable);
> +        p = enc->encode_bool(p, fields[col].is_nullable);
>           p = enc->encode_str(p, "nullable_action", 15);
>           const char *action_str =
> - on_conflict_action_strs[def->fields[col].nullable_action];
> +            on_conflict_action_strs[fields[col].nullable_action];
>           p = enc->encode_str(p, action_str, strlen(action_str));
> 
>           p = enc->encode_str(p, "sort_order", 10);
> -        enum sort_order sort_order = pIndex->sort_order[i];
> +        enum sort_order sort_order = part->sort_order;
>           assert(sort_order < sort_order_MAX);
>           const char *sort_order_str = sort_order_strs[sort_order];
>           p = enc->encode_str(p, sort_order_str, strlen(sort_order_str));
>       }
> -    return (int)(p - base);
> +    return p - base;
>   }
> 
>   /*
> diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
> index 5f73f026e..1e9d8ea36 100644
> --- a/src/box/sql/analyze.c
> +++ b/src/box/sql/analyze.c
> @@ -848,8 +848,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>       for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
>           int addrRewind;    /* Address of "OP_Rewind iIdxCur" */
>           int addrNextRow;    /* Address of "next_row:" */
> -        const char *zIdxName;    /* Name of the index */
> -        int nColTest;    /* Number of columns to test for changes */
> +        const char *idx_name;    /* Name of the index */
> 
>           if (pOnlyIdx && pOnlyIdx != pIdx)
>               continue;
> @@ -857,17 +856,16 @@ analyzeOneTable(Parse * pParse,    /* Parser 
> context */
>            * names. Thus, for the sake of clarity, use
>            * instead more familiar table name.
>            */
> -        if (IsPrimaryKeyIndex(pIdx)) {
> -            zIdxName = pTab->def->name;
> -        } else {
> -            zIdxName = pIdx->zName;
> -        }
> -        nColTest = index_column_count(pIdx);
> +        if (IsPrimaryKeyIndex(pIdx))
> +            idx_name = pTab->def->name;
> +        else
> +            idx_name = pIdx->def->name;
> +        int part_count = pIdx->def->key_def->part_count;
> 
>           /* Populate the register containing the index name. */
> -        sqlite3VdbeLoadString(v, regIdxname, zIdxName);
> +        sqlite3VdbeLoadString(v, regIdxname, idx_name);
>           VdbeComment((v, "Analysis for %s.%s", pTab->def->name,
> -            zIdxName));
> +                idx_name));
> 
>           /*
>            * Pseudo-code for loop that calls stat_push():
> @@ -906,7 +904,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>            * when building a record to insert into the sample column of
>            * the _sql_stat4 table).
>            */
> -        pParse->nMem = MAX(pParse->nMem, regPrev + nColTest);
> +        pParse->nMem = MAX(pParse->nMem, regPrev + part_count);
> 
>           /* Open a read-only cursor on the index being analyzed. */
>           struct space *space =
> @@ -917,7 +915,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>           sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
>                     space_ptr_reg);
>           sql_vdbe_set_p4_key_def(pParse, pIdx);
> -        VdbeComment((v, "%s", pIdx->zName));
> +        VdbeComment((v, "%s", pIdx->def->name));
> 
>           /* Invoke the stat_init() function. The arguments are:
>            *
> @@ -930,8 +928,8 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>            * The third argument is only used for STAT4
>            */
>           sqlite3VdbeAddOp2(v, OP_Count, iIdxCur, regStat4 + 3);
> -        sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 1);
> -        sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 2);
> +        sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 1);
> +        sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 2);
>           sqlite3VdbeAddOp4(v, OP_Function0, 0, regStat4 + 1, regStat4,
>                     (char *)&statInitFuncdef, P4_FUNCDEF);
>           sqlite3VdbeChangeP5(v, 3);
> @@ -949,11 +947,11 @@ analyzeOneTable(Parse * pParse,    /* Parser 
> context */
>           sqlite3VdbeAddOp2(v, OP_Integer, 0, regChng);
>           addrNextRow = sqlite3VdbeCurrentAddr(v);
> 
> -        if (nColTest > 0) {
> +        if (part_count > 0) {
>               int endDistinctTest = sqlite3VdbeMakeLabel(v);
>               int *aGotoChng;    /* Array of jump instruction addresses */
>               aGotoChng =
> -                sqlite3DbMallocRawNN(db, sizeof(int) * nColTest);
> +                sqlite3DbMallocRawNN(db, sizeof(int) * part_count);
>               if (aGotoChng == 0)
>                   continue;
> 
> @@ -969,7 +967,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>                */
>               sqlite3VdbeAddOp0(v, OP_Goto);
>               addrNextRow = sqlite3VdbeCurrentAddr(v);
> -            if (nColTest == 1 && index_is_unique(pIdx)) {
> +            if (part_count == 1 && pIdx->def->opts.is_unique) {
>                   /* For a single-column UNIQUE index, once we have 
> found a non-NULL
>                    * row, we know that all the rest will be distinct, so 
> skip
>                    * subsequent distinctness tests.
> @@ -978,13 +976,12 @@ analyzeOneTable(Parse * pParse,    /* Parser 
> context */
>                             endDistinctTest);
>                   VdbeCoverage(v);
>               }
> -            for (i = 0; i < nColTest; i++) {
> -                uint32_t id;
> -                struct coll *coll =
> -                    sql_index_collation(pIdx, i, &id);
> +            struct key_part *part = pIdx->def->key_def->parts;
> +            for (i = 0; i < part_count; ++i, ++part) {
> +                struct coll *coll = part->coll;
>                   sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
>                   sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
> -                          pIdx->aiColumn[i], regTemp);
> +                          part->fieldno, regTemp);
>                   aGotoChng[i] =
>                       sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
>                                 regPrev + i, (char *)coll,
> @@ -992,7 +989,7 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>                   sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
>                   VdbeCoverage(v);
>               }
> -            sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regChng);
> +            sqlite3VdbeAddOp2(v, OP_Integer, part_count, regChng);
>               sqlite3VdbeGoto(v, endDistinctTest);
> 
>               /*
> @@ -1003,10 +1000,11 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>                *  ...
>                */
>               sqlite3VdbeJumpHere(v, addrNextRow - 1);
> -            for (i = 0; i < nColTest; i++) {
> +            for (i = 0; i < part_count; i++) {
>                   sqlite3VdbeJumpHere(v, aGotoChng[i]);
>                   sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
> -                          pIdx->aiColumn[i],
> +                          pIdx->def->key_def->
> +                              parts[i].fieldno,
>                             regPrev + i);
>               }
>               sqlite3VdbeResolveLabel(v, endDistinctTest);
> @@ -1022,19 +1020,18 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>            */
>           assert(regKey == (regStat4 + 2));
>           Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
> -        int j, k, regKeyStat;
> -        int nPkColumn = (int)index_column_count(pPk);
> -        regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
> -        for (j = 0; j < nPkColumn; j++) {
> -            k = pPk->aiColumn[j];
> -            assert(k >= 0 && k < (int)pTab->def->field_count);
> -            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat + j);
> -            VdbeComment((v, "%s",
> - pTab->def->fields[pPk->aiColumn[j]].name));
> +        int pk_part_count = (int) pPk->def->key_def->part_count;
> +        int regKeyStat = sqlite3GetTempRange(pParse, pk_part_count);
> +        for (int j = 0; j < pk_part_count; ++j) {
> +            int k = pPk->def->key_def->parts[j].fieldno;
> +            assert(k >= 0 && k < (int) pTab->def->field_count);
> +            sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
> +                      regKeyStat + j);
> +            VdbeComment((v, "%s", pTab->def->fields[k].name));
>           }
>           sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
> -                  nPkColumn, regKey);
> -        sqlite3ReleaseTempRange(pParse, regKeyStat, nPkColumn);
> +                  pk_part_count, regKey);
> +        sqlite3ReleaseTempRange(pParse, regKeyStat, pk_part_count);
> 
>           assert(regChng == (regStat4 + 1));
>           sqlite3VdbeAddOp4(v, OP_Function0, 1, regStat4, regTemp,
> @@ -1057,11 +1054,11 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>           int regDLt = regStat1 + 2;
>           int regSample = regStat1 + 3;
>           int regCol = regStat1 + 4;
> -        int regSampleKey = regCol + nColTest;
> +        int regSampleKey = regCol + part_count;
>           int addrNext;
>           int addrIsNull;
> 
> -        pParse->nMem = MAX(pParse->nMem, regCol + nColTest);
> +        pParse->nMem = MAX(pParse->nMem, regCol + part_count);
> 
>           addrNext = sqlite3VdbeCurrentAddr(v);
>           callStatGet(v, regStat4, STAT_GET_KEY, regSampleKey);
> @@ -1077,12 +1074,12 @@ analyzeOneTable(Parse * pParse,    /* Parser context */
>            * be taken
>            */
>           VdbeCoverageNeverTaken(v);
> -        for (i = 0; i < nColTest; i++) {
> +        for (i = 0; i < part_count; i++) {
>               sqlite3ExprCodeLoadIndexColumn(pParse, pIdx,
>                                        iTabCur, i,
>                                        regCol + i);
>           }
> -        sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, nColTest,
> +        sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, part_count,
>                     regSample);
>           sqlite3VdbeAddOp3(v, OP_MakeRecord, regTabname, 6, regTemp);
>           sqlite3VdbeAddOp2(v, OP_IdxReplace, iStatCur + 1, regTemp);
> @@ -1146,7 +1143,7 @@ analyzeTable(Parse * pParse, Table * pTab, Index * pOnlyIdx)
>       iStatCur = pParse->nTab;
>       pParse->nTab += 3;
>       if (pOnlyIdx) {
> -        openStatTable(pParse, iStatCur, pOnlyIdx->zName, "idx");
> +        openStatTable(pParse, iStatCur, pOnlyIdx->def->name, "idx");
>       } else {
>           openStatTable(pParse, iStatCur, pTab->def->name, "tbl");
>       }
> diff --git a/src/box/sql/build.c b/src/box/sql/build.c
> index 0da7d805b..2c82644e6 100644
> --- a/src/box/sql/build.c
> +++ b/src/box/sql/build.c
> @@ -241,6 +241,8 @@ static void
>   freeIndex(sqlite3 * db, Index * p)
>   {
>       sql_expr_delete(db, p->pPartIdxWhere, false);
> +    if (p->def != NULL)
> +        index_def_delete(p->def);
>       sqlite3DbFree(db, p->zColAff);
>       sqlite3DbFree(db, p);
>   }
> @@ -259,7 +261,8 @@ sqlite3UnlinkAndDeleteIndex(sqlite3 * db, Index * pIndex)
> 
>       struct session *user_session = current_session();
> 
> -    pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash, pIndex->zName, 0);
> +    pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash,
> +                   pIndex->def->name, 0);
>       if (ALWAYS(pIndex)) {
>           if (pIndex->pTable->pIndex == pIndex) {
>               pIndex->pTable->pIndex = pIndex->pNext;
> @@ -376,7 +379,7 @@ deleteTable(sqlite3 * db, Table * pTable)
>           pNext = pIndex->pNext;
>           assert(pIndex->pSchema == pTable->pSchema);
>           if ((db == 0 || db->pnBytesFreed == 0)) {
> -            char *zName = pIndex->zName;
> +            char *zName = pIndex->def->name;
>               TESTONLY(Index *
>                    pOld =) sqlite3HashInsert(&pTable->idxHash,
>                                  zName, 0);
> @@ -1041,7 +1044,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>       Table *p = pParse->pNewTable;
>       if (p == NULL)
>           return;
> -    int i = p->def->field_count - 1;
> +    uint32_t i = p->def->field_count - 1;
>       sqlite3 *db = pParse->db;
>       char *zColl = sqlite3NameFromToken(db, pToken);
>       if (!zColl)
> @@ -1049,22 +1052,20 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>       uint32_t *id = &p->def->fields[i].coll_id;
>       p->aCol[i].coll = sql_get_coll_seq(pParse, zColl, id);
>       if (p->aCol[i].coll != NULL) {
> -        Index *pIdx;
>           /* If the column is declared as "<name> PRIMARY KEY COLLATE <type>",
>            * then an index may have been created on this column before the
>            * collation type was added. Correct this if it is the case.
>            */
> -        for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
> -            assert(pIdx->nColumn == 1);
> -            if (pIdx->aiColumn[0] == i) {
> -                id = &pIdx->coll_id_array[0];
> -                pIdx->coll_array[0] =
> +        for (struct Index *pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
> +            assert(pIdx->def->key_def->part_count == 1);
> +            if (pIdx->def->key_def->parts[0].fieldno == i) {
> +                pIdx->def->key_def->parts[0].coll_id = *id;
> +                pIdx->def->key_def->parts[0].coll =
>                       sql_column_collation(p->def, i, id);
>               }
>           }
> -    } else {
> -        sqlite3DbFree(db, zColl);
>       }
> +    sqlite3DbFree(db, zColl);
>   }
> 
>   struct coll *
> @@ -1094,66 +1095,6 @@ sql_column_collation(struct space_def *def, uint32_t column, uint32_t *coll_id)
>       return space->format->fields[column].coll;
>   }
> 
> -struct key_def*
> -sql_index_key_def(struct Index *idx)
> -{
> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
> -    struct space *space = space_by_id(space_id);
> -    assert(space != NULL);
> -    struct index *index = space_index(space, index_id);
> -    assert(index != NULL && index->def != NULL);
> -    return index->def->key_def;
> -}
> -
> -struct coll *
> -sql_index_collation(Index *idx, uint32_t column, uint32_t *coll_id)
> -{
> -    assert(idx != NULL);
> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
> -    struct space *space = space_by_id(space_id);
> -
> -    assert(column < idx->nColumn);
> -    /*
> -     * If space is still under construction, or it is
> -     * an ephemeral space, then fetch collation from
> -     * SQL internal structure.
> -     */
> -    if (space == NULL) {
> -        assert(column < idx->nColumn);
> -        *coll_id = idx->coll_id_array[column];
> -        return idx->coll_array[column];
> -    }
> -
> -    struct key_def *key_def = sql_index_key_def(idx);
> -    assert(key_def != NULL && key_def->part_count >= column);
> -    *coll_id = key_def->parts[column].coll_id;
> -    return key_def->parts[column].coll;
> -}
> -
> -enum sort_order
> -sql_index_column_sort_order(Index *idx, uint32_t column)
> -{
> -    assert(idx != NULL);
> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
> -    struct space *space = space_by_id(space_id);
> -
> -    assert(column < idx->nColumn);
> -    /*
> -     * If space is still under construction, or it is
> -     * an ephemeral space, then fetch collation from
> -     * SQL internal structure.
> -     */
> -    if (space == NULL) {
> -        assert(column < idx->nColumn);
> -        return idx->sort_order[column];
> -    }
> -
> -    struct key_def *key_def = sql_index_key_def(idx);
> -    assert(key_def != NULL && key_def->part_count >= column);
> -    return key_def->parts[column].sort_order;
> -}
> -
>   struct ExprList *
>   space_checks_expr_list(uint32_t space_id)
>   {
> @@ -1337,15 +1278,26 @@ createTableStmt(sqlite3 * db, Table * p)
>       return zStmt;
>   }
> 
> -/* Return true if value x is found any of the first nCol entries of aiCol[]
> +/**
> + * Look for duplicates for the given index key part in index key
> + * prefix. Prefix is formed the following way:
> + * (first_key_part, ..., key_part_just_before_given). Index key
> + * parts are duplicates of each other if their 'fieldno' are the
> + * same.
> + *
> + * @param key_parts index key parts.
> + * @param part_index index of part, for which we look for
> + *       duplicates.
> + * @retval is there a duplicate part in prefix or not.
>    */
> -static int
> -hasColumn(const i16 * aiCol, int nCol, int x)
> +static bool
> +is_part_duplicated(const struct key_part *key_parts, uint32_t part_index)
>   {
> -    while (nCol-- > 0)
> -        if (x == *(aiCol++))
> -            return 1;
> -    return 0;
> +    for (uint32_t i = 0; i < part_index; i++) {
> +         if (key_parts[i].fieldno == key_parts[part_index].fieldno)
> +             return true;
> +    }
> +    return false;
>   }
> 
>   /*
> @@ -1364,13 +1316,12 @@ static void
>   convertToWithoutRowidTable(Parse * pParse, Table * pTab)
>   {
>       Index *pPk;
> -    int i, j;
>       sqlite3 *db = pParse->db;
> 
>       /* Mark every PRIMARY KEY column as NOT NULL (except for imposter tables)
>        */
>       if (!db->init.imposterTable) {
> -        for (i = 0; i < (int)pTab->def->field_count; i++) {
> +        for (uint32_t i = 0; i < pTab->def->field_count; i++) {
>               if (pTab->aCol[i].is_primkey) {
>                   pTab->def->fields[i].nullable_action
>                       = ON_CONFLICT_ACTION_ABORT;
> @@ -1404,18 +1355,33 @@ convertToWithoutRowidTable(Parse * pParse, Table * pTab)
>           pPk = sqlite3PrimaryKeyIndex(pTab);
> 
>           /*
> -         * Remove all redundant columns from the PRIMARY KEY. For example, change
> -         * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY KEY(a,b,c,d)".  Later
> -         * code assumes the PRIMARY KEY contains no repeated columns.
> +         * Remove all redundant columns from the PRIMARY
> +         * KEY. For example, change
> +         * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just
> +         * "PRIMARY KEY(a,b,c,d)".  Later code assumes the
> +         * PRIMARY KEY contains no repeated columns.
>            */
> -        for (i = j = 1; i < pPk->nColumn; i++) {
> -            if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
> -                pPk->nColumn--;
> -            } else {
> -                pPk->aiColumn[j++] = pPk->aiColumn[i];
> +
> +        struct key_part *parts = pPk->def->key_def->parts;
> +        uint32_t part_count = pPk->def->key_def->part_count;
> +        uint32_t new_part_count = part_count;
> +
> +        for (uint32_t i = 1; i < part_count; i++) {
> +            if (is_part_duplicated(parts, i)) {
> +                new_part_count--;
> +                bool is_found = false;
> +                for (uint32_t j = i + 1; j < part_count; j++) {
> +                    if (!is_part_duplicated(parts, j)) {
> +                        parts[i] = parts[j];
> +                        is_found = true;
> +                        break;
> +                    }
> +                }
> +                if (!is_found)
> +                    break;
>               }
>           }
> -        pPk->nColumn = j;
> +        pPk->def->key_def->part_count = new_part_count;
>       }
>       assert(pPk != 0);
>   }
> @@ -1497,7 +1463,7 @@ createIndex(Parse * pParse, Index * pIndex, int iSpaceId, int iIndexId,
>       }
>       sqlite3VdbeAddOp4(v,
>                 OP_String8, 0, iFirstCol + 2, 0,
> -              sqlite3DbStrDup(pParse->db, pIndex->zName),
> +              sqlite3DbStrDup(pParse->db, pIndex->def->name),
>                 P4_DYNAMIC);
>       sqlite3VdbeAddOp4(v, OP_String8, 0, iFirstCol + 3, 0, "tree",
>                 P4_STATIC);
> @@ -1534,7 +1500,7 @@ makeIndexSchemaRecord(Parse * pParse,
> 
>       sqlite3VdbeAddOp4(v,
>                 OP_String8, 0, iFirstCol, 0,
> -              sqlite3DbStrDup(pParse->db, pIndex->zName),
> +              sqlite3DbStrDup(pParse->db, pIndex->def->name),
>                 P4_DYNAMIC);
> 
>       if (pParse->pNewTable) {
> @@ -2463,15 +2429,16 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
>       } else {
>           tnum = pIndex->tnum;
>       }
> -    struct key_def *def = key_def_dup(sql_index_key_def(pIndex));
> +    struct key_def *def = key_def_dup(pIndex->def->key_def);
>       if (def == NULL) {
>           sqlite3OomFault(db);
>           return;
>       }
>       /* Open the sorter cursor if we are to use one. */
>       iSorter = pParse->nTab++;
> -    sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
> -              (char *)def, P4_KEYDEF);
> +    sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0,
> +              pIndex->def->key_def->part_count, (char *)def,
> +              P4_KEYDEF);
> 
>       /* Open the table. Loop through all rows of the table, inserting index
>        * records into the sorter.
> @@ -2502,7 +2469,8 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
>           sqlite3VdbeGoto(v, j2);
>           addr2 = sqlite3VdbeCurrentAddr(v);
>           sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
> -                     regRecord, pIndex->nColumn);
> +                     regRecord,
> +                     pIndex->def->key_def->part_count);
>           VdbeCoverage(v);
>           parser_emit_unique_constraint(pParse, ON_CONFLICT_ACTION_ABORT,
>                             pIndex);
> @@ -2531,34 +2499,20 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int memRootPage)
>    */
>   Index *
>   sqlite3AllocateIndexObject(sqlite3 * db,    /* Database connection */
> -               i16 nCol,    /* Total number of columns in the index */
> -               int nExtra,    /* Number of bytes of extra space to alloc */
> -               char **ppExtra    /* Pointer to the "extra" space */
> +               i16 nCol    /* Total number of columns in the index */
>       )
>   {
>       Index *p;        /* Allocated index object */
>       int nByte;        /* Bytes of space for Index object + arrays */
> 
>       nByte = ROUND8(sizeof(Index)) +            /* Index structure   */
> -        ROUND8(sizeof(struct coll *) * nCol) +  /* Index.coll_array  */
> -        ROUND8(sizeof(uint32_t) * nCol) +       /* Index.coll_id_array*/
> -        ROUND8(sizeof(LogEst) * (nCol + 1) +    /* Index.aiRowLogEst */
> -           sizeof(i16) * nCol +            /* Index.aiColumn */
> -           sizeof(enum sort_order) * nCol); /* Index.sort_order */
> -    p = sqlite3DbMallocZero(db, nByte + nExtra);
> +        ROUND8(sizeof(LogEst) * (nCol + 1));    /* Index.aiRowLogEst */
> +    p = sqlite3DbMallocZero(db, nByte);
>       if (p) {
>           char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
> -        p->coll_array = (struct coll **)pExtra;
> -        pExtra += ROUND8(sizeof(struct coll **) * nCol);
> -        p->coll_id_array = (uint32_t *) pExtra;
> -        pExtra += ROUND8(sizeof(uint32_t) * nCol);
>           p->aiRowLogEst = (LogEst *) pExtra;
>           pExtra += sizeof(LogEst) * (nCol + 1);
> -        p->aiColumn = (i16 *) pExtra;
>           pExtra += sizeof(i16) * nCol;
> -        p->sort_order = (enum sort_order *) pExtra;
> -        p->nColumn = nCol;
> -        *ppExtra = ((char *)p) + nByte;
>       }
>       return p;
>   }
> @@ -2646,18 +2600,142 @@ addIndexToTable(Index * pIndex, Table * pTab)
>       }
>   }
> 
> -bool
> -index_is_unique(Index *idx)
> -{
> -    assert(idx != NULL);
> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
> -    struct space *space = space_by_id(space_id);
> -    assert(space != NULL);
> -    struct index *tnt_index = space_index(space, index_id);
> -    assert(tnt_index != NULL);
> +/**
> + * Allocate memory on parser region and copy given string (part of the
> + * sql statement) into the allocated memory. Add size of the
> + * given string to 'total_sql_size' in order to keep size of sql
> + * statement string being hold.
> + *
> + * @param parse Parse context.
> + * @param str String (a part of sql statement) to be copied.
> + * @param total_sql_size Current size of the whole sql statement.
> + */
> +static void
> +append_string_part(Parse *parse, const char *str, size_t *total_sql_size)
> +{
> +    const size_t str_len = strlen(str);
> +    char * str_part = region_alloc(&parse->region, str_len);
> +    if (str_part == NULL) {
> +        diag_set(OutOfMemory, str_len,
> +             "region_alloc", "str_part");
> +        parse->rc = SQL_TARANTOOL_ERROR;
> +        parse->nErr++;
> +        return;
> +    }
> +    memcpy(str_part, str, str_len);
> +    *total_sql_size += str_len;
> +}
> +
> +static void
> +set_index_def(Parse *parse, Index *index, Table *table, uint32_t iid,
> +          const char *name, uint32_t name_len, int on_error,
> +          struct ExprList *expr_list, u8 idx_type)
> +{
> +    struct space_def *space_def = table->def;
> +    struct index_opts opts;
> +    index_opts_create(&opts);
> +    opts.is_unique = on_error != ON_CONFLICT_ACTION_NONE;
> +
> +    struct key_def *key_def = key_def_new(expr_list->nExpr);
> +    if (key_def == NULL) {
> +        parse->rc = SQL_TARANTOOL_ERROR;
> +        parse->nErr++;
> +        goto cleanup;
> +    }
> +
> +    /* Build initial parts of SQL statement.  */
> 
> -    return tnt_index->def->opts.is_unique;
> +    size_t total_sql_size = 0;
> +
> +    if (idx_type == SQLITE_IDXTYPE_APPDEF) {
> +        append_string_part(parse, "CREATE INDEX ", &total_sql_size);
> +        append_string_part(parse, name, &total_sql_size);
> +        append_string_part(parse, " ON ", &total_sql_size);
> +        append_string_part(parse, space_def->name, &total_sql_size);
> +        append_string_part(parse, " (", &total_sql_size);
> +    }
> +
> +    for (int i = 0; i < expr_list->nExpr; i++) {
> +        Expr *expr = expr_list->a[i].pExpr;
> +        sql_resolve_self_reference(parse, table, NC_IdxExpr, expr, 0);
> +        if (parse->nErr > 0)
> +            goto cleanup;
> +
> +        Expr *column_expr = sqlite3ExprSkipCollate(expr);
> +        if (column_expr->op != TK_COLUMN) {
> +            sqlite3ErrorMsg(parse,
> +                    "functional indexes aren't supported "
> +                    "in the current version");
> +            goto cleanup;
> +        }
> +
> +        uint32_t fieldno = column_expr->iColumn;
> +        uint32_t coll_id;
> +        struct coll *coll;
> +        if (expr->op == TK_COLLATE) {
> +            coll = sql_get_coll_seq(parse, expr->u.zToken,
> +                        &coll_id);
> +
> +            if (idx_type == SQLITE_IDXTYPE_APPDEF) {
> +                const char *column_name =
> +                    column_expr->u.zToken;
> +                append_string_part(parse, column_name,
> +                           &total_sql_size);
> +                append_string_part(parse, " COLLATE ",
> +                           &total_sql_size);
> +                const char *coll_name = expr->u.zToken;
> +                append_string_part(parse, coll_name,
> +                           &total_sql_size);
> +                append_string_part(parse, ", ",
> +                           &total_sql_size);
> +            }
> +        } else {
> +            coll = sql_column_collation(space_def, fieldno,
> +                            &coll_id);
> +            if (idx_type == SQLITE_IDXTYPE_APPDEF) {
> +                const char *column_name =
> +                    column_expr->u.zToken;
> +                append_string_part(parse, column_name,
> +                           &total_sql_size);
> +                append_string_part(parse, ", ",
> +                           &total_sql_size);
> +            }
> +        }
> +
> +        /*
> +        * Tarantool: DESC indexes are not supported so far.
> +        * See gh-3016.
> +        */
> +        key_def_set_part(key_def, i, fieldno,
> +                 space_def->fields[fieldno].type,
> +                 space_def->fields[fieldno].nullable_action,
> +                 coll, coll_id, SORT_ORDER_ASC);
> +    }
> +
> +    if (parse->nErr > 0) {
> +        index->def = NULL;
> +        goto cleanup;
> +    }
> +
> +    if (idx_type == SQLITE_IDXTYPE_APPDEF) {
> +        opts.sql = region_join(&parse->region, total_sql_size);
> +        /*
> +         * fix last ", " with ")\0" to finish statement.
> +         */
> +        memcpy(&opts.sql[total_sql_size - 2], ")\0", 2);
> +    }
> +
> +    struct key_def *pk_key_def;
> +    if (idx_type == SQLITE_IDXTYPE_APPDEF)
> +        pk_key_def = table->pIndex->def->key_def;
> +    else
> +        pk_key_def = NULL;
> +
> +    index->def = index_def_new(space_def->id, iid, name, name_len,
> +                   TREE, &opts, key_def, pk_key_def);
> + cleanup:
> +    if (key_def != NULL)
> +        key_def_delete(key_def);
>   }
> 
>   void
> @@ -2666,26 +2744,26 @@ sql_create_index(struct Parse *parse, struct Token *token,
>            int on_error, struct Token *start, struct Expr *where,
>            enum sort_order sort_order, bool if_not_exist, u8 idx_type)
>   {
> -    Table *pTab = 0;    /* Table to be indexed */
> -    Index *pIndex = 0;    /* The index to be created */
> -    char *zName = 0;    /* Name of the index */
> -    int nName;        /* Number of characters in zName */
> -    int i, j;
> -    DbFixer sFix;        /* For assigning database names to pTable */
> +    /* Table to be indexed.  */
> +    Table *table = NULL;
> +    /* The index to be created.  */
> +    Index *index = NULL;
> +    /* Name of the index  */
> +    char *name = NULL;
> +    int name_len;
>       sqlite3 *db = parse->db;
> -    struct ExprList_item *col_listItem;    /* For looping over col_list */
> -    int nExtra = 0;        /* Space allocated for zExtra[] */
> -    char *zExtra = 0;    /* Extra space after the Index object */
>       struct session *user_session = current_session();
> 
>       if (db->mallocFailed || parse->nErr > 0) {
>           goto exit_create_index;
>       }
> -    /* Do not account nested operations: the count of such
> -     * operations depends on Tarantool data dictionary internals,
> -     * such as data layout in system spaces. Also do not account
> -     * PRIMARY KEY and UNIQUE constraint - they had been accounted
> -     * in CREATE TABLE already.
> +
> +    /*
> +     * Do not account nested operations: the count of such
> +     * operations depends on Tarantool data dictionary
> +     * internals, such as data layout in system spaces. Also
> +     * do not account PRIMARY KEY and UNIQUE constraint - they
> +     * had been accounted in CREATE TABLE already.
>        */
>       if (!parse->nested && idx_type == SQLITE_IDXTYPE_APPDEF) {
>           Vdbe *v = sqlite3GetVdbe(parse);
> @@ -2696,39 +2774,46 @@ sql_create_index(struct Parse *parse, struct Token *token,
>       assert(db->pSchema != NULL);
> 
>       /*
> -     * Find the table that is to be indexed.  Return early if not found.
> +     * Find the table that is to be indexed.
> +     * Return early if not found.
>        */
>       if (tbl_name != NULL) {
> 
> -        /* Use the two-part index name to determine the database
> -         * to search for the table. 'Fix' the table name to this db
> -         * before looking up the table.
> +        /*
> +         * Use the two-part index name to determine the
> +         * database to search for the table. 'Fix' the
> +         * table name to this db before looking up the
> +         * table.
>            */
>           assert(token && token->z);
> 
> -        sqlite3FixInit(&sFix, parse, "index", token);
> -        if (sqlite3FixSrcList(&sFix, tbl_name)) {
> -            /* Because the parser constructs tbl_name from a single identifier,
> +        DbFixer db_fixer;
> +        sqlite3FixInit(&db_fixer, parse, "index", token);
> +        if (sqlite3FixSrcList(&db_fixer, tbl_name)) {
> +
> +            /*
> +             * Because the parser constructs tbl_name
> +             * from a single identifier,
>                * sqlite3FixSrcList can never fail.
>                */
>               assert(0);
>           }
> -        pTab = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
> -        assert(db->mallocFailed == 0 || pTab == 0);
> -        if (pTab == 0)
> +        table = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
> +        assert(db->mallocFailed == 0 || table == NULL);
> +        if (table == NULL)
>               goto exit_create_index;
> -        sqlite3PrimaryKeyIndex(pTab);
> +        sqlite3PrimaryKeyIndex(table);
>       } else {
>           assert(token == NULL);
>           assert(start == NULL);
> -        pTab = parse->pNewTable;
> -        if (!pTab)
> +        table = parse->pNewTable;
> +        if (table == NULL)
>               goto exit_create_index;
>       }
> 
> -    assert(pTab != 0);
> +    assert(table != NULL);
>       assert(parse->nErr == 0);
> -    if (pTab->def->opts.is_view) {
> +    if (table->def->opts.is_view) {
>           sqlite3ErrorMsg(parse, "views may not be indexed");
>           goto exit_create_index;
>       }
> @@ -2747,24 +2832,24 @@ sql_create_index(struct Parse *parse, struct Token *token,
>        * our own name.
>        */
>       if (token) {
> -        zName = sqlite3NameFromToken(db, token);
> -        if (zName == 0)
> +        name = sqlite3NameFromToken(db, token);
> +        if (name == NULL)
>               goto exit_create_index;
>           assert(token->z != 0);
>           if (!db->init.busy) {
> -            if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
> +            if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
>                   NULL) {
>                   sqlite3ErrorMsg(parse,
>                           "there is already a table named %s",
> -                        zName);
> +                        name);
>                   goto exit_create_index;
>               }
>           }
> -        if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
> +        if (sqlite3HashFind(&table->idxHash, name) != NULL) {
>               if (!if_not_exist) {
>                   sqlite3ErrorMsg(parse,
>                           "index %s.%s already exists",
> -                        pTab->def->name, zName);
> +                        table->def->name, name);
>               } else {
>                   assert(!db->init.busy);
>               }
> @@ -2773,15 +2858,13 @@ sql_create_index(struct Parse *parse, struct Token *token,
>       } else {
>           int n;
>           Index *pLoop;
> -        for (pLoop = pTab->pIndex, n = 1; pLoop;
> +        for (pLoop = table->pIndex, n = 1; pLoop;
>                pLoop = pLoop->pNext, n++) {
>           }
> -        zName =
> -            sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", pTab->def->name,
> -                   n);
> -        if (zName == 0) {
> +        name = sqlite3MPrintf(db, "sqlite_autoindex_%s_%d",
> +                      table->def->name, n);
> +        if (name == NULL)
>               goto exit_create_index;
> -        }
>       }
> 
>       /*
> @@ -2792,8 +2875,8 @@ sql_create_index(struct Parse *parse, struct Token *token,
>        */
>       if (col_list == NULL) {
>           Token prevCol;
> -        uint32_t last_field = pTab->def->field_count - 1;
> -        sqlite3TokenInit(&prevCol, pTab->def->fields[last_field].name);
> +        uint32_t last_field = table->def->field_count - 1;
> +        sqlite3TokenInit(&prevCol, table->def->fields[last_field].name);
>           col_list = sql_expr_list_append(parse->db, NULL,
>                           sqlite3ExprAlloc(db, TK_ID,
>                                    &prevCol, 0));
> @@ -2805,33 +2888,26 @@ sql_create_index(struct Parse *parse, struct Token *token,
>           sqlite3ExprListCheckLength(parse, col_list, "index");
>       }
> 
> -    /* Figure out how many bytes of space are required to store explicitly
> -     * specified collation sequence names.
> -     */
> -    for (i = 0; i < col_list->nExpr; i++) {
> -        Expr *pExpr = col_list->a[i].pExpr;
> -        assert(pExpr != 0);
> -        if (pExpr->op == TK_COLLATE) {
> -            nExtra += (1 + sqlite3Strlen30(pExpr->u.zToken));
> -        }
> -    }
> +    /* Allocate the index structure.  */
> +    name_len = sqlite3Strlen30(name);
> 
> -    /*
> -     * Allocate the index structure.
> -     */
> -    nName = sqlite3Strlen30(zName);
> -    pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
> -                        nName + nExtra + 1, &zExtra);
> -    if (db->mallocFailed) {
> +    if (name_len > BOX_NAME_MAX) {
> +        sqlite3ErrorMsg(parse,
> +                "%s.%s exceeds indexes' names length limit",
> +                table->def->name, name);
>           goto exit_create_index;
>       }
> -    assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
> -    assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
> -    pIndex->zName = zExtra;
> -    zExtra += nName + 1;
> -    memcpy(pIndex->zName, zName, nName + 1);
> -    pIndex->pTable = pTab;
> -    pIndex->onError = (u8) on_error;
> +
> +    if (sqlite3CheckIdentifierName(parse, name) != SQLITE_OK)
> +        goto exit_create_index;
> +
> +    index = sqlite3AllocateIndexObject(db, col_list->nExpr);
> +    if (db->mallocFailed)
> +        goto exit_create_index;
> +
> +    assert(EIGHT_BYTE_ALIGNMENT(index->aiRowLogEst));
> +    index->pTable = table;
> +    index->onError = (u8) on_error;
>       /*
>        * Don't make difference between UNIQUE indexes made by user
>        * using CREATE INDEX statement and those created during
> @@ -2839,74 +2915,40 @@ sql_create_index(struct Parse *parse, struct Token *token,
>        */
>       if (idx_type == SQLITE_IDXTYPE_APPDEF &&
>           on_error != ON_CONFLICT_ACTION_NONE) {
> -        pIndex->idxType = SQLITE_IDXTYPE_UNIQUE;
> +        index->idxType = SQLITE_IDXTYPE_UNIQUE;
>       } else {
> -        pIndex->idxType = idx_type;
> +        index->idxType = idx_type;
>       }
> -    pIndex->pSchema = db->pSchema;
> -    pIndex->nColumn = col_list->nExpr;
> +    index->pSchema = db->pSchema;
>       /* Tarantool have access to each column by any index */
>       if (where) {
> -        sql_resolve_self_reference(parse, pTab, NC_PartIdx, where,
> +        sql_resolve_self_reference(parse, table, NC_PartIdx, where,
>                          NULL);
> -        pIndex->pPartIdxWhere = where;
> +        index->pPartIdxWhere = where;
>           where = NULL;
>       }
> 
> -    /* Analyze the list of expressions that form the terms of the index and
> -     * report any errors.  In the common case where the expression is 
> exactly
> -     * a table column, store that column in aiColumn[].
> -     *
> -     * TODO: Issue a warning if two or more columns of the index are 
> identical.
> -     * TODO: Issue a warning if the table primary key is used as part 
> of the
> -     * index key.
> +    /*
> +     * TODO: Issue a warning if two or more columns of the
> +     * index are identical.
> +     * TODO: Issue a warning if the table primary key is used
> +     * as part of the index key.
>        */
> -    for (i = 0, col_listItem = col_list->a; i < col_list->nExpr;
> -         i++, col_listItem++) {
> -        Expr *pCExpr;    /* The i-th index expression */
> -        sql_resolve_self_reference(parse, pTab, NC_IdxExpr,
> -                       col_listItem->pExpr, NULL);
> -        if (parse->nErr > 0)
> -            goto exit_create_index;
> -        pCExpr = sqlite3ExprSkipCollate(col_listItem->pExpr);
> -        if (pCExpr->op != TK_COLUMN) {
> -            sqlite3ErrorMsg(parse,
> -                    "functional indexes aren't supported "
> -                    "in the current version");
> -            goto exit_create_index;
> -        } else {
> -            j = pCExpr->iColumn;
> -            assert(j <= 0x7fff);
> -            if (j < 0) {
> -                j = pTab->iPKey;
> -            }
> -            pIndex->aiColumn[i] = (i16) j;
> -        }
> -        struct coll *coll;
> -        uint32_t id;
> -        if (col_listItem->pExpr->op == TK_COLLATE) {
> -            const char *coll_name = col_listItem->pExpr->u.zToken;
> -            coll = sql_get_coll_seq(parse, coll_name, &id);
> 
> -            if (coll == NULL &&
> -                sqlite3StrICmp(coll_name, "binary") != 0) {
> -                goto exit_create_index;
> -            }
> -        } else if (j >= 0) {
> -            coll = sql_column_collation(pTab->def, j, &id);
> -        } else {
> -            id = COLL_NONE;
> -            coll = NULL;
> -        }
> -        pIndex->coll_array[i] = coll;
> -        pIndex->coll_id_array[i] = id;
> -
> -        /* Tarantool: DESC indexes are not supported so far.
> -         * See gh-3016.
> -         */
> -        pIndex->sort_order[i] = SORT_ORDER_ASC;
> +    uint32_t max_iid = 0;
> +    for (Index *index = table->pIndex; index; index = index->pNext) {
> +        max_iid = max_iid > index->def->iid ?
> +              max_iid : index->def->iid + 1;
>       }
> -    if (pTab == parse->pNewTable) {
> +
> +    set_index_def(parse,  index, table, max_iid, name, name_len, on_error,
> +              col_list, idx_type);
> +
> +    if (index->def == NULL ||
> +        !index_def_is_valid(index->def, table->def->name))
> +        goto exit_create_index;
> +
> +    if (table == parse->pNewTable) {
>           /* This routine has been called to create an automatic index as a
>            * result of a PRIMARY KEY or UNIQUE clause on a column definition, or
>            * a PRIMARY KEY or UNIQUE clause following the column definitions.
> @@ -2929,27 +2971,27 @@ sql_create_index(struct Parse *parse, struct 
> Token *token,
>            * considered distinct and both result in separate indices.
>            */
>           Index *pIdx;
> -        for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
> -            int k;
> +        for (pIdx = table->pIndex; pIdx; pIdx = pIdx->pNext) {
> +            uint32_t k;
>               assert(IsUniqueIndex(pIdx));
>               assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
> -            assert(IsUniqueIndex(pIndex));
> +            assert(IsUniqueIndex(index));
> 
> -            if (pIdx->nColumn != pIndex->nColumn)
> +            if (pIdx->def->key_def->part_count !=
> +                index->def->key_def->part_count)
>                   continue;
> -            for (k = 0; k < pIdx->nColumn; k++) {
> -                assert(pIdx->aiColumn[k] >= 0);
> -                if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
> +            for (k = 0; k < pIdx->def->key_def->part_count; k++) {
> +                if (pIdx->def->key_def->parts[k].fieldno !=
> + index->def->key_def->parts[k].fieldno)
>                       break;
>                   struct coll *coll1, *coll2;
> -                uint32_t id;
> -                coll1 = sql_index_collation(pIdx, k, &id);
> -                coll2 = sql_index_collation(pIndex, k, &id);
> +                coll1 = pIdx->def->key_def->parts[k].coll;
> +                coll2 = index->def->key_def->parts[k].coll;
>                   if (coll1 != coll2)
>                       break;
>               }
> -            if (k == pIdx->nColumn) {
> -                if (pIdx->onError != pIndex->onError) {
> +            if (k == pIdx->def->key_def->part_count) {
> +                if (pIdx->onError != index->onError) {
>                       /* This constraint creates the same index as a 
> previous
>                        * constraint specified somewhere in the CREATE 
> TABLE statement.
>                        * However the ON CONFLICT clauses are different. 
> If both this
> @@ -2959,15 +3001,14 @@ sql_create_index(struct Parse *parse, struct 
> Token *token,
>                        */
>                       if (!
>                           (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT
> -                         || pIndex->onError ==
> +                         || index->onError ==
>                            ON_CONFLICT_ACTION_DEFAULT)) {
>                           sqlite3ErrorMsg(parse,
>                                   "conflicting ON CONFLICT clauses 
> specified",
>                                   0);
>                       }
> -                    if (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT) {
> -                        pIdx->onError = pIndex->onError;
> -                    }
> +                    if (pIdx->onError == ON_CONFLICT_ACTION_DEFAULT)
> +                        pIdx->onError = index->onError;
>                   }
>                   if (idx_type == SQLITE_IDXTYPE_PRIMARYKEY)
>                       pIdx->idxType = idx_type;
> @@ -2982,14 +3023,14 @@ sql_create_index(struct Parse *parse, struct 
> Token *token,
>       assert(parse->nErr == 0);
>       if (db->init.busy) {
>           Index *p;
> -        p = sqlite3HashInsert(&pTab->idxHash, pIndex->zName, pIndex);
> +        p = sqlite3HashInsert(&table->idxHash, index->def->name, index);
>           if (p) {
> -            assert(p == pIndex);    /* Malloc must have failed */
> +            assert(p == index);    /* Malloc must have failed */
>               sqlite3OomFault(db);
>               goto exit_create_index;
>           }
>           user_session->sql_flags |= SQLITE_InternChanges;
> -        pIndex->tnum = db->init.newTnum;
> +        index->tnum = db->init.newTnum;
>       }
> 
>       /*
> @@ -3040,14 +3081,14 @@ sql_create_index(struct Parse *parse, struct 
> Token *token,
>                          ON_CONFLICT_ACTION_NONE ? "" : " UNIQUE",
>                          n, token->z);
> 
> -        iSpaceId = SQLITE_PAGENO_TO_SPACEID(pTab->tnum);
> +        iSpaceId = SQLITE_PAGENO_TO_SPACEID(table->tnum);
>           iIndexId = getNewIid(parse, iSpaceId, iCursor);
>           sqlite3VdbeAddOp1(v, OP_Close, iCursor);
> -        createIndex(parse, pIndex, iSpaceId, iIndexId, zStmt);
> +        createIndex(parse, index, iSpaceId, iIndexId, zStmt);
> 
>           /* consumes zStmt */
>           iFirstSchemaCol =
> -            makeIndexSchemaRecord(parse, pIndex, iSpaceId, iIndexId,
> +            makeIndexSchemaRecord(parse, index, iSpaceId, iIndexId,
>                         zStmt);
> 
>           /* Reparse the schema. Code an OP_Expire
> @@ -3070,54 +3111,17 @@ sql_create_index(struct Parse *parse, struct 
> Token *token,
> 
>       if (!db->init.busy && tbl_name != NULL)
>           goto exit_create_index;
> -    addIndexToTable(pIndex, pTab);
> -    pIndex = NULL;
> +    addIndexToTable(index, table);
> +    index = NULL;
> 
>       /* Clean up before exiting */
>    exit_create_index:
> -    if (pIndex)
> -        freeIndex(db, pIndex);
> +    if (index != NULL)
> +        freeIndex(db, index);
>       sql_expr_delete(db, where, false);
>       sql_expr_list_delete(db, col_list);
>       sqlite3SrcListDelete(db, tbl_name);
> -    sqlite3DbFree(db, zName);
> -}
> -
> -/**
> - * Return number of columns in given index.
> - * If space is ephemeral, use internal
> - * SQL structure to fetch the value.
> - */
> -uint32_t
> -index_column_count(const Index *idx)
> -{
> -    assert(idx != NULL);
> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
> -    struct space *space = space_by_id(space_id);
> -    /* It is impossible to find an ephemeral space by id. */
> -    if (space == NULL)
> -        return idx->nColumn;
> -
> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
> -    struct index *index = space_index(space, index_id);
> -    assert(index != NULL);
> -    return index->def->key_def->part_count;
> -}
> -
> -/** Return true if given index is unique and not nullable. */
> -bool
> -index_is_unique_not_null(const Index *idx)
> -{
> -    assert(idx != NULL);
> -    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
> -    struct space *space = space_by_id(space_id);
> -    assert(space != NULL);
> -
> -    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
> -    struct index *index = space_index(space, index_id);
> -    assert(index != NULL);
> -    return (index->def->opts.is_unique &&
> -        !index->def->key_def->is_nullable);
> +    sqlite3DbFree(db, name);
>   }
> 
>   void
> @@ -3743,9 +3747,9 @@ parser_emit_unique_constraint(struct Parse *parser,
>       const struct space_def *def = index->pTable->def;
>       StrAccum err_accum;
>       sqlite3StrAccumInit(&err_accum, parser->db, 0, 0, 200);
> -    for (int j = 0; j < index->nColumn; ++j) {
> -        assert(index->aiColumn[j] >= 0);
> -        const char *col_name = def->fields[index->aiColumn[j]].name;
> +    struct key_part *part = index->def->key_def->parts;
> +    for (uint32_t j = 0; j < index->def->key_def->part_count; ++j, part++) {
> +        const char *col_name = def->fields[part->fieldno].name;
>           if (j != 0)
>               sqlite3StrAccumAppend(&err_accum, ", ", 2);
>           sqlite3XPrintf(&err_accum, "%s.%s", def->name, col_name);
> @@ -3766,11 +3770,11 @@ static bool
>   collationMatch(struct coll *coll, struct Index *index)
>   {
>       assert(coll != NULL);
> -    for (int i = 0; i < index->nColumn; i++) {
> -        uint32_t id;
> -        struct coll *idx_coll = sql_index_collation(index, i, &id);
> -        assert(idx_coll != 0 || index->aiColumn[i] < 0);
> -        if (index->aiColumn[i] >= 0 && coll == idx_coll)
> +    struct key_part *part = index->def->key_def->parts;
> +    for (uint32_t i = 0; i < index->def->key_def->part_count; i++, part++) {
> +        struct coll *idx_coll = part->coll;
> +        assert(idx_coll != NULL);
> +        if (coll == idx_coll)
>               return true;
>       }
>       return false;
> diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
> index 8b13f6077..980891cb2 100644
> --- a/src/box/sql/delete.c
> +++ b/src/box/sql/delete.c
> @@ -269,11 +269,12 @@ sql_table_delete_from(struct Parse *parse, struct SrcList *tab_list,
> 
>           /* Extract the primary key for the current row */
>           if (!is_view) {
> -            for (int i = 0; i < pk_len; i++) {
> +            struct key_part *part = pk_def->parts;
> +            for (int i = 0; i < pk_len; i++, part++) {
>                   struct space_def *def = space->def;
>                   sqlite3ExprCodeGetColumnOfTable(v, def,
>                                   tab_cursor,
> -                                pk_def->parts[i].fieldno,
> +                                part->fieldno,
>                                   reg_pk + i);
>               }
>           } else {
> @@ -569,13 +570,14 @@ sql_generate_index_key(struct Parse *parse, struct Index *index, int cursor,
>               *part_idx_label = 0;
>           }
>       }
> -    int col_cnt = index_column_count(index);
> +    int col_cnt = index->def->key_def->part_count;
>       int reg_base = sqlite3GetTempRange(parse, col_cnt);
>       if (prev != NULL && (reg_base != reg_prev ||
>                    prev->pPartIdxWhere != NULL))
>           prev = NULL;
>       for (int j = 0; j < col_cnt; j++) {
> -        if (prev != NULL && prev->aiColumn[j] == index->aiColumn[j]) {
> +        if (prev != NULL && prev->def->key_def->parts[j].fieldno ==
> + index->def->key_def->parts[j].fieldno) {
>               /*
>                * This column was already computed by the
>                * previous index.
> diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
> index 59e7cb4fd..a960c1cc1 100644
> --- a/src/box/sql/expr.c
> +++ b/src/box/sql/expr.c
> @@ -2405,20 +2405,24 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing 
> context */
>                    pIdx = pIdx->pNext) {
>                   Bitmask colUsed; /* Columns of the index used */
>                   Bitmask mCol;    /* Mask for the current column */
> -                if (pIdx->nColumn < nExpr)
> +                uint32_t part_count = pIdx->def->key_def->
> +                    part_count;
> +                struct key_part *parts = pIdx->def->key_def->
> +                    parts;
> +                if ((int)part_count < nExpr)
>                       continue;
>                   /* Maximum nColumn is BMS-2, not BMS-1, so that we can 
> compute
>                    * BITMASK(nExpr) without overflowing
>                    */
> -                testcase(pIdx->nColumn == BMS - 2);
> -                testcase(pIdx->nColumn == BMS - 1);
> -                if (pIdx->nColumn >= BMS - 1)
> +                testcase(part_count == BMS - 2);
> +                testcase(part_count == BMS - 1);
> +                if (part_count >= BMS - 1)
>                       continue;
>                   if (mustBeUnique) {
> -                    if (pIdx->nColumn > nExpr
> -                        || (pIdx->nColumn > nExpr
> -                        && !index_is_unique(pIdx))) {
> -                            continue;    /* This index is not unique 
> over the IN RHS columns */
> +                    if ((int)part_count > nExpr
> +                        || !pIdx->def->opts.is_unique) {
> +                        /* This index is not unique over the IN RHS columns */
> +                        continue;
>                       }
>                   }
> 
> @@ -2432,12 +2436,13 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing 
> context */
>                       int j;
> 
>                       for (j = 0; j < nExpr; j++) {
> -                        if (pIdx->aiColumn[j] !=
> -                            pRhs->iColumn) {
> +                        if ((int) parts[j].fieldno
> +                            != pRhs->iColumn) {
>                               continue;
>                           }
> -                        struct coll *idx_coll;
> -                        idx_coll = sql_index_collation(pIdx, j, &id);
> +
> +                        struct coll *idx_coll =
> +                                 parts[j].coll;
>                           if (pReq != NULL &&
>                               pReq != idx_coll) {
>                               continue;
> @@ -2466,18 +2471,17 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing 
> context */
>                                 0, 0, 0,
>                                 sqlite3MPrintf(db,
>                                 "USING INDEX %s FOR IN-OPERATOR",
> -                              pIdx->zName),
> +                              pIdx->def->name),
>                                 P4_DYNAMIC);
>                       struct space *space =
> space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
>                       vdbe_emit_open_cursor(pParse, iTab,
>                                     pIdx->tnum, space);
> -                    VdbeComment((v, "%s", pIdx->zName));
> +                    VdbeComment((v, "%s", pIdx->def->name));
>                       assert(IN_INDEX_INDEX_DESC ==
>                              IN_INDEX_INDEX_ASC + 1);
>                       eType = IN_INDEX_INDEX_ASC +
> -                        sql_index_column_sort_order(pIdx,
> -                                        0);
> +                        parts[0].sort_order;
> 
>                       if (prRhsHasNull) {
>   #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
> @@ -2499,7 +2503,7 @@ sqlite3FindInIndex(Parse * pParse,    /* Parsing 
> context */
>                               /* Tarantool: Check for null is performed 
> on first key of the index.  */
>                               sqlite3SetHasNullFlag(v,
>                                             iTab,
> -                                          pIdx->aiColumn[0],
> +                                          parts[0].fieldno,
>                                             *prRhsHasNull);
>                           }
>                       }
> @@ -3130,12 +3134,12 @@ sqlite3ExprCodeIN(Parse * pParse,    /* Parsing 
> and code generating context */
>           struct Index *pk = sqlite3PrimaryKeyIndex(tab);
>           assert(pk);
> 
> +        uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
>           enum affinity_type affinity =
> -            tab->def->fields[pk->aiColumn[0]].affinity;
> -        if (pk->nColumn == 1
> +            tab->def->fields[fieldno].affinity;
> +        if (pk->def->key_def->part_count == 1
>               && affinity == AFFINITY_INTEGER
> -            && pk->aiColumn[0] < nVector) {
> -            int reg_pk = rLhs + pk->aiColumn[0];
> +            && (int) fieldno < nVector) { int reg_pk = rLhs + (int)fieldno;
>               sqlite3VdbeAddOp2(v, OP_MustBeInt, reg_pk, destIfFalse);
>           }
>       }
> @@ -3467,7 +3471,7 @@ sqlite3ExprCodeLoadIndexColumn(Parse * pParse,    
> /* The parsing context */
>                      int regOut    /* Store the index column value in 
> this register */
>       )
>   {
> -    i16 iTabCol = pIdx->aiColumn[iIdxCol];
> +    i16 iTabCol = pIdx->def->key_def->parts[iIdxCol].fieldno;
>       sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
>                       iTabCur, iTabCol, regOut);
>   }
> diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
> index e3fff37fe..c14a70836 100644
> --- a/src/box/sql/fkey.c
> +++ b/src/box/sql/fkey.c
> @@ -214,7 +214,6 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse 
> context to store any error in */
>                int **paiCol    /* OUT: Map of index columns in pFKey */
>       )
>   {
> -    Index *pIdx = 0;    /* Value to return via *ppIdx */
>       int *aiCol = 0;        /* Value to return via *paiCol */
>       int nCol = pFKey->nCol;    /* Number of columns in parent key */
>       char *zKey = pFKey->aCol[0].zCol;    /* Name of left-most parent 
> key column */
> @@ -256,21 +255,29 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse 
> context to store any error in */
>           *paiCol = aiCol;
>       }
> 
> -    for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
> -        int nIdxCol = index_column_count(pIdx);
> -        if (nIdxCol == nCol && index_is_unique(pIdx)
> -            && pIdx->pPartIdxWhere == 0) {
> -            /* pIdx is a UNIQUE index (or a PRIMARY KEY) and has the 
> right number
> -             * of columns. If each indexed column corresponds to a 
> foreign key
> -             * column of pFKey, then this index is a winner.
> +    Index *index = NULL;
> +    for (index = pParent->pIndex; index; index = index->pNext) {
> +        int part_count = index->def->key_def->part_count;
> +        if (part_count == nCol && index->def->opts.is_unique
> +            && index->pPartIdxWhere == 0) {
> +            /*
> +             * pIdx is a UNIQUE index (or a PRIMARY KEY)
> +             * and has the right number of columns.
> +             * If each indexed column corresponds to a
> +             * foreign key column of pFKey, then this
> +             * index is a winner.
>                */
> 
>               if (zKey == 0) {
> -                /* If zKey is NULL, then this foreign key is implicitly 
> mapped to
> -                 * the PRIMARY KEY of table pParent. The PRIMARY KEY 
> index may be
> -                 * identified by the test.
> +                /*
> +                 * If zKey is NULL, then this
> +                 * foreign key is implicitly
> +                 * mapped to the PRIMARY KEY of
> +                 * table pParent. The PRIMARY KEY
> +                 * index may be identified by the
> +                 * test.
>                    */
> -                if (IsPrimaryKeyIndex(pIdx)) {
> +                if (IsPrimaryKeyIndex(index)) {
>                       if (aiCol) {
>                           int i;
>                           for (i = 0; i < nCol; i++)
> @@ -287,9 +294,16 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse 
> context to store any error in */
>                    * the default collation sequences for each column.
>                    */
>                   int i, j;
> -                for (i = 0; i < nCol; i++) {
> -                    i16 iCol = pIdx->aiColumn[i];    /* Index of column 
> in parent tbl */
> -                    char *zIdxCol;    /* Name of indexed column */
> +                struct key_part *part =
> +                    index->def->key_def->parts;
> +                for (i = 0; i < nCol; i++, part++) {
> +                    /*
> +                     * Index of column in
> +                     * parent table.
> +                     * */
> +                    i16 iCol = (int) part->fieldno;
> +                    /* Name of indexed column. */
> +                    char *zIdxCol;
> 
>                       if (iCol < 0)
>                           break;    /* No foreign keys against 
> expression indexes */
> @@ -303,9 +317,7 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse 
> context to store any error in */
>                       def_coll = sql_column_collation(pParent->def,
>                                       iCol,
>                                       &id);
> -                    struct coll *coll =
> -                        sql_index_collation(pIdx, i,
> -                                    &id);
> +                    struct coll *coll = part->coll;
>                       if (def_coll != coll)
>                           break;
> 
> @@ -332,7 +344,7 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse 
> context to store any error in */
>           }
>       }
> 
> -    if (!pIdx) {
> +    if (index == NULL) {
>           if (!pParse->disableTriggers) {
>               sqlite3ErrorMsg(pParse,
>                       "foreign key mismatch - \"%w\" referencing \"%w\"",
> @@ -342,7 +354,7 @@ sqlite3FkLocateIndex(Parse * pParse,    /* Parse 
> context to store any error in */
>           return 1;
>       }
> 
> -    *ppIdx = pIdx;
> +    *ppIdx = index;
>       return 0;
>   }
> 
> @@ -465,13 +477,15 @@ fkLookupParent(Parse * pParse,    /* Parse context */
>                   for (i = 0; i < nCol; i++) {
>                       int iChild = aiCol[i] + 1 + regData;
>                       int iParent =
> -                        pIdx->aiColumn[i] + 1 + regData;
> -                    assert(pIdx->aiColumn[i] >= 0);
> +                        (int) pIdx->def->key_def->parts[i].fieldno
> +                        + 1 + regData;
>                       assert(aiCol[i] != pTab->iPKey);
> -                    if (pIdx->aiColumn[i] == pTab->iPKey) {
> +                    if ((int)pIdx->def->key_def->
> +                        parts[i].fieldno == pTab->iPKey) {
>                           /* The parent key is a composite key that 
> includes the IPK column */
>                           iParent = regData;
>                       }
> +
>                       sqlite3VdbeAddOp3(v, OP_Ne, iChild,
>                                 iJump, iParent);
>                       VdbeCoverage(v);
> @@ -615,7 +629,6 @@ fkScanChildren(Parse * pParse,    /* Parse context */
>       )
>   {
>       sqlite3 *db = pParse->db;    /* Database handle */
> -    int i;            /* Iterator variable */
>       Expr *pWhere = 0;    /* WHERE clause to scan with */
>       NameContext sNameContext;    /* Context used to resolve WHERE 
> clause */
>       WhereInfo *pWInfo;    /* Context used by sqlite3WhereXXX() */
> @@ -623,7 +636,7 @@ fkScanChildren(Parse * pParse,    /* Parse context */
>       Vdbe *v = sqlite3GetVdbe(pParse);
> 
>       assert(pIdx == 0 || pIdx->pTable == pTab);
> -    assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
> +    assert(pIdx == 0 || (int) pIdx->def->key_def->part_count == pFKey->nCol);
>       assert(pIdx != 0);
> 
>       if (nIncr < 0) {
> @@ -640,19 +653,20 @@ fkScanChildren(Parse * pParse,    /* Parse context */
>        * the parent key columns. The affinity of the parent key column 
> should
>        * be applied to each child key value before the comparison takes 
> place.
>        */
> -    for (i = 0; i < pFKey->nCol; i++) {
> +    for (int i = 0; i < pFKey->nCol; i++) {
>           Expr *pLeft;    /* Value from parent table row */
>           Expr *pRight;    /* Column ref to child table */
>           Expr *pEq;    /* Expression (pLeft = pRight) */
>           i16 iCol;    /* Index of column in child table */
> -        const char *zCol;    /* Name of column in child table */
> +        const char *column_name;
> 
> -        iCol = pIdx ? pIdx->aiColumn[i] : -1;
> +        iCol = pIdx != NULL ?
> +               (int) pIdx->def->key_def->parts[i].fieldno : -1;
>           pLeft = exprTableRegister(pParse, pTab, regData, iCol);
>           iCol = aiCol ? aiCol[i] : pFKey->aCol[0].iFrom;
>           assert(iCol >= 0);
> -        zCol = pFKey->pFrom->def->fields[iCol].name;
> -        pRight = sqlite3Expr(db, TK_ID, zCol);
> +        column_name = pFKey->pFrom->def->fields[iCol].name;
> +        pRight = sqlite3Expr(db, TK_ID, column_name);
>           pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
>           pWhere = sqlite3ExprAnd(db, pWhere, pEq);
>       }
> @@ -671,15 +685,16 @@ fkScanChildren(Parse * pParse,    /* Parse context */
> 
>           Expr *pEq, *pAll = 0;
>           Index *pPk = sqlite3PrimaryKeyIndex(pTab);
> -        assert(pIdx != 0);
> -        int col_count = index_column_count(pPk);
> -        for (i = 0; i < col_count; i++) {
> -            i16 iCol = pIdx->aiColumn[i];
> -            assert(iCol >= 0);
> -            pLeft = exprTableRegister(pParse, pTab, regData, iCol);
> +        assert(pIdx != NULL);
> +        uint32_t part_count = pPk->def->key_def->part_count;
> +        for (uint32_t i = 0; i < part_count; i++) {
> +            uint32_t fieldno =
> +                (int) pIdx->def->key_def->parts[i].fieldno;
> +            pLeft = exprTableRegister(pParse, pTab, regData,
> +                          fieldno);
>               pRight =
>                   exprTableColumn(db, pTab->def,
> -                        pSrc->a[0].iCursor, iCol);
> +                        pSrc->a[0].iCursor, fieldno);
>               pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
>               pAll = sqlite3ExprAnd(db, pAll, pEq);
>           }
> @@ -982,7 +997,6 @@ sqlite3FkCheck(Parse * pParse,    /* Parse context */
>               if (aiCol[i] == pTab->iPKey) {
>                   aiCol[i] = -1;
>               }
> -            assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
>           }
> 
>           pParse->nTab++;
> @@ -1107,19 +1121,19 @@ sqlite3FkOldmask(Parse * pParse,    /* Parse 
> context */
> 
>       if (user_session->sql_flags & SQLITE_ForeignKeys) {
>           FKey *p;
> -        int i;
>           for (p = pTab->pFKey; p; p = p->pNextFrom) {
> -            for (i = 0; i < p->nCol; i++)
> +            for (int i = 0; i < p->nCol; i++)
>                   mask |= COLUMN_MASK(p->aCol[i].iFrom);
>           }
>           for (p = sqlite3FkReferences(pTab); p; p = p->pNextTo) {
>               Index *pIdx = 0;
>               sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
> -            if (pIdx) {
> -                int nIdxCol = index_column_count(pIdx);
> -                for (i = 0; i < nIdxCol; i++) {
> -                    assert(pIdx->aiColumn[i] >= 0);
> -                    mask |= COLUMN_MASK(pIdx->aiColumn[i]);
> +            if (pIdx != NULL) {
> +                uint32_t part_count =
> +                    pIdx->def->key_def->part_count;
> +                for (uint32_t i = 0; i < part_count; i++) {
> +                    mask |= COLUMN_MASK(pIdx->def->
> +                        key_def->parts[i].fieldno);
>                   }
>               }
>           }
> @@ -1254,11 +1268,12 @@ fkActionTrigger(Parse * pParse,    /* Parse 
> context */
>                      || (pTab->iPKey >= 0
>                      && pTab->iPKey <
>                         (int)pTab->def->field_count));
> -            assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
> +
> +            uint32_t fieldno = pIdx != NULL ?
> + pIdx->def->key_def->parts[i].fieldno :
> +                       (uint32_t)pTab->iPKey;
>               sqlite3TokenInit(&tToCol,
> -                     pTab->def->fields[pIdx ? pIdx->
> -                            aiColumn[i] : pTab->iPKey].
> -                     name);
> +                     pTab->def->fields[fieldno].name);
>               sqlite3TokenInit(&tFromCol,
>                        pFKey->pFrom->def->fields[
>                           iFromCol].name);
> diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
> index a43f7b596..d3e7cfb93 100644
> --- a/src/box/sql/insert.c
> +++ b/src/box/sql/insert.c
> @@ -90,14 +90,14 @@ sqlite3IndexAffinityStr(sqlite3 *db, Index *index)
>        * sqliteDeleteIndex() when the Index structure itself is
>        * cleaned up.
>        */
> -    int column_count = index_column_count(index);
> +    int column_count = index->def->key_def->part_count;
>       index->zColAff = (char *) sqlite3DbMallocRaw(0, column_count + 1);
>       if (index->zColAff == NULL) {
>           sqlite3OomFault(db);
>           return NULL;
>       }
>       for (int n = 0; n < column_count; n++) {
> -        uint16_t x = index->aiColumn[n];
> +        uint16_t x = index->def->key_def->parts[n].fieldno;
>           index->zColAff[n] = index->pTable->def->fields[x].affinity;
>       }
>       index->zColAff[column_count] = 0;
> @@ -647,7 +647,7 @@ sqlite3Insert(Parse * pParse,    /* Parser context */
>                pIdx = pIdx->pNext, i++) {
>               assert(pIdx);
>               aRegIdx[i] = ++pParse->nMem;
> -            pParse->nMem += index_column_count(pIdx);
> +            pParse->nMem += pIdx->def->key_def->part_count;
>           }
>       }
> 
> @@ -1069,12 +1069,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,        /* The parser context */
>       Index *pIdx;        /* Pointer to one of the indices */
>       Index *pPk = 0;        /* The PRIMARY KEY index */
>       sqlite3 *db;        /* Database connection */
> -    int i;            /* loop counter */
> -    int ix;            /* Index loop counter */
> -    int nCol;        /* Number of columns */
>       int addr1;        /* Address of jump instruction */
>       int seenReplace = 0;    /* True if REPLACE is used to resolve INT 
> PK conflict */
> -    int nPkField;        /* Number of fields in PRIMARY KEY. */
>       u8 isUpdate;        /* True if this is an UPDATE operation */
>       u8 bAffinityDone = 0;    /* True if the OP_Affinity operation has 
> been run */
>       struct session *user_session = current_session();
> @@ -1086,10 +1082,8 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>       struct space_def *def = pTab->def;
>       /* This table is not a VIEW */
>       assert(!def->opts.is_view);
> -    nCol = def->field_count;
> 
>       pPk = sqlite3PrimaryKeyIndex(pTab);
> -    nPkField = index_column_count(pPk);
> 
>       /* Record that this module has started */
>       VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
> @@ -1099,8 +1093,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    
>      /* The parser context */
>       enum on_conflict_action on_error;
>       /* Test all NOT NULL constraints.
>        */
> -    for (i = 0; i < nCol; i++) {
> -        if (i == pTab->iPKey) {
> +    for (uint32_t i = 0; i < def->field_count; i++) {
> +        if ((int) i == pTab->iPKey) {
>               continue;
>           }
>           if (aiChng && aiChng[i] < 0) {
> @@ -1109,7 +1103,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    
>      /* The parser context */
>           }
>           if (def->fields[i].is_nullable ||
>               (pTab->tabFlags & TF_Autoincrement &&
> -             pTab->iAutoIncPKey == i))
> +             pTab->iAutoIncPKey == (int) i))
>               continue;    /* This column is allowed to be NULL */
> 
>           on_error = table_column_nullable_action(pTab, i);
> @@ -1179,7 +1173,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    
>      /* The parser context */
>           else
>               on_error = ON_CONFLICT_ACTION_ABORT;
> 
> -        for (i = 0; i < checks->nExpr; i++) {
> +        for (int i = 0; i < checks->nExpr; i++) {
>               int allOk;
>               Expr *pExpr = checks->a[i].pExpr;
>               if (aiChng
> @@ -1206,13 +1200,16 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>           }
>       }
> 
> -    /* Test all UNIQUE constraints by creating entries for each UNIQUE
> -     * index and making sure that duplicate entries do not already exist.
> -     * Compute the revised record entries for indices as we go.
> +    /*
> +     * Test all UNIQUE constraints by creating entries for
> +     * each UNIQUE index and making sure that duplicate entries
> +     * do not already exist. Compute the revised record entries
> +     * for indices as we go.
>        *
>        * This loop also handles the case of the PRIMARY KEY index.
>        */
> -    for (ix = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, ix++) {
> +    pIdx = pTab->pIndex;
> +    for (int ix = 0; pIdx; pIdx = pIdx->pNext, ix++) {
>           int regIdx;    /* Range of registers hold conent for pIdx */
>           int regR;    /* Range of registers holding conflicting PK */
>           int iThisCur;    /* Cursor for this UNIQUE index */
> @@ -1253,10 +1250,11 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>            * the insert or update.  Store that record in the aRegIdx[ix] 
> register
>            */
>           regIdx = aRegIdx[ix] + 1;
> -        int nIdxCol = (int) index_column_count(pIdx);
> +        uint32_t part_count = (int) pIdx->def->key_def->part_count;
>           if (uniqueByteCodeNeeded) {
> -            for (i = 0; i < nIdxCol; ++i) {
> -                int fieldno = pIdx->aiColumn[i];
> +            for (uint32_t i = 0; i < part_count; ++i) {
> +                uint32_t fieldno =
> + pIdx->def->key_def->parts[i].fieldno;
>                   int reg;
>                   /*
>                    * OP_SCopy copies value in
> @@ -1267,11 +1265,10 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>                    * needed for proper UNIQUE
>                    * constraint handling.
>                    */
> -                if (fieldno == pTab->iPKey)
> +                if ((int) fieldno == pTab->iPKey)
>                       reg = regNewData;
>                   else
>                       reg = fieldno + regNewData + 1;
> -                assert(fieldno >= 0);
>                   sqlite3VdbeAddOp2(v, OP_SCopy, reg, regIdx + i);
>                   VdbeComment((v, "%s",
>                           def->fields[fieldno].name));
> @@ -1283,9 +1280,13 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>           if (IsPrimaryKeyIndex(pIdx)) {
>               /* If PK is marked as INTEGER, use it as strict type,
>                * not as affinity. Emit code for type checking */
> -            if (nIdxCol == 1) {
> -                reg_pk = regNewData + 1 + pIdx->aiColumn[0];
> -                if (pTab->zColAff[pIdx->aiColumn[0]] ==
> +            if (part_count == 1) {
> +                reg_pk = regNewData + 1 +
> + pIdx->def->key_def->parts[0].fieldno;
> +
> +                int fieldno = (int) pIdx->def->key_def->
> +                    parts[0].fieldno;
> +                if (pTab->zColAff[fieldno] ==
>                       AFFINITY_INTEGER) {
>                       int skip_if_null = sqlite3VdbeMakeLabel(v);
>                       if ((pTab->tabFlags & TF_Autoincrement) != 0) {
> @@ -1303,7 +1304,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    
>      /* The parser context */
> 
>               sqlite3VdbeAddOp3(v, OP_MakeRecord, regNewData + 1,
>                         def->field_count, aRegIdx[ix]);
> -            VdbeComment((v, "for %s", pIdx->zName));
> +            VdbeComment((v, "for %s", pIdx->def->name));
>           }
> 
>           /* In an UPDATE operation, if this index is the PRIMARY KEY
> @@ -1391,24 +1392,23 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>           if (uniqueByteCodeNeeded) {
>               sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
>                            addrUniqueOk, regIdx,
> -                         index_column_count(pIdx));
> + pIdx->def->key_def->part_count);
>           }
>           VdbeCoverage(v);
> 
> +        const uint32_t pk_part_count = pPk->def->key_def->part_count;
>           /* Generate code to handle collisions */
>           regR =
>               (pIdx == pPk) ? regIdx : sqlite3GetTempRange(pParse,
> -                                 nPkField);
> +                                 pk_part_count);
>           if (isUpdate || on_error == ON_CONFLICT_ACTION_REPLACE) {
>               int x;
> -            int nPkCol = index_column_count(pPk);
>               /* Extract the PRIMARY KEY from the end of the index entry and
>                * store it in registers regR..regR+nPk-1
>                */
>               if (pIdx != pPk) {
> -                for (i = 0; i < nPkCol; i++) {
> -                    assert(pPk->aiColumn[i] >= 0);
> -                    x = pPk->aiColumn[i];
> +                for (uint32_t i = 0; i < pk_part_count; i++) {
> +                    x = pPk->def->key_def->parts[i].fieldno;
>                       sqlite3VdbeAddOp3(v, OP_Column,
>                                 iThisCur, x, regR + i);
>                       VdbeComment((v, "%s.%s", def->name,
> @@ -1424,17 +1424,18 @@ sqlite3GenerateConstraintChecks(Parse * 
> pParse,        /* The parser context */
>                    * KEY values of this row before the update.
>                    */
>                   int addrJump =
> -                    sqlite3VdbeCurrentAddr(v) + nPkCol;
> +                    sqlite3VdbeCurrentAddr(v) +
> +                    pk_part_count;
>                   int op = OP_Ne;
>                   int regCmp = (IsPrimaryKeyIndex(pIdx) ?
>                             regIdx : regR);
> 
> -                for (i = 0; i < nPkCol; i++) {
> -                    uint32_t id;
> -                    char *p4 = (char *)sql_index_collation(pPk, i, &id);
> -                    x = pPk->aiColumn[i];
> -                    assert(x >= 0);
> -                    if (i == (nPkCol - 1)) {
> +                for (uint32_t i = 0; i < pk_part_count; i++) {
> +                    char *p4 = (char *) pPk->def->key_def->parts[i].coll;
> +                    x = pPk->def->key_def->parts[i].fieldno;
> +                    if (pPk->tnum==0)
> +                        x = -1;
> +                    if (i == (pk_part_count - 1)) {
>                           addrJump = addrUniqueOk;
>                           op = OP_Eq;
>                       }
> @@ -1482,7 +1483,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    
>      /* The parser context */
>                                   NULL, NULL);
>               }
>               sql_generate_row_delete(pParse, pTab, pTrigger,
> -                        iDataCur, regR, nPkField, false,
> +                        iDataCur, regR, pk_part_count,
> +                        false,
>                           ON_CONFLICT_ACTION_REPLACE,
>                           pIdx == pPk ? ONEPASS_SINGLE :
>                           ONEPASS_OFF, -1);
> @@ -1492,7 +1494,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,    
>      /* The parser context */
>           }
>           sqlite3VdbeResolveLabel(v, addrUniqueOk);
>           if (regR != regIdx)
> -            sqlite3ReleaseTempRange(pParse, regR, nPkField);
> +            sqlite3ReleaseTempRange(pParse, regR, pk_part_count);
>       }
> 
>       *pbMayReplace = seenReplace;
> @@ -1610,8 +1612,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,    /* 
> Parsing context */
>               IsPrimaryKeyIndex(pIdx) ||        /* Condition 2 */
>               sqlite3FkReferences(pTab) ||    /* Condition 3 */
>               /* Condition 4 */
> -            (index_is_unique(pIdx) && pIdx->onError !=
> -             ON_CONFLICT_ACTION_DEFAULT &&
> +            (pIdx->def->opts.is_unique &&
> +             pIdx->onError != ON_CONFLICT_ACTION_DEFAULT &&
>                /* Condition 4.1 */
>                pIdx->onError != ON_CONFLICT_ACTION_ABORT) ||
>                /* Condition 4.2 */
> @@ -1629,7 +1631,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,    /* 
> Parsing context */
>                             space_ptr_reg);
>                   sql_vdbe_set_p4_key_def(pParse, pIdx);
>                   sqlite3VdbeChangeP5(v, p5);
> -                VdbeComment((v, "%s", pIdx->zName));
> +                VdbeComment((v, "%s", pIdx->def->name));
>               }
>           }
>       }
> @@ -1663,30 +1665,24 @@ int sqlite3_xferopt_count;
>   static int
>   xferCompatibleIndex(Index * pDest, Index * pSrc)
>   {
> -    uint32_t i;
>       assert(pDest && pSrc);
>       assert(pDest->pTable != pSrc->pTable);
> -    uint32_t nDestCol = index_column_count(pDest);
> -    uint32_t nSrcCol = index_column_count(pSrc);
> -    if (nDestCol != nSrcCol) {
> -        return 0;    /* Different number of columns */
> -    }
> +    uint32_t dest_idx_part_count = pDest->def->key_def->part_count;
> +    uint32_t src_idx_part_count = pSrc->def->key_def->part_count;
> +    if (dest_idx_part_count != src_idx_part_count)
> +        return 0;
>       if (pDest->onError != pSrc->onError) {
>           return 0;    /* Different conflict resolution strategies */
>       }
> -    for (i = 0; i < nSrcCol; i++) {
> -        if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
> +    struct key_part *src_part = pSrc->def->key_def->parts;
> +    struct key_part *dest_part = pDest->def->key_def->parts;
> +    for (uint32_t i = 0; i < src_idx_part_count; i++, src_part++, 
> dest_part++) {
> +        if (src_part->fieldno != dest_part->fieldno)
>               return 0;    /* Different columns indexed */
> -        }
> -        if (sql_index_column_sort_order(pSrc, i) !=
> -            sql_index_column_sort_order(pDest, i)) {
> +        if (src_part->sort_order != dest_part->sort_order)
>               return 0;    /* Different sort orders */
> -        }
> -        uint32_t id;
> -        if (sql_index_collation(pSrc, i, &id) !=
> -            sql_index_collation(pDest, i, &id)) {
> +        if (src_part->coll != dest_part->coll)
>               return 0;    /* Different collating sequences */
> -        }
>       }
>       if (sqlite3ExprCompare(pSrc->pPartIdxWhere, pDest->pPartIdxWhere, 
> -1)) {
>           return 0;    /* Different WHERE clauses */
> @@ -1858,16 +1854,15 @@ xferOptimization(Parse * pParse,    /* Parser 
> context */
>           }
>       }
>       for (pDestIdx = pDest->pIndex; pDestIdx; pDestIdx = pDestIdx->pNext) {
> -        if (index_is_unique(pDestIdx)) {
> +        if (pDestIdx->def->opts.is_unique)
>               destHasUniqueIdx = 1;
> -        }
>           for (pSrcIdx = pSrc->pIndex; pSrcIdx; pSrcIdx = pSrcIdx->pNext) {
>               if (xferCompatibleIndex(pDestIdx, pSrcIdx))
>                   break;
>           }
> -        if (pSrcIdx == 0) {
> -            return 0;    /* pDestIdx has no corresponding index in pSrc */
> -        }
> +        /* pDestIdx has no corresponding index in pSrc. */
> +        if (pSrcIdx == NULL)
> +            return 0;
>       }
>       /* Get server checks. */
>       ExprList *pCheck_src = space_checks_expr_list(
> @@ -1943,11 +1938,11 @@ xferOptimization(Parse * pParse,    /* Parser 
> context */
>           struct space *src_space =
> space_by_id(SQLITE_PAGENO_TO_SPACEID(pSrcIdx->tnum));
>           vdbe_emit_open_cursor(pParse, iSrc, pSrcIdx->tnum, src_space);
> -        VdbeComment((v, "%s", pSrcIdx->zName));
> +        VdbeComment((v, "%s", pSrcIdx->def->name));
>           struct space *dest_space =
> space_by_id(SQLITE_PAGENO_TO_SPACEID(pDestIdx->tnum));
>           vdbe_emit_open_cursor(pParse, iDest, pDestIdx->tnum, dest_space);
> -        VdbeComment((v, "%s", pDestIdx->zName));
> +        VdbeComment((v, "%s", pDestIdx->def->name));
>           addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
>           VdbeCoverage(v);
>           sqlite3VdbeAddOp2(v, OP_RowData, iSrc, regData);
> diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
> index 5fb29c75c..7067a5ab1 100644
> --- a/src/box/sql/pragma.c
> +++ b/src/box/sql/pragma.c
> @@ -370,7 +370,8 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First 
> part of [schema.]id field */
>                   k = 1;
>               } else {
>                   for (k = 1; k <= def->field_count &&
> -                     pk->aiColumn[k - 1] != (int) i; ++k) {
> +                     pk->def->key_def->parts[k - 1].fieldno
> +                     != i; ++k) {
>                   }
>               }
>               bool is_nullable = def->fields[i].is_nullable;
> @@ -414,7 +415,7 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First 
> part of [schema.]id field */
>                       size_t avg_tuple_size_idx =
>                           sql_index_tuple_size(space, idx);
>                       sqlite3VdbeMultiLoad(v, 2, "sii",
> -                                 pIdx->zName,
> +                                 pIdx->def->name,
>                                    avg_tuple_size_idx,
>                                    index_field_tuple_est(pIdx, 0));
>                       sqlite3VdbeAddOp2(v, OP_ResultRow, 1,
> @@ -443,11 +444,13 @@ sqlite3Pragma(Parse * pParse, Token * pId,    /* 
> First part of [schema.]id field */
>                            */
>                           pParse->nMem = 3;
>                       }
> -                    mx = index_column_count(pIdx);
> +                    mx = pIdx->def->key_def->part_count;
>                       assert(pParse->nMem <=
>                              pPragma->nPragCName);
> -                    for (i = 0; i < mx; i++) {
> -                        i16 cnum = pIdx->aiColumn[i];
> +                    struct key_part *part =
> +                        pIdx->def->key_def->parts;
> +                    for (i = 0; i < mx; i++, part++) {
> +                        i16 cnum = (int) part->fieldno;
>                           assert(pIdx->pTable);
>                           sqlite3VdbeMultiLoad(v, 1,
>                                        "iis", i,
> @@ -461,19 +464,18 @@ sqlite3Pragma(Parse * pParse, Token * pId,    /* 
> First part of [schema.]id field */
>                                        name);
>                           if (pPragma->iArg) {
>                               const char *c_n;
> -                            uint32_t id;
> +                            uint32_t id =
> +                                part->coll_id;
>                               struct coll *coll =
> -                                sql_index_collation(pIdx, i, &id);
> +                                part->coll;
>                               if (coll != NULL)
>                                   c_n = coll_by_id(id)->name;
>                               else
>                                   c_n = "BINARY";
> -                            enum sort_order sort_order;
> -                            sort_order = sql_index_column_sort_order(pIdx,
> -                                                 i);
>                               sqlite3VdbeMultiLoad(v,
>                                            4,
>                                            "isi",
> +                                         part->
>                                            sort_order,
>                                            c_n,
>                                            i <
> @@ -503,10 +505,8 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First 
> part of [schema.]id field */
>                               { "c", "u", "pk" };
>                           sqlite3VdbeMultiLoad(v, 1,
>                                        "isisi", i,
> -                                     pIdx->
> -                                     zName,
> -                                     index_is_unique
> -                                     (pIdx),
> +                                     pIdx->def->name,
> + pIdx->def->opts.is_unique,
>                                        azOrigin
>                                        [pIdx->
>                                         idxType],
> diff --git a/src/box/sql/select.c b/src/box/sql/select.c
> index 368bcd6f0..c7c186d9d 100644
> --- a/src/box/sql/select.c
> +++ b/src/box/sql/select.c
> @@ -4367,7 +4367,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct 
> SrcList_item *pFrom)
>           char *zIndexedBy = pFrom->u1.zIndexedBy;
>           Index *pIdx;
>           for (pIdx = pTab->pIndex;
> -             pIdx && strcmp(pIdx->zName, zIndexedBy);
> +             pIdx && strcmp(pIdx->def->name, zIndexedBy);
>                pIdx = pIdx->pNext) ;
>           if (!pIdx) {
>               sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
> diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
> index e7a02dc1d..fcedf4358 100644
> --- a/src/box/sql/sqliteInt.h
> +++ b/src/box/sql/sqliteInt.h
> @@ -2069,27 +2069,17 @@ struct UnpackedRecord {
>    * Each SQL index is represented in memory by an
>    * instance of the following structure.
>    *
> - * The columns of the table that are to be indexed are described
> - * by the aiColumn[] field of this structure.  For example, suppose
> - * we have the following table and index:
> - *
> - *     CREATE TABLE Ex1(c1 int, c2 int, c3 text);
> - *     CREATE INDEX Ex2 ON Ex1(c3,c1);
> - *
> - * In the Table structure describing Ex1, nCol==3 because there are
> - * three columns in the table.  In the Index structure describing
> - * Ex2, nColumn==2 since 2 of the 3 columns of Ex1 are indexed.
> - * The value of aiColumn is {2, 0}.  aiColumn[0]==2 because the
> - * first column to be indexed (c3) has an index of 2 in Ex1.aCol[].
> - * The second column to be indexed (c1) has an index of 0 in
> - * Ex1.aCol[], hence Ex2.aiColumn[1]==0.
> - *
> - * The Index.onError field determines whether or not the indexed columns
> - * must be unique and what to do if they are not.  When Index.onError=
> - * ON_CONFLICT_ACTION_NONE, it means this is not a unique index.
> - * Otherwise it is a unique index and the value of Index.onError indicate
> - * the which conflict resolution algorithm to employ whenever an attempt
> - * is made to insert a non-unique element.
> + * The index's name, corresponding space_id and type (in the
> + * Tarantool sense - HASH, TREE, etc.) are stored in the index
> + * definition, Index.def.
> + * The SQL statement which created the index and the 'is_unique'
> + * flag are stored in Index.def.opts. Information about index
> + * parts (part count, corresponding space field numbers, part
> + * collations and sort orders, etc.) is stored in Index.def.key_def.
> + *
> + * Index.onError indicates which conflict resolution algorithm
> + * to employ whenever an attempt is made to insert a non-unique
> + * element into a unique index.
>    *
>    * While parsing a CREATE TABLE or CREATE INDEX statement in order to
>    * generate VDBE code (as opposed to reading from Tarantool's _space
> @@ -2100,26 +2090,18 @@ struct UnpackedRecord {
>    * program is executed). See convertToWithoutRowidTable() for details.
>    */
>   struct Index {
> -    char *zName;        /* Name of this index */
> -    i16 *aiColumn;        /* Which columns are used by this index.  1st 
> is 0 */
>       LogEst *aiRowLogEst;    /* From ANALYZE: Est. rows selected by 
> each column */
>       Table *pTable;        /* The SQL table being indexed */
>       char *zColAff;        /* String defining the affinity of each 
> column */
>       Index *pNext;        /* The next index associated with the same 
> table */
>       Schema *pSchema;    /* Schema containing this index */
> -    /** Sorting order for each column. */
> -    enum sort_order *sort_order;
> -    /** Array of collation sequences for index. */
> -    struct coll **coll_array;
> -    /** Array of collation identifiers. */
> -    uint32_t *coll_id_array;
>       Expr *pPartIdxWhere;    /* WHERE clause for partial indices */
>       int tnum;        /* DB Page containing root of this index */
> -    u16 nColumn;        /* Number of columns stored in the index */
>       u8 onError;        /* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
>                    * or _NONE
>                    */
>       unsigned idxType:2;    /* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE 
> INDEX */
> +    struct index_def *def;
>   };
> 
>   /**
> @@ -3541,34 +3523,6 @@ void sqlite3AddCollateType(Parse *, Token *);
>    */
>   struct coll *
>   sql_column_collation(struct space_def *def, uint32_t column, uint32_t 
> *coll_id);
> -/**
> - * Return name of given column collation from index.
> - *
> - * @param idx Index which is used to fetch column.
> - * @param column Number of column.
> - * @param[out] coll_id Collation identifier.
> - * @retval Pointer to collation.
> - */
> -struct coll *
> -sql_index_collation(Index *idx, uint32_t column, uint32_t *id);
> -
> -/**
> - * Return key_def of provided struct Index.
> - * @param idx Pointer to `struct Index` object.
> - * @retval Pointer to `struct key_def`.
> - */
> -struct key_def*
> -sql_index_key_def(struct Index *idx);
> -
> -/**
> - * Return sort order of given column from index.
> - *
> - * @param idx Index which is used to fetch column.
> - * @param column Number of column.
> - * @retval Sort order of requested column.
> - */
> -enum sort_order
> -sql_index_column_sort_order(Index *idx, uint32_t column);
> 
>   void sqlite3EndTable(Parse *, Token *, Token *, Select *);
> 
> @@ -3655,9 +3609,7 @@ void sqlite3SrcListShiftJoinType(SrcList *);
>   void sqlite3SrcListAssignCursors(Parse *, SrcList *);
>   void sqlite3IdListDelete(sqlite3 *, IdList *);
>   void sqlite3SrcListDelete(sqlite3 *, SrcList *);
> -Index *sqlite3AllocateIndexObject(sqlite3 *, i16, int, char **);
> -bool
> -index_is_unique(Index *);
> +Index *sqlite3AllocateIndexObject(sqlite3 *, i16);
> 
>   /**
>    * Create a new index for an SQL table.  name is the name of the
> @@ -4373,8 +4325,6 @@ int sqlite3InvokeBusyHandler(BusyHandler *);
>   int
>   sql_analysis_load(struct sqlite3 *db);
> 
> -uint32_t
> -index_column_count(const Index *);
>   bool
>   index_is_unique_not_null(const Index *);
>   void sqlite3RegisterLikeFunctions(sqlite3 *, int);
> diff --git a/src/box/sql/update.c b/src/box/sql/update.c
> index 10385eb78..d45672228 100644
> --- a/src/box/sql/update.c
> +++ b/src/box/sql/update.c
> @@ -238,17 +238,18 @@ sqlite3Update(Parse * pParse,        /* The parser 
> context */
>        */
>       for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
>           int reg;
> -        int nIdxCol = index_column_count(pIdx);
> +        uint32_t part_count = pIdx->def->key_def->part_count;
>           if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
>               reg = ++pParse->nMem;
> -            pParse->nMem += nIdxCol;
> +            pParse->nMem += part_count;
>           } else {
>               reg = 0;
> -            for (i = 0; i < nIdxCol; i++) {
> -                i16 iIdxCol = pIdx->aiColumn[i];
> -                if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
> +            for (uint32_t i = 0; i < part_count; i++) {
> +                uint32_t fieldno =
> + pIdx->def->key_def->parts[i].fieldno;
> +                if (aXRef[fieldno] >= 0) {
>                       reg = ++pParse->nMem;
> -                    pParse->nMem += nIdxCol;
> +                    pParse->nMem += part_count;
>                       break;
>                   }
>               }
> @@ -300,17 +301,18 @@ sqlite3Update(Parse * pParse,        /* The parser 
> context */
>        * In this case we have to manually load columns in order to make 
> tuple.
>        */
>       int iPk;    /* First of nPk memory cells holding PRIMARY KEY value */
> -    i16 nPk;    /* Number of components of the PRIMARY KEY */
> +    /* Number of components of the PRIMARY KEY.  */
> +    uint32_t pk_part_count;
>       int addrOpen;    /* Address of the OpenEphemeral instruction */
> 
>       if (is_view) {
> -        nPk = nKey;
> +        pk_part_count = nKey;
>       } else {
>           assert(pPk != 0);
> -        nPk = index_column_count(pPk);
> +        pk_part_count = pPk->def->key_def->part_count;
>       }
>       iPk = pParse->nMem + 1;
> -    pParse->nMem += nPk;
> +    pParse->nMem += pk_part_count;
>       regKey = ++pParse->nMem;
>       iEph = pParse->nTab++;
>       sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
> @@ -319,7 +321,8 @@ sqlite3Update(Parse * pParse,        /* The parser 
> context */
>           addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
>                            nKey);
>       } else {
> -        addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
> +        addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
> +                         pk_part_count);
>           sql_vdbe_set_p4_key_def(pParse, pPk);
>       }
> 
> @@ -329,27 +332,27 @@ sqlite3Update(Parse * pParse,        /* The parser 
> context */
>           goto update_cleanup;
>       okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
>       if (is_view) {
> -        for (i = 0; i < nPk; i++) {
> +        for (i = 0; i < (int) pk_part_count; i++) {
>               sqlite3VdbeAddOp3(v, OP_Column, iDataCur, i, iPk + i);
>           }
>       } else {
> -        for (i = 0; i < nPk; i++) {
> -            assert(pPk->aiColumn[i] >= 0);
> +        for (i = 0; i < (int) pk_part_count; i++) {
>               sqlite3ExprCodeGetColumnOfTable(v, def, iDataCur,
> -                            pPk->aiColumn[i],
> +                            pPk->def->key_def->
> +                                parts[i].fieldno,
>                               iPk + i);
>           }
>       }
> 
>       if (okOnePass) {
>           sqlite3VdbeChangeToNoop(v, addrOpen);
> -        nKey = nPk;
> +        nKey = pk_part_count;
>           regKey = iPk;
>       } else {
>           const char *zAff = is_view ? 0 :
>                      sqlite3IndexAffinityStr(pParse->db, pPk);
> -        sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
> -                      zAff, nPk);
> +        sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, pk_part_count,
> +                  regKey, zAff, pk_part_count);
>           sqlite3VdbeAddOp2(v, OP_IdxInsert, iEph, regKey);
>           /* Set flag to save memory allocating one by malloc. */
>           sqlite3VdbeChangeP5(v, 1);
> diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
> index a29d0a3ae..9372ba360 100644
> --- a/src/box/sql/vdbeaux.c
> +++ b/src/box/sql/vdbeaux.c
> @@ -1159,7 +1159,7 @@ sql_vdbe_set_p4_key_def(struct Parse *parse, 
> struct Index *idx)
>       struct Vdbe *v = parse->pVdbe;
>       assert(v != NULL);
>       assert(idx != NULL);
> -    struct key_def *def = key_def_dup(sql_index_key_def(idx));
> +    struct key_def *def = key_def_dup(idx->def->key_def);
>       if (def == NULL)
>           sqlite3OomFault(parse->db);
>       else
> diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
> index f408b7701..baf3df48b 100644
> --- a/src/box/sql/vdbemem.c
> +++ b/src/box/sql/vdbemem.c
> @@ -1087,15 +1087,15 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
>               Index *pIdx = p->pIdx;    /* Index being probed */
>               int nByte;    /* Bytes of space to allocate */
>               int i;    /* Counter variable */
> -            int nCol = index_column_count(pIdx);
> +            int part_count = pIdx->def->key_def->part_count;
> 
> -            nByte = sizeof(Mem) * nCol +
> +            nByte = sizeof(Mem) * part_count +
>                   ROUND8(sizeof(UnpackedRecord));
>               pRec =
>                   (UnpackedRecord *) sqlite3DbMallocZero(db, nByte);
>               if (pRec == NULL)
>                   return NULL;
> -            pRec->key_def = key_def_dup(sql_index_key_def(pIdx));
> +            pRec->key_def = key_def_dup(pIdx->def->key_def);
>               if (pRec->key_def == NULL) {
>                   sqlite3DbFree(db, pRec);
>                   sqlite3OomFault(db);
> @@ -1103,7 +1103,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
>               }
>               pRec->aMem = (Mem *)((char *) pRec +
>                            ROUND8(sizeof(UnpackedRecord)));
> -            for (i = 0; i < nCol; i++) {
> +            for (i = 0; i < (int) part_count; i++) {
>                   pRec->aMem[i].flags = MEM_Null;
>                   pRec->aMem[i].db = db;
>               }
> @@ -1650,10 +1650,10 @@ sqlite3Stat4ProbeFree(UnpackedRecord * pRec)
>   {
>       if (pRec) {
>           int i;
> -        int nCol = pRec->key_def->part_count;
> +        int part_count = pRec->key_def->part_count;
>           Mem *aMem = pRec->aMem;
>           sqlite3 *db = aMem[0].db;
> -        for (i = 0; i < nCol; i++) {
> +        for (i = 0; i < part_count; i++) {
>               sqlite3VdbeMemRelease(&aMem[i]);
>           }
>           sqlite3DbFree(db, pRec);
> diff --git a/src/box/sql/where.c b/src/box/sql/where.c
> index c0c26ce29..599863041 100644
> --- a/src/box/sql/where.c
> +++ b/src/box/sql/where.c
> @@ -372,13 +372,19 @@ whereScanInit(WhereScan * pScan,    /* The 
> WhereScan object being initialized */
>       pScan->is_column_seen = false;
>       if (pIdx) {
>           int j = iColumn;
> -        iColumn = pIdx->aiColumn[j];
> +        iColumn = pIdx->def->key_def->parts[j].fieldno;
> +        /*
> +         * pIdx->tnum == 0 means that pIdx is a fake
> +         * integer primary key index
> +         */
> +        if (pIdx->tnum == 0)
> +            iColumn = -1;
> +
>           if (iColumn >= 0) {
>               char affinity =
> pIdx->pTable->def->fields[iColumn].affinity;
>               pScan->idxaff = affinity;
> -            uint32_t id;
> -            pScan->coll = sql_index_collation(pIdx, j, &id);
> +            pScan->coll = pIdx->def->key_def->parts[j].coll;
>               pScan->is_column_seen = true;
>           }
>       }
> @@ -541,47 +547,24 @@ findIndexCol(Parse * pParse,    /* Parse context */
>            Index * pIdx,    /* Index to match column of */
>            int iCol)        /* Column of index to match */
>   {
> +    struct key_part *part_to_match = &pIdx->def->key_def->parts[iCol];
>       for (int i = 0; i < pList->nExpr; i++) {
>           Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
> -        if (p->op == TK_COLUMN &&
> -            p->iColumn == pIdx->aiColumn[iCol] &&
> -            p->iTable == iBase) {
> +        if (p->op == TK_COLUMN && p->iTable == iBase &&
> +            p->iColumn == (int) part_to_match->fieldno) {
>               bool is_found;
>               uint32_t id;
>               struct coll *coll = sql_expr_coll(pParse,
>                                 pList->a[i].pExpr,
>                                 &is_found, &id);
> -            if (is_found &&
> -                coll == sql_index_collation(pIdx, iCol, &id)) {
> +            if (is_found && coll == part_to_match->coll)
>                   return i;
> -            }
>           }
>       }
> 
>       return -1;
>   }
> 
> -/*
> - * Return TRUE if the iCol-th column of index pIdx is NOT NULL
> - */
> -static int
> -indexColumnNotNull(Index * pIdx, int iCol)
> -{
> -    int j;
> -    assert(pIdx != 0);
> -    assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
> -    j = pIdx->aiColumn[iCol];
> -    if (j >= 0) {
> -        return !pIdx->pTable->def->fields[j].is_nullable;
> -    } else if (j == (-1)) {
> -        return 1;
> -    } else {
> -        assert(j == (-2));
> -        return 0;    /* Assume an indexed expression can always yield a NULL */
> -
> -    }
> -}
> -
>   /*
>    * Return true if the DISTINCT expression-list passed as the third argument
>    * is redundant.
> @@ -633,9 +616,9 @@ isDistinctRedundant(Parse * pParse,        /* 
> Parsing context */
>        *      contain a "col=X" term are subject to a NOT NULL constraint.
>        */
>       for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
> -        if (!index_is_unique(pIdx))
> +        if (!pIdx->def->opts.is_unique)
>               continue;
> -        int col_count = index_column_count(pIdx);
> +        int col_count = pIdx->def->key_def->part_count;
>           for (i = 0; i < col_count; i++) {
>               if (0 ==
>                   sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
> @@ -643,11 +626,12 @@ isDistinctRedundant(Parse * pParse, /* Parsing 
> context */
>                   if (findIndexCol
>                       (pParse, pDistinct, iBase, pIdx, i) < 0)
>                       break;
> -                if (indexColumnNotNull(pIdx, i) == 0)
> +                uint32_t j = pIdx->def->key_def->parts[i].fieldno;
> +                if (pIdx->pTable->def->fields[j].is_nullable)
>                       break;
>               }
>           }
> -        if (i == (int)index_column_count(pIdx)) {
> +        if (i == (int) pIdx->def->key_def->part_count) {
>               /* This index implies that the DISTINCT qualifier is redundant. */
>               return 1;
>           }
> @@ -1184,7 +1168,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
>   char
>   sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
>   {
> -    assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
> +    assert(iCol >= 0 && iCol < (int) pIdx->def->key_def->part_count);
>       if (!pIdx->zColAff) {
>           if (sqlite3IndexAffinityStr(db, pIdx) == 0)
>               return AFFINITY_BLOB;
> @@ -1246,13 +1230,12 @@ whereRangeSkipScanEst(Parse * pParse,     /* 
> Parsing & code generating context */
>       int nUpper = index->def->opts.stat->sample_count + 1;
>       int rc = SQLITE_OK;
>       u8 aff = sqlite3IndexColumnAffinity(db, p, nEq);
> -    uint32_t id;
> 
>       sqlite3_value *p1 = 0;    /* Value extracted from pLower */
>       sqlite3_value *p2 = 0;    /* Value extracted from pUpper */
>       sqlite3_value *pVal = 0;    /* Value extracted from record */
> 
> -    struct coll *pColl = sql_index_collation(p, nEq, &id);
> +    struct coll *coll = p->def->key_def->parts[nEq].coll;
>       if (pLower) {
>           rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
>                              aff, &p1);
> @@ -1273,12 +1256,12 @@ whereRangeSkipScanEst(Parse * pParse,     /* 
> Parsing & code generating context */
>               rc = sqlite3Stat4Column(db, samples[i].sample_key,
>                           samples[i].key_size, nEq, &pVal);
>               if (rc == SQLITE_OK && p1) {
> -                int res = sqlite3MemCompare(p1, pVal, pColl);
> +                int res = sqlite3MemCompare(p1, pVal, coll);
>                   if (res >= 0)
>                       nLower++;
>               }
>               if (rc == SQLITE_OK && p2) {
> -                int res = sqlite3MemCompare(p2, pVal, pColl);
> +                int res = sqlite3MemCompare(p2, pVal, coll);
>                   if (res >= 0)
>                       nUpper++;
>               }
> @@ -1448,7 +1431,7 @@ whereRangeScanEst(Parse * pParse,    /* Parsing & 
> code generating context */
>                      || (pLower->eOperator & (WO_GT | WO_GE)) != 0);
>               assert(pUpper == 0
>                      || (pUpper->eOperator & (WO_LT | WO_LE)) != 0);
> -            if (sql_index_column_sort_order(p, nEq) !=
> +            if (p->def->key_def->parts[nEq].sort_order !=
>                   SORT_ORDER_ASC) {
>                   /* The roles of pLower and pUpper are swapped for a DESC index */
>                   SWAP(pLower, pUpper);
> @@ -1598,7 +1581,7 @@ whereEqualScanEst(Parse * pParse,    /* Parsing & 
> code generating context */
>       int bOk;
> 
>       assert(nEq >= 1);
> -    assert(nEq <= (int)index_column_count(p));
> +    assert(nEq <= (int) p->def->key_def->part_count);
>       assert(pBuilder->nRecValid < nEq);
> 
>       /* If values are not available for all fields of the index to the left
> @@ -1619,7 +1602,7 @@ whereEqualScanEst(Parse * pParse,    /* Parsing & 
> code generating context */
> 
>       whereKeyStats(pParse, p, pRec, 0, a);
>       WHERETRACE(0x10, ("equality scan regions %s(%d): %d\n",
> -              p->zName, nEq - 1, (int)a[1]));
> +              p->def->name, nEq - 1, (int)a[1]));
>       *pnRow = a[1];
> 
>       return rc;
> @@ -1751,7 +1734,7 @@ whereLoopPrint(WhereLoop * p, WhereClause * pWC)
>                  pItem->zAlias ? pItem->zAlias : pTab->def->name);
>   #endif
>       const char *zName;
> -    if (p->pIndex && (zName = p->pIndex->zName) != 0) {
> +    if (p->pIndex && (zName = p->pIndex->def->name) != 0) {
>           if (strncmp(zName, "sqlite_autoindex_", 17) == 0) {
>               int i = sqlite3Strlen30(zName) - 1;
>               while (zName[i] != '_')
> @@ -2314,7 +2297,7 @@ whereRangeVectorLen(Parse * pParse,    /* Parsing 
> context */
>       int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
>       int i;
> 
> -    nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
> +    nCmp = MIN(nCmp, (int)(pIdx->def->key_def->part_count - nEq));
>       for (i = 1; i < nCmp; i++) {
>           /* Test if comparison i of pTerm is compatible with column (i+nEq)
>            * of the index. If not, exit the loop.
> @@ -2335,13 +2318,11 @@ whereRangeVectorLen(Parse * pParse,    /* 
> Parsing context */
>            * order of the index column is the same as the sort order of the
>            * leftmost index column.
>            */
> -        if (pLhs->op != TK_COLUMN
> -            || pLhs->iTable != iCur
> -            || pLhs->iColumn != pIdx->aiColumn[i + nEq]
> -            || sql_index_column_sort_order(pIdx, i + nEq) !=
> -               sql_index_column_sort_order(pIdx, nEq)) {
> +        if (pLhs->op != TK_COLUMN || pLhs->iTable != iCur
> +            || pLhs->iColumn != (int)pIdx->def->key_def->parts[i + 
> nEq].fieldno
> +            || pIdx->def->key_def->parts[i + nEq].sort_order !=
> + pIdx->def->key_def->parts[nEq].sort_order)
>               break;
> -        }
> 
>           aff = sqlite3CompareAffinity(pRhs, sqlite3ExprAffinity(pLhs));
>           idxaff =
> @@ -2353,7 +2334,7 @@ whereRangeVectorLen(Parse * pParse,    /* Parsing 
> context */
>           pColl = sql_binary_compare_coll_seq(pParse, pLhs, pRhs, &id);
>           if (pColl == 0)
>               break;
> -            if (sql_index_collation(pIdx, i + nEq, &id) != pColl)
> +        if (pIdx->def->key_def->parts[(i + nEq)].coll != pColl)
>               break;
>       }
>       return i;
> @@ -2396,13 +2377,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>       LogEst rSize;        /* Number of rows in the table */
>       LogEst rLogSize;    /* Logarithm of table size */
>       WhereTerm *pTop = 0, *pBtm = 0;    /* Top and bottom range constraints */
> -    uint32_t nProbeCol = index_column_count(pProbe);
> +    uint32_t probe_part_count = pProbe->def->key_def->part_count;
> 
>       pNew = pBuilder->pNew;
>       if (db->mallocFailed)
>           return SQLITE_NOMEM_BKPT;
>       WHERETRACE(0x800, ("BEGIN addBtreeIdx(%s), nEq=%d\n",
> -               pProbe->zName, pNew->nEq));
> +               pProbe->def->name, pNew->nEq));
> 
>       assert((pNew->wsFlags & WHERE_TOP_LIMIT) == 0);
>       if (pNew->wsFlags & WHERE_BTM_LIMIT) {
> @@ -2431,7 +2412,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>           stat = &surrogate_stat;
>       if (stat->is_unordered)
>           opMask &= ~(WO_GT | WO_GE | WO_LT | WO_LE);
> -    assert(pNew->nEq < nProbeCol);
> +    assert(pNew->nEq < probe_part_count);
> 
>       saved_nEq = pNew->nEq;
>       saved_nBtm = pNew->nBtm;
> @@ -2452,8 +2433,9 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>           LogEst nOutUnadjusted;    /* nOut before IN() and WHERE adjustments */
>           int nIn = 0;
>           int nRecValid = pBuilder->nRecValid;
> +        uint32_t j = pProbe->def->key_def->parts[saved_nEq].fieldno;
>           if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0)
> -            && indexColumnNotNull(pProbe, saved_nEq)
> +            && !pProbe->pTable->def->fields[j].is_nullable
>               ) {
>               continue;    /* ignore IS [NOT] NULL constraints on NOT NULL columns */
>           }
> @@ -2523,14 +2505,16 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>                                */
>               }
>           } else if (eOp & WO_EQ) {
> -            int iCol = pProbe->aiColumn[saved_nEq];
> +            int iCol = pProbe->def->key_def->parts[saved_nEq].fieldno;
>               pNew->wsFlags |= WHERE_COLUMN_EQ;
>               assert(saved_nEq == pNew->nEq);
> -            if ((iCol > 0 && nInMul == 0
> -                && saved_nEq == nProbeCol - 1)
> -                ) {
> -                if (iCol >= 0 &&
> -                    !index_is_unique_not_null(pProbe)) {
> +            if ((iCol > 0 && nInMul == 0 &&
> +                 saved_nEq == probe_part_count - 1)) {
> +                bool index_is_unique_not_null =
> +                    pProbe->def->key_def->is_nullable &&
> +                    pProbe->def->opts.is_unique;
> +                if (pProbe->tnum != 0 &&
> +                    !index_is_unique_not_null) {
>                       pNew->wsFlags |= WHERE_UNQ_WANTED;
>                   } else {
>                       pNew->wsFlags |= WHERE_ONEROW;
> @@ -2592,8 +2576,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>               assert(eOp & (WO_ISNULL | WO_EQ | WO_IN));
> 
>               assert(pNew->nOut == saved_nOut);
> -            if (pTerm->truthProb <= 0
> -                && pProbe->aiColumn[saved_nEq] >= 0) {
> +            if (pTerm->truthProb <= 0 && pProbe->tnum != 0 ) {
>                   assert((eOp & WO_IN) || nIn == 0);
>                   testcase(eOp & WO_IN);
>                   pNew->nOut += pTerm->truthProb;
> @@ -2696,7 +2679,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>           }
> 
>           if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0
> -            && pNew->nEq < nProbeCol) {
> +            && pNew->nEq < probe_part_count) {
>               whereLoopAddBtreeIndex(pBuilder, pSrc, pProbe,
>                              nInMul + nIn);
>           }
> @@ -2724,7 +2707,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>        * more expensive.
>        */
>       assert(42 == sqlite3LogEst(18));
> -    if (saved_nEq == saved_nSkip && saved_nEq + 1U < nProbeCol &&
> +    if (saved_nEq == saved_nSkip && saved_nEq + 1U < probe_part_count &&
>           stat->skip_scan_enabled == true &&
>           /* TUNING: Minimum for skip-scan */
>           index_field_tuple_est(pProbe, saved_nEq + 1) >= 42 &&
> @@ -2749,7 +2732,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * 
> pBuilder,    /* The WhereLoop factory */
>       }
> 
>       WHERETRACE(0x800, ("END addBtreeIdx(%s), nEq=%d, rc=%d\n",
> -               pProbe->zName, saved_nEq, rc));
> +               pProbe->def->name, saved_nEq, rc));
>       return rc;
>   }
> 
> @@ -2792,7 +2775,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
>   {
>       ExprList *pOB;
>       int ii, jj;
> -    int nIdxCol = index_column_count(pIndex);
> +    int part_count = pIndex->def->key_def->part_count;
>       if (index_is_unordered(pIndex))
>           return 0;
>       if ((pOB = pBuilder->pWInfo->pOrderBy) == 0)
> @@ -2802,8 +2785,9 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
>           if (pExpr->op == TK_COLUMN && pExpr->iTable == iCursor) {
>               if (pExpr->iColumn < 0)
>                   return 1;
> -            for (jj = 0; jj < nIdxCol; jj++) {
> -                if (pExpr->iColumn == pIndex->aiColumn[jj])
> +            for (jj = 0; jj < part_count; jj++) {
> +                if (pExpr->iColumn == (int)
> + pIndex->def->key_def->parts[jj].fieldno)
>                       return 1;
>               }
>           }
> @@ -2882,7 +2866,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,    
> /* WHERE clause information */
>       Index *pProbe;        /* An index we are evaluating */
>       Index sPk;        /* A fake index object for the primary key */
>       LogEst aiRowEstPk[2];    /* The aiRowLogEst[] value for the sPk index */
> -    i16 aiColumnPk = -1;    /* The aColumn[] value for the sPk index */
>       SrcList *pTabList;    /* The FROM clause */
>       struct SrcList_item *pSrc;    /* The FROM clause btree term to add */
>       WhereLoop *pNew;    /* Template WhereLoop object */
> @@ -2913,11 +2896,28 @@ whereLoopAddBtree(WhereLoopBuilder * 
> pBuilder,    /* WHERE clause information */
>            */
>           Index *pFirst;    /* First of real indices on the table */
>           memset(&sPk, 0, sizeof(Index));
> -        sPk.nColumn = 1;
> -        sPk.aiColumn = &aiColumnPk;
>           sPk.aiRowLogEst = aiRowEstPk;
>           sPk.onError = ON_CONFLICT_ACTION_REPLACE;
>           sPk.pTable = pTab;
> +
> +        struct key_def *key_def = key_def_new(1);
> +        if (key_def == NULL)
> +            return SQLITE_ERROR;
> +
> +        key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
> +                 ON_CONFLICT_ACTION_ABORT,
> +                 NULL, COLL_NONE, SORT_ORDER_ASC);
> +
> +        struct index_opts index_opts = index_opts_default;
> +
> +        sPk.def = index_def_new(pTab->def->id, 0, "primary",
> +                    sizeof("primary") - 1, TREE, &index_opts,
> +                    key_def, NULL);
> +        key_def_delete(key_def);
> +
> +        if (sPk.def == NULL)
> +            return SQLITE_ERROR;
> +
>           aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
>           aiRowEstPk[1] = 0;
>           pFirst = pSrc->pTab->pIndex;
> @@ -3392,8 +3392,8 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    
> /* The WHERE clause */
>                      index_is_unordered(pIndex)) {
>                   return 0;
>               } else {
> -                nColumn = index_column_count(pIndex);
> -                isOrderDistinct = index_is_unique(pIndex);
> +                nColumn = pIndex->def->key_def->part_count;
> +                isOrderDistinct = pIndex->def->opts.is_unique;
>               }
> 
>               /* Loop through all columns of the index and deal with the ones
> @@ -3454,9 +3454,10 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    
> /* The WHERE clause */
>                    * (revIdx) for the j-th column of the index.
>                    */
>                   if (pIndex != NULL) {
> -                    iColumn = pIndex->aiColumn[j];
> -                    revIdx = sql_index_column_sort_order(pIndex,
> -                                         j);
> +                    iColumn = pIndex->def->key_def->
> +                        parts[j].fieldno;
> +                    revIdx = pIndex->def->key_def->
> +                        parts[j].sort_order;
>                       if (iColumn == pIndex->pTable->iPKey)
>                           iColumn = -1;
>                   } else {
> @@ -3506,8 +3507,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    
> /* The WHERE clause */
>                                         pOrderBy->a[i].pExpr,
>                                         &is_found, &id);
>                           struct coll *idx_coll =
> -                            sql_index_collation(pIndex,
> -                                        j, &id);
> + pIndex->def->key_def->parts[j].coll;
>                           if (is_found &&
>                               coll != idx_coll)
>                               continue;
> @@ -4777,7 +4777,7 @@ sqlite3WhereBegin(Parse * pParse,    /* The parser 
> context */
>                       sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);    /* Hint to COMDB2 */
>                   }
>                   if (pIx != NULL)
> -                    VdbeComment((v, "%s", pIx->zName));
> +                    VdbeComment((v, "%s", pIx->def->name));
>                   else
>                       VdbeComment((v, "%s", idx_def->name));
>   #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
> @@ -4910,7 +4910,7 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
>           if (pLevel->addrSkip) {
>               sqlite3VdbeGoto(v, pLevel->addrSkip);
>               VdbeComment((v, "next skip-scan on %s",
> -                     pLoop->pIndex->zName));
> +                     pLoop->pIndex->def->name));
>               sqlite3VdbeJumpHere(v, pLevel->addrSkip);
>               sqlite3VdbeJumpHere(v, pLevel->addrSkip - 2);
>           }
> diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
> index c35c25ac4..88ae0773d 100644
> --- a/src/box/sql/wherecode.c
> +++ b/src/box/sql/wherecode.c
> @@ -48,7 +48,7 @@
>   static const char *
>   explainIndexColumnName(Index * pIdx, int i)
>   {
> -    i = pIdx->aiColumn[i];
> +    i = pIdx->def->key_def->parts[i].fieldno;
>       return pIdx->pTable->def->fields[i].name;
>   }
> 
> @@ -243,7 +243,7 @@ sqlite3WhereExplainOneScan(Parse * pParse, /* Parse 
> context */
>               if (zFmt) {
>                   sqlite3StrAccumAppend(&str, " USING ", 7);
>                   if (pIdx != NULL)
> -                    sqlite3XPrintf(&str, zFmt, pIdx->zName);
> +                    sqlite3XPrintf(&str, zFmt, pIdx->def->name);
>                   else if (idx_def != NULL)
>                       sqlite3XPrintf(&str, zFmt, idx_def->name);
>                   else
> @@ -488,7 +488,7 @@ codeEqualityTerm(Parse * pParse,    /* The parsing 
> context */
>           int *aiMap = 0;
> 
>           if (pLoop->pIndex != 0 &&
> -            sql_index_column_sort_order(pLoop->pIndex, iEq)) {
> + pLoop->pIndex->def->key_def->parts[iEq].sort_order) {
>               testcase(iEq == 0);
>               testcase(bRev);
>               bRev = !bRev;
> @@ -736,7 +736,7 @@ codeAllEqualityTerms(Parse * pParse,    /* Parsing 
> context */
>           sqlite3VdbeAddOp1(v, (bRev ? OP_Last : OP_Rewind), iIdxCur);
>           VdbeCoverageIf(v, bRev == 0);
>           VdbeCoverageIf(v, bRev != 0);
> -        VdbeComment((v, "begin skip-scan on %s", pIdx->zName));
> +        VdbeComment((v, "begin skip-scan on %s", pIdx->def->name));
>           j = sqlite3VdbeAddOp0(v, OP_Goto);
>           pLevel->addrSkip =
>               sqlite3VdbeAddOp4Int(v, (bRev ? OP_SeekLT : OP_SeekGT),
> @@ -746,7 +746,8 @@ codeAllEqualityTerms(Parse * pParse,    /* Parsing 
> context */
>           sqlite3VdbeJumpHere(v, j);
>           for (j = 0; j < nSkip; j++) {
>               sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
> -                      pIdx->aiColumn[j], regBase + j);
> + pIdx->def->key_def->parts[j].fieldno,
> +                      regBase + j);
>               VdbeComment((v, "%s", explainIndexColumnName(pIdx, j)));
>           }
>       }
> @@ -1037,14 +1038,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>           assert(pWInfo->pOrderBy == 0
>                  || pWInfo->pOrderBy->nExpr == 1
>                  || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
> -        int nIdxCol;
> +        uint32_t part_count;
>           if (pIdx != NULL)
> -            nIdxCol = index_column_count(pIdx);
> +            part_count = pIdx->def->key_def->part_count;
>           else
> -            nIdxCol = idx_def->key_def->part_count;
> +            part_count = idx_def->key_def->part_count;
>           if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
> -            && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
> -            j = pIdx->aiColumn[nEq];
> +            && pWInfo->nOBSat > 0 && (part_count > nEq)) {
> +            j = pIdx->def->key_def->parts[nEq].fieldno;
>               /* Allow seek for column with `NOT NULL` == false attribute.
>                * If a column may contain NULL-s, the comparator installed
>                * by Tarantool is prepared to seek using a NULL value.
> @@ -1055,8 +1056,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>                * FYI: entries in an index are ordered as follows:
>                *      NULL, ... NULL, min_value, ...
>                */
> -            if (j >= 0 &&
> - pIdx->pTable->def->fields[j].is_nullable) {
> +            if (pIdx->pTable->def->fields[j].is_nullable) {
>                   assert(pLoop->nSkip == 0);
>                   bSeekPastNull = 1;
>                   nExtraReg = 1;
> @@ -1095,14 +1095,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>                   assert((bRev & ~1) == 0);
>                   pLevel->iLikeRepCntr <<= 1;
>                   pLevel->iLikeRepCntr |=
> -                    bRev ^ (sql_index_column_sort_order(pIdx, nEq) ==
> +                    bRev ^ (pIdx->def->key_def->
> +                          parts[nEq].sort_order ==
>                           SORT_ORDER_DESC);
>               }
>   #endif
>               if (pRangeStart == 0) {
> -                j = pIdx->aiColumn[nEq];
> -                if (j >= 0 &&
> - pIdx->pTable->def->fields[j].is_nullable)
> +                j = pIdx->def->key_def->parts[nEq].fieldno;
> +                if (pIdx->pTable->def->fields[j].is_nullable)
>                       bSeekPastNull = 1;
>               }
>           }
> @@ -1113,10 +1113,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>            * a forward order scan on a descending index, interchange the
>            * start and end terms (pRangeStart and pRangeEnd).
>            */
> -        if ((nEq < nIdxCol &&
> -             bRev == (sql_index_column_sort_order(pIdx, nEq) ==
> +        if ((nEq < part_count &&
> +             bRev == (pIdx->def->key_def->parts[nEq].sort_order ==
>                     SORT_ORDER_ASC)) ||
> -            (bRev && nIdxCol == nEq)) {
> +            (bRev && part_count == nEq)) {
>               SWAP(pRangeEnd, pRangeStart);
>               SWAP(bSeekPastNull, bStopAtNull);
>               SWAP(nBtm, nTop);
> @@ -1196,16 +1196,17 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>               }
>           } else {
>               pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
> +            uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
>               affinity =
> - pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
> + pIdx->pTable->def->fields[fieldno].affinity;
>           }
> 
> -        int nPkCol;
> +        uint32_t pk_part_count;
>           if (pk != NULL)
> -            nPkCol = index_column_count(pk);
> +            pk_part_count = pk->def->key_def->part_count;
>           else
> -            nPkCol = idx_pk->key_def->part_count;
> -        if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
> +            pk_part_count = idx_pk->key_def->part_count;
> +        if (pk_part_count == 1 && affinity == AFFINITY_INTEGER) {
>               /* Right now INTEGER PRIMARY KEY is the only option to
>                * get Tarantool's INTEGER column type. Need special handling
>                * here: try to loosely convert FLOAT to INT. If RHS type
> @@ -1213,8 +1214,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>                */
>               int limit = pRangeStart == NULL ? nEq : nEq + 1;
>               for (int i = 0; i < limit; i++) {
> -                if ((pIdx != NULL && pIdx->aiColumn[i] ==
> -                     pk->aiColumn[0]) ||
> +                if ((pIdx != NULL &&
> + pIdx->def->key_def->parts[i].fieldno ==
> +                     pk->def->key_def->parts[0].fieldno) ||
>                       (idx_pk != NULL &&
>                        idx_def->key_def->parts[i].fieldno ==
>                        idx_pk->key_def->parts[0].fieldno)) {
> @@ -1326,17 +1328,17 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>               /* pIdx is a covering index.  No need to access the main table. */
>           }  else if (iCur != iIdxCur) {
>               Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
> -            int nPkCol = index_column_count(pPk);
> -            int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
> -            for (j = 0; j < nPkCol; j++) {
> -                k = pPk->aiColumn[j];
> +            int pk_part_count = pPk->def->key_def->part_count;
> +            int iKeyReg = sqlite3GetTempRange(pParse, pk_part_count);
> +            for (j = 0; j < pk_part_count; j++) {
> +                k = pPk->def->key_def->parts[j].fieldno;
>                   sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
>                             iKeyReg + j);
>               }
>               sqlite3VdbeAddOp4Int(v, OP_NotFound, iCur, addrCont,
> -                         iKeyReg, nPkCol);
> +                         iKeyReg, pk_part_count);
>               VdbeCoverage(v);
> -            sqlite3ReleaseTempRange(pParse, iKeyReg, nPkCol);
> +            sqlite3ReleaseTempRange(pParse, iKeyReg, pk_part_count);
>           }
> 
>           /* Record the instruction used to terminate the loop. */
> @@ -1434,10 +1436,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>            */
>           if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
>               Index *pPk = sqlite3PrimaryKeyIndex(pTab);
> -            int nPkCol = index_column_count(pPk);
> +            int pk_part_count = pPk->def->key_def->part_count;
>               regRowset = pParse->nTab++;
>               sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
> -                      regRowset, nPkCol);
> +                      regRowset, pk_part_count);
>               sql_vdbe_set_p4_key_def(pParse, pPk);
>               regPk = ++pParse->nMem;
>           }
> @@ -1538,16 +1540,26 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>                           int iSet =
>                               ((ii == pOrWc->nTerm - 1) ? -1 : ii);
>                           Index *pPk = sqlite3PrimaryKeyIndex (pTab);
> -                        int nPk = index_column_count(pPk);
> -                        int iPk;
> +                        uint32_t part_count =
> +                            pPk->def->key_def->
> +                                part_count;
> 
>                           /* Read the PK into an array of temp registers. */
> -                        r = sqlite3GetTempRange(pParse, nPk);
> -                        for (iPk = 0; iPk < nPk; iPk++) {
> -                            int iCol = pPk->aiColumn[iPk];
> +                        r = sqlite3GetTempRange(pParse,
> +                                    part_count);
> +                        for (uint32_t iPk = 0;
> +                             iPk < part_count;
> +                             iPk++) {
> +                            uint32_t fieldno =
> +                                pPk->def->
> +                                key_def->
> +                                parts[iPk].
> +                                fieldno;
>                               sqlite3ExprCodeGetColumnToReg
> -                                (pParse, pTab->def,
> -                                 iCol, iCur,
> +                                (pParse,
> +                                 pTab->def,
> +                                 fieldno,
> +                                 iCur,
>                                    r + iPk);
>                           }
> 
> @@ -1563,24 +1575,28 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * 
> pWInfo,    /* Complete information about t
>                            * need to insert the key into the temp table, as it will never
>                            * be tested for.
>                            */
> +                        uint32_t pk_part_count =
> +                            pPk->def->key_def->
> +                                part_count;
>                           if (iSet) {
>                               jmp1 = sqlite3VdbeAddOp4Int
>                                   (v, OP_Found,
>                                    regRowset, 0,
> -                                 r, nPk);
> +                                 r,
> +                                 pk_part_count);
>                               VdbeCoverage(v);
>                           }
>                           if (iSet >= 0) {
>                               sqlite3VdbeAddOp3
>                                   (v, OP_MakeRecord,
> -                                 r, nPk, regPk);
> +                                 r, pk_part_count, regPk);
>                               sqlite3VdbeAddOp2
>                                   (v, OP_IdxInsert,
>                                    regRowset, regPk);
>                           }
> 
>                           /* Release the array of temp registers */
> -                        sqlite3ReleaseTempRange(pParse, r, nPk);
> +                        sqlite3ReleaseTempRange(pParse, r, pk_part_count);
>                       }
> 
>                       /* Invoke the main loop body as a subroutine */
> 




More information about the Tarantool-patches mailing list