[tarantool-patches] Re: [PATCH] sql: use collation pointers instead of names

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Tue Apr 17 21:06:17 MSK 2018


Hello. Thanks for fixes, but you did not all of them. I
pushed my fixes as a separate commit on your branch. Please,
look at them, squash into your commit, and then the patch
will be ok for me.

On 17/04/2018 08:35, Kirill Yukhin wrote:
> Before the change SQL FE used collation names to refer to
> collations. Main data structures were using names as well.
> The patch fixes that and uses explicit pointers to Tarantool's
> `struct coll` during code gen and inside data structures.
> 
> Closes #3205
> ---
> Hello Vlad,
> I've fixed all your inputs, thanks!
> 
> Branch: https://github.com/tarantool/tarantool/tree/kyukhin/gh-3205-use-coll-pointers
> Issue: https://github.com/tarantool/tarantool/issues/3205
> 
>   src/box/sql.c           |  21 ++------
>   src/box/sql/alter.c     |   2 +-
>   src/box/sql/analyze.c   |   9 +---
>   src/box/sql/build.c     | 133 ++++++++++++++++++++++--------------------------
>   src/box/sql/callback.c  |  53 ++-----------------
>   src/box/sql/expr.c      | 129 ++++++++++++++++++++++------------------------
>   src/box/sql/fkey.c      |  23 +++++----
>   src/box/sql/func.c      |   3 +-
>   src/box/sql/insert.c    |  12 ++---
>   src/box/sql/pragma.c    |   9 +++-
>   src/box/sql/select.c    | 111 ++++++++++++++++++++++------------------
>   src/box/sql/sqliteInt.h |  40 ++++++++++++---
>   src/box/sql/vdbe.c      |   2 +-
>   src/box/sql/vdbeaux.c   |   7 +++
>   src/box/sql/vdbesort.c  |   4 +-
>   src/box/sql/where.c     | 102 +++++++++++++++++--------------------
>   src/box/sql/whereInt.h  |   7 ++-
>   src/box/sql/whereexpr.c |  34 ++++++++-----
>   18 files changed, 338 insertions(+), 363 deletions(-)
> 
> diff --git a/src/box/sql.c b/src/box/sql.c
> index a6713f1..614917d 100644
> --- a/src/box/sql.c
> +++ b/src/box/sql.c
> @@ -1465,12 +1465,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
>   
>   	for (i = 0; i < n; i++) {
>   		const char *t;
> -		struct coll *coll = NULL;
> +		struct coll *coll = aCol[i].coll;
>   		struct Expr *def = aCol[i].pDflt;
> -		if (aCol[i].zColl != NULL &&
> -		    strcasecmp(aCol[i].zColl, "binary") != 0) {
> -			coll = sqlite3FindCollSeq(aCol[i].zColl);
> -		}
>   		int base_len = 4;
>   		if (coll != NULL)
>   			base_len += 1;
> @@ -1571,28 +1567,19 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
>   	for (i = 0; i < n; i++) {
>   		int col = pIndex->aiColumn[i];
>   		const char *t;
> -		struct coll * collation = NULL;
>   		if (pk_forced_int == col)
>   			t = "integer";
>   		else
>   			t = convertSqliteAffinity(aCol[col].affinity, aCol[col].notNull == 0);
>   		/* do not decode default collation */
> -		if (sqlite3StrICmp(pIndex->azColl[i], "binary") != 0){
> -			collation = sqlite3FindCollSeq(pIndex->azColl[i]);
> -			/*
> -			 * At this point, the collation has already been found
> -			 * once and the assert should not fire.
> -			 */
> -			assert(collation);
> -		}
> -		p = enc->encode_map(p, collation == NULL ? 4 : 5);
> +		p = enc->encode_map(p, pIndex->coll_array[i] == NULL ? 4 : 5);
>   		p = enc->encode_str(p, "type", sizeof("type")-1);
>   		p = enc->encode_str(p, t, strlen(t));
>   		p = enc->encode_str(p, "field", sizeof("field")-1);
>   		p = enc->encode_uint(p, col);
> -		if (collation != NULL){
> +		if (pIndex->coll_array[i] != NULL) {
>   			p = enc->encode_str(p, "collation", sizeof("collation")-1);
> -			p = enc->encode_uint(p, collation->id);
> +			p = enc->encode_uint(p, pIndex->coll_array[i]->id);
>   		}
>   		p = enc->encode_str(p, "is_nullable", 11);
>   		p = enc->encode_bool(p, aCol[col].notNull == ON_CONFLICT_ACTION_NONE);
> diff --git a/src/box/sql/alter.c b/src/box/sql/alter.c
> index 129ef82..b30a973 100644
> --- a/src/box/sql/alter.c
> +++ b/src/box/sql/alter.c
> @@ -296,7 +296,7 @@ sqlite3AlterBeginAddColumn(Parse * pParse, SrcList * pSrc)
>   	for (i = 0; i < pNew->nCol; i++) {
>   		Column *pCol = &pNew->aCol[i];
>   		pCol->zName = sqlite3DbStrDup(db, pCol->zName);
> -		pCol->zColl = 0;
> +		pCol->coll = NULL;
>   		pCol->pDflt = 0;
>   	}
>   	pNew->pSchema = db->pSchema;
> diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
> index 665bfbc..f0054c5 100644
> --- a/src/box/sql/analyze.c
> +++ b/src/box/sql/analyze.c
> @@ -977,18 +977,13 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
>   				VdbeCoverage(v);
>   			}
>   			for (i = 0; i < nColTest; i++) {
> -				const char *zCollName =
> -					index_collation_name(pIdx, i);
> -				char *pColl =
> -				    (char *)sqlite3LocateCollSeq(pParse,
> -								 pParse->db,
> -								 zCollName);
> +				struct coll *coll = sql_index_collation(pIdx, i);
>   				sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
>   				sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
>   						  pIdx->aiColumn[i], regTemp);
>   				aGotoChng[i] =
>   				    sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
> -						      regPrev + i, pColl,
> +						      regPrev + i, (char *)coll,
>   						      P4_COLLSEQ);
>   				sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
>   				VdbeCoverage(v);
> diff --git a/src/box/sql/build.c b/src/box/sql/build.c
> index c6185e4..74b231a 100644
> --- a/src/box/sql/build.c
> +++ b/src/box/sql/build.c
> @@ -300,7 +300,6 @@ sqlite3DeleteColumnNames(sqlite3 * db, Table * pTable)
>   		for (i = 0; i < pTable->nCol; i++, pCol++) {
>   			sqlite3DbFree(db, pCol->zName);
>   			sql_expr_free(db, pCol->pDflt, false);
> -			sqlite3DbFree(db, pCol->zColl);
>   		}
>   		sqlite3DbFree(db, pTable->aCol);
>   	}
> @@ -952,6 +951,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>   	Table *p;
>   	int i;
>   	char *zColl;		/* Dequoted name of collation sequence */
> +	struct coll *coll;
>   	sqlite3 *db;
>   
>   	if ((p = pParse->pNewTable) == 0)
> @@ -962,10 +962,10 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>   	if (!zColl)
>   		return;
>   
> -	if (sqlite3LocateCollSeq(pParse, db, zColl)) {
> +	coll =  sqlite3LocateCollSeq(pParse, db, zColl);
> +	if (coll != NULL) {
>   		Index *pIdx;
> -		sqlite3DbFree(db, p->aCol[i].zColl);
> -		p->aCol[i].zColl = zColl;
> +		p->aCol[i].coll = coll;
>   
>   		/* If the column is declared as "<name> PRIMARY KEY COLLATE <type>",
>   		 * then an index may have been created on this column before the
> @@ -973,9 +973,8 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>   		 */
>   		for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
>   			assert(pIdx->nColumn == 1);
> -			if (pIdx->aiColumn[0] == i) {
> -				pIdx->azColl[0] = column_collation_name(p, i);
> -			}
> +			if (pIdx->aiColumn[0] == i)
> +				pIdx->coll_array[0] = sql_column_collation(p, i);
>   		}
>   	} else {
>   		sqlite3DbFree(db, zColl);
> @@ -983,14 +982,14 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
>   }
>   
>   /**
> - * Return name of given column collation from table.
> + * Return collation of given column from table.
>    *
>    * @param table Table which is used to fetch column.
>    * @param column Number of column.
> - * @retval Pointer to collation's name.
> + * @retval Pointer to collation.
>    */
> -const char *
> -column_collation_name(Table *table, uint32_t column)
> +struct coll *
> +sql_column_collation(Table *table, uint32_t column)
>   {
>   	assert(table != NULL);
>   	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(table->tnum);
> @@ -1007,15 +1006,12 @@ column_collation_name(Table *table, uint32_t column)
>   	 * In cases mentioned above collation is fetched from
>   	 * SQL specific structures.
>   	 */
> -	if (space == NULL || space_index(space, 0) == NULL)
> -		return table->aCol[column].zColl;
> -
> -	/* "BINARY" is a name for default collation in SQL. */
> -	char *coll_name = (char *)sqlite3StrBINARY;
> -	if (space->format->fields[column].coll != NULL) {
> -		coll_name = space->format->fields[column].coll->name;
> +	if (space == NULL || space_index(space, 0) == NULL) {
> +		assert(column < (uint32_t)table->nCol);
> +		return table->aCol[column].coll;
>   	}
> -	return coll_name;
> +
> +	return space->format->fields[column].coll;
>   }
>   
>   /**
> @@ -1023,30 +1019,30 @@ column_collation_name(Table *table, uint32_t column)
>    *
>    * @param idx Index which is used to fetch column.
>    * @param column Number of column.
> - * @retval Pointer to collation's name.
> + * @retval Pointer to collation.
>    */
> -const char *
> -index_collation_name(Index *idx, uint32_t column)
> +struct coll *
> +sql_index_collation(Index *idx, uint32_t column)
>   {
>   	assert(idx != NULL);
>   	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
>   	struct space *space = space_by_id(space_id);
> +
> +	assert(column < idx->nColumn);
>   	/*
>   	 * If space is still under construction, or it is
>   	 * an ephemeral space, then fetch collation from
>   	 * SQL internal structure.
>   	 */
> -	if (space == NULL)
> -		return (char *)idx->azColl[column];
> +	if (space == NULL) {
> +		assert(column < idx->nColumn);
> +		return idx->coll_array[column];
> +	}
>   
>   	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
>   	struct index *index = space_index(space, index_id);
>   	assert(index != NULL && index->def->key_def->part_count >= column);
> -	struct coll *coll = index->def->key_def->parts[column].coll;
> -	/* "BINARY" is a name for default collation in SQL. */
> -	if (coll == NULL)
> -		return (char *)sqlite3StrBINARY;
> -	return index->def->key_def->parts[column].coll->name;
> +	return index->def->key_def->parts[column].coll;
>   }
>   
>   /**
> @@ -1117,6 +1113,9 @@ sqlite3LocateCollSeq(Parse * pParse, sqlite3 * db, const char *zName)
>   	struct coll *pColl;
>   	initbusy = db->init.busy;
>   
> +	if (sqlite3StrICmp(zName, "binary") == 0)
> +		return NULL;
> +
>   	pColl = sqlite3FindCollSeq(zName);
>   	if (!initbusy && (!pColl)) {
>   		pColl = sqlite3GetCollSeq(pParse, pColl, zName);
> @@ -2623,7 +2622,7 @@ sqlite3AllocateIndexObject(sqlite3 * db,	/* Database connection */
>   	p = sqlite3DbMallocZero(db, nByte + nExtra);
>   	if (p) {
>   		char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
> -		p->azColl = (const char **)pExtra;
> +		p->coll_array = (struct coll **)pExtra;
>   		pExtra += ROUND8(sizeof(char *) * nCol);
>   		p->aiRowLogEst = (LogEst *) pExtra;
>   		pExtra += sizeof(LogEst) * (nCol + 1);
> @@ -2920,7 +2919,7 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
>   		goto exit_create_index;
>   	}
>   	assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
> -	assert(EIGHT_BYTE_ALIGNMENT(pIndex->azColl));
> +	assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
>   	pIndex->zName = zExtra;
>   	zExtra += nName + 1;
>   	memcpy(pIndex->zName, zName, nName + 1);
> @@ -2959,7 +2958,7 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
>   	for (i = 0, pListItem = pList->a; i < pList->nExpr; i++, pListItem++) {
>   		Expr *pCExpr;	/* The i-th index expression */
>   		int requestedSortOrder;	/* ASC or DESC on the i-th expression */
> -		const char *zColl;	/* Collation sequence name */
> +		struct coll *coll;
>   		sqlite3ResolveSelfReference(pParse, pTab, NC_IdxExpr,
>   					    pListItem->pExpr, 0);
>   		if (pParse->nErr)
> @@ -2978,25 +2977,21 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
>   			}
>   			pIndex->aiColumn[i] = (i16) j;
>   		}
> -		zColl = 0;
> +		coll = NULL;
> +		const char *coll_name;
>   		if (pListItem->pExpr->op == TK_COLLATE) {
> -			int nColl;
> -			zColl = pListItem->pExpr->u.zToken;
> -			nColl = sqlite3Strlen30(zColl) + 1;
> -			assert(nExtra >= nColl);
> -			memcpy(zExtra, zColl, nColl);
> -			zColl = zExtra;
> -			zExtra += nColl;
> -			nExtra -= nColl;
> +			coll_name = pListItem->pExpr->u.zToken;
> +			coll = sqlite3GetCollSeq(pParse, 0, coll_name);
> +
> +			if (coll == NULL &&
> +			    sqlite3StrICmp(coll_name, "binary") != 0) {
> +				goto exit_create_index;
> +			}
>   		} else if (j >= 0) {
> -			zColl = column_collation_name(pTab, j);
> -		}
> -		if (!zColl)
> -			zColl = sqlite3StrBINARY;
> -		if (!db->init.busy && !sqlite3LocateCollSeq(pParse, db, zColl)) {
> -			goto exit_create_index;
> +			coll = sql_column_collation(pTab, j);
>   		}
> -		pIndex->azColl[i] = zColl;
> +		pIndex->coll_array[i] = coll;
> +
>   		/* Tarantool: DESC indexes are not supported so far.
>   		 * See gh-3016.
>   		 */
> @@ -3040,14 +3035,13 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
>   			if (pIdx->nColumn != pIndex->nColumn)
>   				continue;
>   			for (k = 0; k < pIdx->nColumn; k++) {
> -				const char *z1;
> -				const char *z2;
>   				assert(pIdx->aiColumn[k] >= 0);
>   				if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
>   					break;
> -				z1 = index_collation_name(pIdx, k);
> -				z2 = index_collation_name(pIndex, k);
> -				if (strcmp(z1, z2))
> +				struct coll *coll1, *coll2;
> +				coll1 = sql_index_collation(pIdx, k);
> +				coll2 = sql_index_collation(pIndex, k);
> +				if (coll1 != coll2)
>   					break;
>   			}
>   			if (k == pIdx->nColumn) {
> @@ -3963,19 +3957,18 @@ sqlite3UniqueConstraint(Parse * pParse,	/* Parsing context */
>    * true if it does and false if it does not.
>    */
>   #ifndef SQLITE_OMIT_REINDEX
> -static int
> -collationMatch(const char *zColl, Index * pIndex)
> +static bool
> +collationMatch(struct coll *coll, struct Index *index)
>   {
>   	int i;
> -	assert(zColl != 0);
> -	for (i = 0; i < pIndex->nColumn; i++) {
> -		const char *z = index_collation_name(pIndex, i);
> -		assert(z != 0 || pIndex->aiColumn[i] < 0);
> -		if (pIndex->aiColumn[i] >= 0 && 0 == sqlite3StrICmp(z, zColl)) {
> -			return 1;
> -		}
> +	assert(coll != 0);
> +	for (i = 0; i < index->nColumn; i++) {
> +		struct coll *idx_coll = sql_index_collation(index, i);
> +		assert(idx_coll != 0 || index->aiColumn[i] < 0);
> +		if (index->aiColumn[i] >= 0 && coll == idx_coll)
> +			return true;
>   	}
> -	return 0;
> +	return false;
>   }
>   #endif
>   
> @@ -3985,12 +3978,12 @@ collationMatch(const char *zColl, Index * pIndex)
>    */
>   #ifndef SQLITE_OMIT_REINDEX
>   static void
> -reindexTable(Parse * pParse, Table * pTab, char const *zColl)
> +reindexTable(Parse * pParse, Table * pTab, struct coll *coll)
>   {
>   	Index *pIndex;		/* An index associated with pTab */
>   
>   	for (pIndex = pTab->pIndex; pIndex; pIndex = pIndex->pNext) {
> -		if (zColl == 0 || collationMatch(zColl, pIndex)) {
> +		if (coll == 0 || collationMatch(coll, pIndex)) {
>   			sql_set_multi_write(pParse, false);
>   			sqlite3RefillIndex(pParse, pIndex, -1);
>   		}
> @@ -4005,7 +3998,7 @@ reindexTable(Parse * pParse, Table * pTab, char const *zColl)
>    */
>   #ifndef SQLITE_OMIT_REINDEX
>   static void
> -reindexDatabases(Parse * pParse, char const *zColl)
> +reindexDatabases(Parse * pParse, struct coll *coll)
>   {
>   	sqlite3 *db = pParse->db;	/* The database connection */
>   	HashElem *k;		/* For looping over tables in pSchema */
> @@ -4015,7 +4008,7 @@ reindexDatabases(Parse * pParse, char const *zColl)
>   	for (k = sqliteHashFirst(&db->pSchema->tblHash); k;
>   	     k = sqliteHashNext(k)) {
>   		pTab = (Table *) sqliteHashData(k);
> -		reindexTable(pParse, pTab, zColl);
> +		reindexTable(pParse, pTab, coll);
>   	}
>   }
>   #endif
> @@ -4057,7 +4050,7 @@ sqlite3Reindex(Parse * pParse, Token * pName1, Token * pName2)
>   			return;
>   		pColl = sqlite3FindCollSeq(zColl);
>   		if (pColl) {
> -			reindexDatabases(pParse, zColl);
> +			reindexDatabases(pParse, pColl);
>   			sqlite3DbFree(db, zColl);
>   			return;
>   		}
> @@ -4126,9 +4119,7 @@ sqlite3KeyInfoOfIndex(Parse * pParse, sqlite3 * db, Index * pIdx)
>   	if (pKey) {
>   		assert(sqlite3KeyInfoIsWriteable(pKey));
>   		for (i = 0; i < nCol; i++) {
> -			const char *zColl = index_collation_name(pIdx, i);
> -			pKey->aColl[i] = zColl == sqlite3StrBINARY ? 0 :
> -			    sqlite3LocateCollSeq(pParse, db, zColl);
> +			pKey->aColl[i] = sql_index_collation(pIdx, i);
>   			pKey->aSortOrder[i] = pIdx->aSortOrder[i];
>   		}
>   		if (pParse && pParse->nErr) {
> diff --git a/src/box/sql/callback.c b/src/box/sql/callback.c
> index b176931..0d34e6a 100644
> --- a/src/box/sql/callback.c
> +++ b/src/box/sql/callback.c
> @@ -62,10 +62,9 @@ sqlite3GetCollSeq(Parse * pParse,	/* Parsing context */
>   	struct coll *p;
>   
>   	p = pColl;
> -	if (!p) {
> +	if (p == NULL)
>   		p = sqlite3FindCollSeq(zName);
> -	}
> -	if (p == 0) {
> +	if (p == NULL && (sqlite3StrICmp(zName, "binary") != 0)) {
>   		if (pParse)
>   			sqlite3ErrorMsg(pParse,
>   					"no such collation sequence: %s",
> @@ -102,49 +101,6 @@ sqlite3CheckCollSeq(Parse * pParse, struct coll * pColl)
>   	return SQLITE_OK;
>   }
>   
> -/*
> - * This is the default collating function named "BINARY" which is always
> - * available.
> - * It is hardcoded to support Tarantool's collation interface.
> - */
> -static int
> -binCollFunc(const char *pKey1, size_t nKey1, const char *pKey2, size_t nKey2, const struct coll * collation)
> -{
> -	int rc;
> -	size_t n;
> -	(void) collation;
> -	n = nKey1 < nKey2 ? nKey1 : nKey2;
> -	/* EVIDENCE-OF: R-65033-28449 The built-in BINARY collation compares
> -	 * strings byte by byte using the memcmp() function from the standard C
> -	 * library.
> -	 */
> -	rc = memcmp(pKey1, pKey2, n);
> -	if (rc == 0) {
> -		rc = (int)nKey1 - (int)nKey2;
> -	}
> -	return rc;
> -}
> -
> -/*
> - * This hardcoded structure created just to be called the same way
> - * as collations in Tarantool, to support binary collation easily.
> - */
> -
> -struct coll_plus_name_struct{
> -    struct coll collation;
> -    char name[20]; /* max of possible name lengths */
> -};
> -static struct coll_plus_name_struct binary_coll_with_name =
> -	{{0, 0, COLL_TYPE_ICU, {0}, binCollFunc, 0, sizeof("BINARY"), {}},
> -		"BINARY"};
> -static struct coll * binary_coll = (struct coll*)&binary_coll_with_name;
> -
> -struct coll *
> -sql_default_coll()
> -{
> -	return binary_coll;
> -}
> -
>   /**
>    * Return the coll* pointer for the collation sequence named zName.
>    *
> @@ -158,9 +114,8 @@ sql_default_coll()
>   struct coll *
>   sqlite3FindCollSeq(const char *zName)
>   {
> -	if (zName == NULL || sqlite3StrICmp(zName, "binary")==0){
> -		return binary_coll;
> -	}
> +	if (zName == NULL || sqlite3StrICmp(zName, "binary")==0)
> +		return 0;
>   	return coll_by_name(zName, strlen(zName));
>   }
>   
> diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
> index 8071314..9ac67fb 100644
> --- a/src/box/sql/expr.c
> +++ b/src/box/sql/expr.c
> @@ -158,21 +158,13 @@ sqlite3ExprSkipCollate(Expr * pExpr)
>   	return pExpr;
>   }
>   
> -/*
> - * Return the collation sequence for the expression pExpr. If
> - * there is no defined collating sequence, return NULL.
> - *
> - * The collating sequence might be determined by a COLLATE operator
> - * or by the presence of a column with a defined collating sequence.
> - * COLLATE operators take first precedence.  Left operands take
> - * precedence over right operands.
> - */
>   struct coll *
> -sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr)
> +sql_expr_coll(Parse *parse, Expr *expr, bool *found)
>   {
> -	struct coll *pColl = 0;
> -	Expr *p = pExpr;
> -	while (p) {
> +	struct coll *coll = 0;
> +	Expr *p = expr;
> +	*found = false;
> +	while (p != NULL) {
>   		int op = p->op;
>   		if (p->flags & EP_Generic)
>   			break;
> @@ -180,23 +172,22 @@ sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr)
>   			p = p->pLeft;
>   			continue;
>   		}
> -		if (op == TK_COLLATE
> -		    || (op == TK_REGISTER && p->op2 == TK_COLLATE)) {
> -			pColl =
> -			    sqlite3GetCollSeq(pParse, 0, p->u.zToken);
> +		if (op == TK_COLLATE ||
> +		    (op == TK_REGISTER && p->op2 == TK_COLLATE)) {
> +			coll = sqlite3GetCollSeq(parse, NULL, p->u.zToken);
> +			*found = true;
>   			break;
>   		}
> -		if ((op == TK_AGG_COLUMN || op == TK_COLUMN
> -		     || op == TK_REGISTER || op == TK_TRIGGER)
> -		    && p->pTab != 0) {
> +		if ((op == TK_AGG_COLUMN || op == TK_COLUMN ||
> +		     op == TK_REGISTER || op == TK_TRIGGER) &&
> +		    p->pTab != 0) {
>   			/* op==TK_REGISTER && p->pTab!=0 happens when pExpr was originally
>   			 * a TK_COLUMN but was previously evaluated and cached in a register
>   			 */
>   			int j = p->iColumn;
>   			if (j >= 0) {
> -				const char *zColl =
> -					column_collation_name(p->pTab, j);
> -				pColl = sqlite3FindCollSeq(zColl);
> +				coll = sql_column_collation(p->pTab, j);
> +				*found = true;
>   			}
>   			break;
>   		}
> @@ -204,40 +195,37 @@ sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr)
>   			if (p->pLeft && (p->pLeft->flags & EP_Collate) != 0) {
>   				p = p->pLeft;
>   			} else {
> -				Expr *pNext = p->pRight;
> +				Expr *next = p->pRight;
>   				/* The Expr.x union is never used at the same time as Expr.pRight */
>   				assert(p->x.pList == 0 || p->pRight == 0);
>   				/* p->flags holds EP_Collate and p->pLeft->flags does not.  And
>   				 * p->x.pSelect cannot.  So if p->x.pLeft exists, it must hold at
>   				 * least one EP_Collate. Thus the following two ALWAYS.
>   				 */
> -				if (p->x.pList != 0
> -				    &&
> +				if (p->x.pList != 0 &&
>   				    ALWAYS(!ExprHasProperty(p, EP_xIsSelect))) {
> -					int i;
> -					for (i = 0;
> +					for (int i = 0;
>   					     ALWAYS(i < p->x.pList->nExpr);
>   					     i++) {
> -						if (ExprHasProperty
> -						    (p->x.pList->a[i].pExpr,
> -						     EP_Collate)) {
> -							pNext =
> -							    p->x.pList->a[i].
> -							    pExpr;
> +						Expr *e;
> +						e = p->x.pList->a[i].pExpr;
> +						if (ExprHasProperty(e,
> +								    EP_Collate)) {
> +							next = e;
>   							break;
>   						}
>   					}
>   				}
> -				p = pNext;
> +				p = next;
>   			}
>   		} else {
>   			break;
>   		}
>   	}
> -	if (sqlite3CheckCollSeq(pParse, pColl)) {
> -		pColl = 0;
> -	}
> -	return pColl;
> +	if (sqlite3CheckCollSeq(parse, coll))
> +		parse = 0;
> +
> +	return coll;
>   }
>   
>   /*
> @@ -344,19 +332,19 @@ binaryCompareP5(Expr * pExpr1, Expr * pExpr2, int jumpIfNull)
>   struct coll *
>   sqlite3BinaryCompareCollSeq(Parse * pParse, Expr * pLeft, Expr * pRight)
>   {
> -	struct coll *pColl;
> +	struct coll *coll;
> +	bool found;
>   	assert(pLeft);
>   	if (pLeft->flags & EP_Collate) {
> -		pColl = sqlite3ExprCollSeq(pParse, pLeft);
> +		coll = sql_expr_coll(pParse, pLeft, &found);
>   	} else if (pRight && (pRight->flags & EP_Collate) != 0) {
> -		pColl = sqlite3ExprCollSeq(pParse, pRight);
> +		coll = sql_expr_coll(pParse, pRight, &found);
>   	} else {
> -		pColl = sqlite3ExprCollSeq(pParse, pLeft);
> -		if (!pColl) {
> -			pColl = sqlite3ExprCollSeq(pParse, pRight);
> -		}
> +		coll = sql_expr_coll(pParse, pLeft, &found);
> +		if (!found)
> +			coll = sql_expr_coll(pParse, pRight, &found);
>   	}
> -	return pColl;
> +	return coll;
>   }
>   
>   /*
> @@ -2520,14 +2508,15 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
>   						    (pParse, pLhs, pRhs);
>   					int j;
>   
> -					assert(pReq != 0 || pParse->nErr);
>   					for (j = 0; j < nExpr; j++) {
> -						if (pIdx->aiColumn[j]
> -						    != pRhs->iColumn)
> +						if (pIdx->aiColumn[j] !=
> +						    pRhs->iColumn) {
>   							continue;
> -						if (pReq != 0 && strcmp
> -						    (pReq->name,
> -						     index_collation_name(pIdx, j)) != 0) {
> +						}
> +						struct coll *idx_coll;
> +						idx_coll = sql_index_collation(pIdx, j);
> +						if (pReq != NULL &&
> +						    pReq != idx_coll) {
>   							continue;
>   						}
>   						break;
> @@ -2879,11 +2868,13 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
>   					affinity = SQLITE_AFF_BLOB;
>   				}
>   				if (pKeyInfo) {
> +					bool found; /* Not Used.  */
>   					assert(sqlite3KeyInfoIsWriteable
>   					       (pKeyInfo));
>   					pKeyInfo->aColl[0] =
> -					    sqlite3ExprCollSeq(pParse,
> -							       pExpr->pLeft);
> +						sql_expr_coll(pParse,
> +							      pExpr->pLeft,
> +							      &found);
>   				}
>   
>   				/* Loop through each expression in <exprlist>. */
> @@ -3140,8 +3131,10 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
>   	 * This is step (1) in the in-operator.md optimized algorithm.
>   	 */
>   	if (eType == IN_INDEX_NOOP) {
> +		bool found; /* Not used. */
>   		ExprList *pList = pExpr->x.pList;
> -		struct coll *pColl = sqlite3ExprCollSeq(pParse, pExpr->pLeft);
> +		struct coll *coll = sql_expr_coll(pParse, pExpr->pLeft,
> +						   &found);
>   		int labelOk = sqlite3VdbeMakeLabel(v);
>   		int r2, regToFree;
>   		int regCkNull = 0;
> @@ -3161,14 +3154,14 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
>   			}
>   			if (ii < pList->nExpr - 1 || destIfNull != destIfFalse) {
>   				sqlite3VdbeAddOp4(v, OP_Eq, rLhs, labelOk, r2,
> -						  (void *)pColl, P4_COLLSEQ);
> +						  (void *)coll, P4_COLLSEQ);
>   				VdbeCoverageIf(v, ii < pList->nExpr - 1);
>   				VdbeCoverageIf(v, ii == pList->nExpr - 1);
>   				sqlite3VdbeChangeP5(v, zAff[0]);
>   			} else {
>   				assert(destIfNull == destIfFalse);
>   				sqlite3VdbeAddOp4(v, OP_Ne, rLhs, destIfFalse,
> -						  r2, (void *)pColl,
> +						  r2, (void *)coll,
>   						  P4_COLLSEQ);
>   				VdbeCoverage(v);
>   				sqlite3VdbeChangeP5(v,
> @@ -3277,9 +3270,10 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
>   	for (i = 0; i < nVector; i++) {
>   		Expr *p;
>   		struct coll *pColl;
> +		bool found;
>   		int r3 = sqlite3GetTempReg(pParse);
>   		p = sqlite3VectorFieldSubexpr(pLeft, i);
> -		pColl = sqlite3ExprCollSeq(pParse, p);
> +		pColl = sql_expr_coll(pParse, p, &found);
>   		/* Tarantool: Replace i -> aiMap [i], since original order of columns
>   		 * is preserved.
>   		 */
> @@ -4066,7 +4060,7 @@ sqlite3ExprCodeTarget(Parse * pParse, Expr * pExpr, int target)
>   			u32 constMask = 0;	/* Mask of function arguments that are constant */
>   			int i;	/* Loop counter */
>   			sqlite3 *db = pParse->db;	/* The database connection */
> -			struct coll *pColl = 0;	/* A collating sequence */
> +			struct coll *coll = 0;	/* A collating sequence */
>   
>   			assert(!ExprHasProperty(pExpr, EP_xIsSelect));
>   			if (ExprHasProperty(pExpr, EP_TokenOnly)) {
> @@ -4133,11 +4127,12 @@ sqlite3ExprCodeTarget(Parse * pParse, Expr * pExpr, int target)
>   					constMask |= MASKBIT32(i);
>   				}
>   				if ((pDef->funcFlags & SQLITE_FUNC_NEEDCOLL) !=
> -				    0 && !pColl) {
> -					pColl =
> -					    sqlite3ExprCollSeq(pParse,
> -							       pFarg->a[i].
> -							       pExpr);
> +				    0 && coll == NULL) {
> +					bool found; /* Not used.  */
> +					coll = sql_expr_coll(pParse,
> +							      pFarg->a[i].
> +							      pExpr,
> +							      &found);
>   				}
>   			}
>   			if (pFarg) {
> @@ -4186,10 +4181,8 @@ sqlite3ExprCodeTarget(Parse * pParse, Expr * pExpr, int target)
>   				r1 = 0;
>   			}
>   			if (pDef->funcFlags & SQLITE_FUNC_NEEDCOLL) {
> -				if (!pColl)
> -					pColl = sql_default_coll();
>   				sqlite3VdbeAddOp4(v, OP_CollSeq, 0, 0, 0,
> -						  (char *)pColl, P4_COLLSEQ);
> +						  (char *)coll, P4_COLLSEQ);
>   			}
>   			sqlite3VdbeAddOp4(v, OP_Function0, constMask, r1,
>   					  target, (char *)pDef, P4_FUNCDEF);
> diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
> index f56b6d9..a0a35e0 100644
> --- a/src/box/sql/fkey.c
> +++ b/src/box/sql/fkey.c
> @@ -287,7 +287,6 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
>   				int i, j;
>   				for (i = 0; i < nCol; i++) {
>   					i16 iCol = pIdx->aiColumn[i];	/* Index of column in parent tbl */
> -					const char *zDfltColl;	/* Def. collation for column */
>   					char *zIdxCol;	/* Name of indexed column */
>   
>   					if (iCol < 0)
> @@ -297,11 +296,12 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
>   					 * the default collation sequence for the column, this index is
>   					 * unusable. Bail out early in this case.
>   					 */
> -					zDfltColl =
> -						column_collation_name(pParent,
> -								      iCol);
> -					if (strcmp
> -					    (index_collation_name(pIdx, i), zDfltColl))
> +					struct coll *def_coll;
> +					def_coll = sql_column_collation(pParent,
> +									iCol);
> +					struct coll *coll;
> +					coll = sql_index_collation(pIdx, i);
> +					if (def_coll != coll)
>   						break;
>   
>   					zIdxCol = pParent->aCol[iCol].zName;
> @@ -526,7 +526,6 @@ exprTableRegister(Parse * pParse,	/* Parsing and code generating context */
>   {
>   	Expr *pExpr;
>   	Column *pCol;
> -	const char *zColl;
>   	sqlite3 *db = pParse->db;
>   
>   	pExpr = sqlite3Expr(db, TK_REGISTER, 0);
> @@ -535,9 +534,13 @@ exprTableRegister(Parse * pParse,	/* Parsing and code generating context */
>   			pCol = &pTab->aCol[iCol];
>   			pExpr->iTable = regBase + iCol + 1;
>   			pExpr->affinity = pCol->affinity;
> -			zColl = column_collation_name(pTab, iCol);
> -			pExpr =
> -			    sqlite3ExprAddCollateString(pParse, pExpr, zColl);
> +			const char *coll_name;
> +			if (pCol->coll == NULL && pCol->coll != NULL)
> +				coll_name = pCol->coll->name;
> +			else
> +				coll_name = "binary";
> +			pExpr = sqlite3ExprAddCollateString(pParse, pExpr,
> +							    coll_name);
>   		} else {
>   			pExpr->iTable = regBase;
>   			pExpr->affinity = SQLITE_AFF_INTEGER;
> diff --git a/src/box/sql/func.c b/src/box/sql/func.c
> index 47b45de..dcac22c 100644
> --- a/src/box/sql/func.c
> +++ b/src/box/sql/func.c
> @@ -54,7 +54,7 @@ sqlite3GetFuncCollSeq(sqlite3_context * context)
>   	assert(context->pVdbe != 0);
>   	pOp = &context->pVdbe->aOp[context->iOp - 1];
>   	assert(pOp->opcode == OP_CollSeq);
> -	assert(pOp->p4type == P4_COLLSEQ);
> +	assert(pOp->p4type == P4_COLLSEQ || pOp->p4.pColl == NULL);
>   	return pOp->p4.pColl;
>   }
>   
> @@ -82,7 +82,6 @@ minmaxFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
>   	assert(argc > 1);
>   	mask = sqlite3_user_data(context) == 0 ? 0 : -1;
>   	pColl = sqlite3GetFuncCollSeq(context);
> -	assert(pColl);
>   	assert(mask == -1 || mask == 0);
>   	iBest = 0;
>   	if (sqlite3_value_type(argv[0]) == SQLITE_NULL)
> diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
> index ae8dafb..f04496a 100644
> --- a/src/box/sql/insert.c
> +++ b/src/box/sql/insert.c
> @@ -1419,9 +1419,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
>   					      regIdx : regR);
>   
>   				for (i = 0; i < nPkCol; i++) {
> -					char *p4 = (char *)
> -						sqlite3LocateCollSeq(pParse, db,
> -								     index_collation_name(pPk, i));
> +					char *p4 = (char *)sql_index_collation(pPk, i);
>   					x = pPk->aiColumn[i];
>   					assert(x >= 0);
>   					if (i == (nPkCol - 1)) {
> @@ -1666,8 +1664,8 @@ xferCompatibleIndex(Index * pDest, Index * pSrc)
>   		if (pSrc->aSortOrder[i] != pDest->aSortOrder[i]) {
>   			return 0;	/* Different sort orders */
>   		}
> -		if (strcasecmp(index_collation_name(pSrc, i),
> -			       index_collation_name(pDest, i)) != 0) {
> +		if (sql_index_collation(pSrc, i) !=
> +		    sql_index_collation(pDest, i)) {
>   			return 0;	/* Different collating sequences */
>   		}
>   	}
> @@ -1806,8 +1804,8 @@ xferOptimization(Parse * pParse,	/* Parser context */
>   		if (pDestCol->affinity != pSrcCol->affinity) {
>   			return 0;	/* Affinity must be the same on all columns */
>   		}
> -		if (strcasecmp(column_collation_name(pDest, i),
> -			       column_collation_name(pSrc, i)) != 0) {
> +		if (sql_column_collation(pDest, i) !=
> +		    sql_column_collation(pSrc, i)) {
>   			return 0;	/* Collating sequence must be the same on all columns */
>   		}
>   		if (!table_column_is_nullable(pDest, i)
> diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
> index b724c98..a2a6391 100644
> --- a/src/box/sql/pragma.c
> +++ b/src/box/sql/pragma.c
> @@ -452,13 +452,20 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
>   								     aCol[cnum].
>   								     zName);
>   						if (pPragma->iArg) {
> +							const char *c_n;
> +							struct coll *coll;
> +							coll = sql_index_collation(pIdx, i);
> +							if (coll != NULL)
> +								c_n = coll->name;
> +							else
> +								c_n = "BINARY";
>   							sqlite3VdbeMultiLoad(v,
>   									     4,
>   									     "isi",
>   									     pIdx->
>   									     aSortOrder
>   									     [i],
> -									     index_collation_name(pIdx, i),
> +									     c_n,
>   									     i <
>   									     mx);
>   						}
> diff --git a/src/box/sql/select.c b/src/box/sql/select.c
> index d97e466..5a39435 100644
> --- a/src/box/sql/select.c
> +++ b/src/box/sql/select.c
> @@ -908,10 +908,12 @@ selectInnerLoop(Parse * pParse,		/* The parser context */
>   
>   				iJump = sqlite3VdbeCurrentAddr(v) + nResultCol;
>   				for (i = 0; i < nResultCol; i++) {
> -					struct coll *pColl =
> -					    sqlite3ExprCollSeq(pParse,
> -							       pEList->a[i].
> -							       pExpr);
> +					bool found;
> +					struct coll *coll =
> +					    sql_expr_coll(pParse,
> +							  pEList->a[i].
> +							  pExpr,
> +							  &found);
>   					if (i < nResultCol - 1) {
>   						sqlite3VdbeAddOp3(v, OP_Ne,
>   								  regResult + i,
> @@ -925,9 +927,11 @@ selectInnerLoop(Parse * pParse,		/* The parser context */
>   								  regPrev + i);
>   						VdbeCoverage(v);
>   					}
> -					sqlite3VdbeChangeP4(v, -1,
> -							    (const char *)pColl,
> -							    P4_COLLSEQ);
> +					if (found) {
> +						sqlite3VdbeChangeP4(v, -1,
> +								    (const char *)coll,
> +								    P4_COLLSEQ);
> +					}
>   					sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
>   				}
>   				assert(sqlite3VdbeCurrentAddr(v) == iJump
> @@ -1288,11 +1292,12 @@ keyInfoFromExprList(Parse * pParse,	/* Parsing context */
>   		assert(sqlite3KeyInfoIsWriteable(pInfo));
>   		for (i = iStart, pItem = pList->a + iStart; i < nExpr;
>   		     i++, pItem++) {
> -			struct coll *pColl;
> -			pColl = sqlite3ExprCollSeq(pParse, pItem->pExpr);
> -			if (!pColl)
> -				pColl = sql_default_coll();
> -			pInfo->aColl[i - iStart] = pColl;
> +			bool found; /* Not used.  */
> +			struct coll *coll;
> +			coll = sql_expr_coll(pParse,
> +					     pItem->pExpr,
> +					     &found);
> +			pInfo->aColl[i - iStart] = coll;
>   			pInfo->aSortOrder[i - iStart] = pItem->sortOrder;
>   		}
>   	}
> @@ -1935,7 +1940,6 @@ sqlite3SelectAddColumnTypeAndCollation(Parse * pParse,		/* Parsing contexts */
>   	sqlite3 *db = pParse->db;
>   	NameContext sNC;
>   	Column *pCol;
> -	struct coll *pColl;
>   	int i;
>   	Expr *p;
>   	struct ExprList_item *a;
> @@ -1950,6 +1954,7 @@ sqlite3SelectAddColumnTypeAndCollation(Parse * pParse,		/* Parsing contexts */
>   	sNC.pSrcList = pSelect->pSrc;
>   	a = pSelect->pEList->a;
>   	for (i = 0, pCol = pTab->aCol; i < pTab->nCol; i++, pCol++) {
> +		struct coll *coll;
>   		enum field_type type;
>   		p = a[i].pExpr;
>   		type = columnType(&sNC, p, 0, 0, 0, &pCol->szEst);
> @@ -1959,10 +1964,10 @@ sqlite3SelectAddColumnTypeAndCollation(Parse * pParse,		/* Parsing contexts */
>   
>   		if (pCol->affinity == 0)
>   			pCol->affinity = SQLITE_AFF_BLOB;
> -		pColl = sqlite3ExprCollSeq(pParse, p);
> -		if (pColl && pCol->zColl == 0) {
> -			pCol->zColl = sqlite3DbStrDup(db, pColl->name);
> -		}
> +		bool found;
> +		coll = sql_expr_coll(pParse, p, &found);
> +		if (coll && pCol->coll == 0)
> +			pCol->coll = coll;
>   	}
>   	pTab->szTabRow = sqlite3LogEst(szAll * 4);
>   }
> @@ -2123,23 +2128,24 @@ computeLimitRegisters(Parse * pParse, Select * p, int iBreak)
>    * left-most term of the select that has a collating sequence.
>    */
>   static struct coll *
> -multiSelectCollSeq(Parse * pParse, Select * p, int iCol)
> +multiSelectCollSeq(Parse * pParse, Select * p, int iCol, bool *found)
>   {
> -	struct coll *pRet;
> -	if (p->pPrior) {
> -		pRet = multiSelectCollSeq(pParse, p->pPrior, iCol);
> -	} else {
> -		pRet = 0;
> -	}
> +	struct coll *coll;
> +	if (p->pPrior)
> +		coll = multiSelectCollSeq(pParse, p->pPrior, iCol, found);
> +	else
> +		coll = NULL;
>   	assert(iCol >= 0);
>   	/* iCol must be less than p->pEList->nExpr.  Otherwise an error would
>   	 * have been thrown during name resolution and we would not have gotten
>   	 * this far
>   	 */
> -	if (pRet == 0 && ALWAYS(iCol < p->pEList->nExpr)) {
> -		pRet = sqlite3ExprCollSeq(pParse, p->pEList->a[iCol].pExpr);
> +	if (!(*found) && ALWAYS(iCol < p->pEList->nExpr)) {
> +		coll = sql_expr_coll(pParse,
> +				     p->pEList->a[iCol].pExpr,
> +				     found);
>   	}
> -	return pRet;
> +	return coll;
>   }
>   
>   /*
> @@ -2163,22 +2169,29 @@ multiSelectOrderByKeyInfo(Parse * pParse, Select * p, int nExtra)
>   		for (i = 0; i < nOrderBy; i++) {
>   			struct ExprList_item *pItem = &pOrderBy->a[i];
>   			Expr *pTerm = pItem->pExpr;
> -			struct coll *pColl;
> +			struct coll *coll;
>   
>   			if (pTerm->flags & EP_Collate) {
> -				pColl = sqlite3ExprCollSeq(pParse, pTerm);
> +				bool found; /* Not used.  */
> +				coll = sql_expr_coll(pParse, pTerm, &found);
>   			} else {
> -				pColl =
> +				bool found = false;
> +				coll =
>   				    multiSelectCollSeq(pParse, p,
> -						       pItem->u.x.iOrderByCol - 1);
> -				if (pColl == 0)
> -					pColl = sql_default_coll();
> -				pOrderBy->a[i].pExpr =
> -				    sqlite3ExprAddCollateString(pParse, pTerm,
> -								pColl->name);
> +						       pItem->u.x.iOrderByCol - 1,
> +						       &found);
> +				if (coll != NULL) {
> +					pOrderBy->a[i].pExpr =
> +						sqlite3ExprAddCollateString(pParse, pTerm,
> +									    coll->name);
> +				} else {
> +					pOrderBy->a[i].pExpr =
> +						sqlite3ExprAddCollateString(pParse, pTerm,
> +									    "BINARY");
> +				}
>   			}
>   			assert(sqlite3KeyInfoIsWriteable(pRet));
> -			pRet->aColl[i] = pColl;
> +			pRet->aColl[i] = coll;
>   			pRet->aSortOrder[i] = pOrderBy->a[i].sortOrder;
>   		}
>   	}
> @@ -2827,10 +2840,8 @@ multiSelect(Parse * pParse,	/* Parsing context */
>   			goto multi_select_end;
>   		}
>   		for (i = 0, apColl = pKeyInfo->aColl; i < nCol; i++, apColl++) {
> -			*apColl = multiSelectCollSeq(pParse, p, i);
> -			if (0 == *apColl) {
> -				*apColl = sql_default_coll();
> -			}
> +			bool found = false; /* Not used.  */
> +			*apColl = multiSelectCollSeq(pParse, p, i, &found);
>   		}
>   
>   		for (pLoop = p; pLoop; pLoop = pLoop->pPrior) {
> @@ -3260,8 +3271,9 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
>   		if (pKeyDup) {
>   			assert(sqlite3KeyInfoIsWriteable(pKeyDup));
>   			for (i = 0; i < nExpr; i++) {
> +				bool found = false; /* Not used. */
>   				pKeyDup->aColl[i] =
> -				    multiSelectCollSeq(pParse, p, i);
> +					multiSelectCollSeq(pParse, p, i, &found);
>   				pKeyDup->aSortOrder[i] = 0;
>   			}
>   		}
> @@ -5241,22 +5253,21 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
>   				     regAgg);
>   		}
>   		if (pF->pFunc->funcFlags & SQLITE_FUNC_NEEDCOLL) {
> -			struct coll *pColl = 0;
> +			struct coll *coll = NULL;
>   			struct ExprList_item *pItem;
>   			int j;
>   			assert(pList != 0);	/* pList!=0 if pF->pFunc has NEEDCOLL */
> -			for (j = 0, pItem = pList->a; !pColl && j < nArg;
> +			bool found = false;
> +			for (j = 0, pItem = pList->a; !found && j < nArg;
>   			     j++, pItem++) {
> -				pColl =
> -				    sqlite3ExprCollSeq(pParse, pItem->pExpr);
> -			}
> -			if (!pColl) {
> -				pColl = sql_default_coll();
> +				coll = sql_expr_coll(pParse,
> +						     pItem->pExpr,
> +						     &found);
>   			}
>   			if (regHit == 0 && pAggInfo->nAccumulator)
>   				regHit = ++pParse->nMem;
>   			sqlite3VdbeAddOp4(v, OP_CollSeq, regHit, 0, 0,
> -					  (char *)pColl, P4_COLLSEQ);
> +					  (char *)coll, P4_COLLSEQ);
>   		}
>   		sqlite3VdbeAddOp3(v, OP_AggStep0, 0, regAgg, pF->iMem);
>   		sqlite3VdbeAppendP4(v, pF->pFunc, P4_FUNCDEF);
> diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
> index 59662cf..3c59589 100644
> --- a/src/box/sql/sqliteInt.h
> +++ b/src/box/sql/sqliteInt.h
> @@ -1878,10 +1878,13 @@ struct Column {
>   	char *zName;		/* Name of this column */
>   	enum field_type type;	/* Column type. */
>   	Expr *pDflt;		/* Default value of this column */
> -	char *zColl;		/* Collating sequence.  If NULL, use the default */
> -	enum on_conflict_action notNull;  /* An ON_CONFLICT_ACTION code for
> -					   * handling a NOT NULL constraint
> -					   */
> +	/** Collating sequence. */
> +	struct coll *coll;
> +	/**
> +	 * An ON_CONFLICT_ACTION code for handling a NOT NULL
> +	 * constraint.
> +	 */
> +	enum on_conflict_action notNull;
>   	char affinity;		/* One of the SQLITE_AFF_... values */
>   	u8 szEst;		/* Estimated size of value in this column. sizeof(INT)==1 */
>   	u8 is_primkey;		/* Boolean propertie for being PK */
> @@ -2148,7 +2151,8 @@ struct Index {
>   	Index *pNext;		/* The next index associated with the same table */
>   	Schema *pSchema;	/* Schema containing this index */
>   	u8 *aSortOrder;		/* for each column: True==DESC, False==ASC */
> -	const char **azColl;	/* Array of collation sequence names for index */
> +	/**  Array of collation sequences for index. */
> +	struct coll **coll_array;
>   	Expr *pPartIdxWhere;	/* WHERE clause for partial indices */
>   	ExprList *aColExpr;	/* Column expressions */
>   	int tnum;		/* DB Page containing root of this index */
> @@ -3548,14 +3552,20 @@ void sqlite3AddPrimaryKey(Parse *, ExprList *, int, int, int);
>   void sqlite3AddCheckConstraint(Parse *, Expr *);
>   void sqlite3AddDefaultValue(Parse *, ExprSpan *);
>   void sqlite3AddCollateType(Parse *, Token *);
> +
>   const char *
>   column_collation_name(Table *, uint32_t);
> +struct coll *
> +sql_column_collation(Table *, uint32_t);
>   const char *
>   index_collation_name(Index *, uint32_t);
>   struct coll *
> +sql_index_collation(Index *idx, uint32_t column);
> +struct coll *
>   sql_default_coll();
>   bool
>   space_is_view(Table *);
> +
>   void sqlite3EndTable(Parse *, Token *, Token *, Select *);
>   int
>   emit_open_cursor(Parse *, int, int);
> @@ -3845,7 +3855,25 @@ const char *sqlite3ErrName(int);
>   const char *sqlite3ErrStr(int);
>   struct coll *sqlite3FindCollSeq(const char *);
>   struct coll *sqlite3LocateCollSeq(Parse * pParse, sqlite3 * db, const char *zName);
> -struct coll *sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr);
> +
> +/**
> + * Return the collation sequence for the expression pExpr. If
> + * there is no defined collating sequence, return NULL.
> + *
> + * The collating sequence might be determined by a COLLATE operator
> + * or by the presence of a column with a defined collating sequence.
> + * COLLATE operators take first precedence.  Left operands take
> + * precedence over right operands.
> + *
> + * @param parse Parsing context.
> + * @param expr Expression to scan
> + * @param found Flag set if collation was found
> + *
> + * @retval Pointer to collation.
> + */
> +struct coll *
> +sql_expr_coll(Parse * pParse, Expr * pExpr, bool *found);
> +
>   Expr *sqlite3ExprAddCollateToken(Parse * pParse, Expr *, const Token *, int);
>   Expr *sqlite3ExprAddCollateString(Parse *, Expr *, const char *);
>   Expr *sqlite3ExprSkipCollate(Expr *);
> diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
> index 8237183..321ce2f 100644
> --- a/src/box/sql/vdbe.c
> +++ b/src/box/sql/vdbe.c
> @@ -1660,7 +1660,7 @@ case OP_Remainder: {           /* same as TK_REM, in1, in2, out3 */
>    * publicly.  Only built-in functions have access to this feature.
>    */
>   case OP_CollSeq: {
> -	assert(pOp->p4type==P4_COLLSEQ);
> +	assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl == NULL);
>   	if (pOp->p1) {
>   		sqlite3VdbeMemSetInt64(&aMem[pOp->p1], 0);
>   	}
> diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
> index bb121a3..e8a0917 100644
> --- a/src/box/sql/vdbeaux.c
> +++ b/src/box/sql/vdbeaux.c
> @@ -3840,6 +3840,13 @@ sqlite3MemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pCol
>   		 */
>   		if (pColl) {
>   			return vdbeCompareMemString(pMem1, pMem2, pColl, 0);
> +		} else {
> +			size_t n = pMem1->n < pMem2->n ? pMem1->n : pMem2->n;
> +			int res;
> +			res = memcmp(pMem1->z, pMem2->z, n);
> +			if (res == 0)
> +				res = (int)pMem1->n - (int)pMem2->n;
> +			return res;
>   		}
>   		/* If a NULL pointer was passed as the collate function, fall through
>   		 * to the blob case and use memcmp().
> diff --git a/src/box/sql/vdbesort.c b/src/box/sql/vdbesort.c
> index fc10ef6..be3cc4c 100644
> --- a/src/box/sql/vdbesort.c
> +++ b/src/box/sql/vdbesort.c
> @@ -930,9 +930,7 @@ sqlite3VdbeSorterInit(sqlite3 * db,	/* Database connection (for malloc()) */
>   		}
>   
>   		if ((pKeyInfo->nField + pKeyInfo->nXField) < 13
> -		    && (pKeyInfo->aColl[0] == 0
> -			|| pKeyInfo->aColl[0] == sql_default_coll())
> -		    ) {
> +		    && (pKeyInfo->aColl[0] == NULL)) {
>   			pSorter->typeMask =
>   			    SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
>   		}
> diff --git a/src/box/sql/where.c b/src/box/sql/where.c
> index 2a26302..0b4ab47 100644
> --- a/src/box/sql/where.c
> +++ b/src/box/sql/where.c
> @@ -298,26 +298,20 @@ whereScanNext(WhereScan * pScan)
>   					if ((pTerm->eOperator & pScan->
>   					     opMask) != 0) {
>   						/* Verify the affinity and collating sequence match */
> -						if (pScan->zCollName
> -						    && (pTerm->eOperator & WO_ISNULL) == 0) {
> -							struct coll *pColl;
> -							Parse *pParse =
> -							    pWC->pWInfo->pParse;
> +						if ((pTerm->eOperator & WO_ISNULL) == 0) {
>   							pX = pTerm->pExpr;
> -							if (!sqlite3IndexAffinityOk(pX, pScan->idxaff)) {
> -								continue;
> -							}
> -							assert(pX->pLeft);
> -							pColl =
> -							    sqlite3BinaryCompareCollSeq
> -							    (pParse, pX->pLeft,
> -							     pX->pRight);
> -							if (pColl == 0)
> -								pColl =
> -									sql_default_coll();
> -							if (strcmp(pColl->name,
> -									   pScan->zCollName)) {
> +							if (!sqlite3IndexAffinityOk(pX, pScan->idxaff))
>   								continue;
> +							if (pScan->column_seen) {
> +								Parse *pParse =
> +									pWC->pWInfo->pParse;
> +								struct coll *coll;
> +								assert(pX->pLeft);
> +								coll = sqlite3BinaryCompareCollSeq
> +									(pParse, pX->pLeft,
> +									 pX->pRight);
> +								if (coll != pScan->coll)
> +									continue;
>   							}
>   						}
>   						if ((pTerm->eOperator & (WO_EQ | WO_IS)) != 0
> @@ -376,7 +370,8 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
>   	pScan->pWC = pWC;
>   	pScan->pIdxExpr = 0;
>   	pScan->idxaff = 0;
> -	pScan->zCollName = 0;
> +	pScan->coll = NULL;
> +	pScan->column_seen = false;
>   	if (pIdx) {
>   		int j = iColumn;
>   		iColumn = pIdx->aiColumn[j];
> @@ -384,7 +379,8 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
>   			pScan->pIdxExpr = pIdx->aColExpr->a[j].pExpr;
>   		} else if (iColumn >= 0) {
>   			pScan->idxaff = pIdx->pTable->aCol[iColumn].affinity;
> -			pScan->zCollName = index_collation_name(pIdx, j);
> +			pScan->coll = sql_index_collation(pIdx, j);
> +			pScan->column_seen = true;
>   		}
>   	} else if (iColumn == XN_EXPR) {
>   		return 0;
> @@ -465,16 +461,17 @@ findIndexCol(Parse * pParse,	/* Parse context */
>   	     Index * pIdx,	/* Index to match column of */
>   	     int iCol)		/* Column of index to match */
>   {
> -	int i;
> -	const char *zColl = index_collation_name(pIdx, iCol);
> -
> -	for (i = 0; i < pList->nExpr; i++) {
> +	for (int i = 0; i < pList->nExpr; i++) {
>   		Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
> -		if (p->op == TK_COLUMN && p->iColumn == pIdx->aiColumn[iCol]
> -		    && p->iTable == iBase) {
> -			struct coll *pColl =
> -			    sqlite3ExprCollSeq(pParse, pList->a[i].pExpr);
> -			if (pColl && 0 == strcmp(pColl->name, zColl)) {
> +		if (p->op == TK_COLUMN &&
> +		    p->iColumn == pIdx->aiColumn[iCol] &&
> +		    p->iTable == iBase) {
> +			bool found;
> +			struct coll *coll = sql_expr_coll(pParse,
> +							  pList->a[i].pExpr,
> +							  &found);
> +			if (found &&
> +			    coll == sql_index_collation(pIdx, iCol)) {
>   				return i;
>   			}
>   		}
> @@ -1174,7 +1171,7 @@ whereRangeSkipScanEst(Parse * pParse,		/* Parsing & code generating context */
>   	sqlite3_value *p2 = 0;	/* Value extracted from pUpper */
>   	sqlite3_value *pVal = 0;	/* Value extracted from record */
>   
> -	pColl = sqlite3LocateCollSeq(pParse, db, index_collation_name(p, nEq));
> +	pColl = sql_index_collation(p, nEq);
>   	if (pLower) {
>   		rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
>   					       aff, &p1);
> @@ -2246,8 +2243,7 @@ whereRangeVectorLen(Parse * pParse,	/* Parsing context */
>   		pColl = sqlite3BinaryCompareCollSeq(pParse, pLhs, pRhs);
>   		if (pColl == 0)
>   			break;
> -		const char *zCollName = index_collation_name(pIdx, i + nEq);
> -		if (zCollName && strcmp(pColl->name, zCollName))
> +	        if (sql_index_collation(pIdx, i + nEq) != pColl)
>   			break;
>   	}
>   	return i;
> @@ -3147,7 +3143,6 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
>   	WhereLoop *pLoop = 0;	/* Current WhereLoop being processed. */
>   	WhereTerm *pTerm;	/* A single term of the WHERE clause */
>   	Expr *pOBExpr;		/* An expression from the ORDER BY clause */
> -	struct coll *pColl;		/* COLLATE function from an ORDER BY clause term */
>   	Index *pIndex;		/* The index associated with pLoop */
>   	sqlite3 *db = pWInfo->pParse->db;	/* Database connection */
>   	Bitmask obSat = 0;	/* Mask of ORDER BY terms satisfied so far */
> @@ -3235,20 +3230,15 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
>   			}
>   			if ((pTerm->eOperator & (WO_EQ | WO_IS)) != 0
>   			    && pOBExpr->iColumn >= 0) {
> -				const char *z1, *z2;
> -				pColl =
> -				    sqlite3ExprCollSeq(pWInfo->pParse,
> -						       pOrderBy->a[i].pExpr);
> -				if (!pColl)
> -					pColl = sql_default_coll();
> -				z1 = pColl->name;
> -				pColl =
> -				    sqlite3ExprCollSeq(pWInfo->pParse,
> -						       pTerm->pExpr);
> -				if (!pColl)
> -					pColl = sql_default_coll();
> -				z2 = pColl->name;
> -				if (strcmp(z1, z2) != 0)
> +				struct coll *coll1, *coll2;
> +				bool found; /* Not used. */
> +				coll1 = sql_expr_coll(pWInfo->pParse,
> +						      pOrderBy->a[i].pExpr,
> +						      &found);
> +				coll2 = sql_expr_coll(pWInfo->pParse,
> +						      pTerm->pExpr,
> +						      &found);
> +				if (coll1 != coll2)
>   					continue;
>   				testcase(pTerm->pExpr->op == TK_IS);
>   			}
> @@ -3372,15 +3362,15 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
>   						}
>   					}
>   					if (iColumn >= 0) {
> -						pColl =
> -						    sqlite3ExprCollSeq(pWInfo->pParse,
> -								       pOrderBy->a[i].pExpr);
> -						if (!pColl)
> -							pColl = sql_default_coll();
> -						const char *zCollName =
> -							index_collation_name(pIndex, j);
> -						if (strcmp(pColl->name,
> -							   zCollName) != 0)
> +						bool found;
> +						struct coll *coll;
> +						coll = sql_expr_coll(pWInfo->pParse,
> +								     pOrderBy->a[i].pExpr,
> +								     &found);
> +						struct coll *idx_coll;
> +						idx_coll = sql_index_collation(pIndex,
> +									       j);
> +						if (found && coll != idx_coll)
>   							continue;
>   					}
>   					isMatch = 1;
> diff --git a/src/box/sql/whereInt.h b/src/box/sql/whereInt.h
> index 381a1d2..b85cd73 100644
> --- a/src/box/sql/whereInt.h
> +++ b/src/box/sql/whereInt.h
> @@ -293,7 +293,12 @@ struct WhereTerm {
>   struct WhereScan {
>   	WhereClause *pOrigWC;	/* Original, innermost WhereClause */
>   	WhereClause *pWC;	/* WhereClause currently being scanned */
> -	const char *zCollName;	/* Required collating sequence, if not NULL */
> +	/** Required collating sequence. */
> +	struct coll *coll;
> +	/** Explicitly specified BINARY collation. */
> +	bool cool_binary;
> +	/** Flag is set if actual column was encountered. */
> +	bool column_seen;
>   	Expr *pIdxExpr;		/* Search for this index expression */
>   	char idxaff;		/* Must match this affinity, if zCollName!=NULL */
>   	unsigned char nEquiv;	/* Number of entries in aEquiv[] */
> diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c
> index ccdff46..917c2dc 100644
> --- a/src/box/sql/whereexpr.c
> +++ b/src/box/sql/whereexpr.c
> @@ -165,12 +165,19 @@ exprCommute(Parse * pParse, Expr * pExpr)
>   			 * used by clearing the EP_Collate flag from Y.
>   			 */
>   			pExpr->pRight->flags &= ~EP_Collate;
> -		} else if (sqlite3ExprCollSeq(pParse, pExpr->pLeft) != 0) {
> -			/* Neither X nor Y have COLLATE operators, but X has a non-default
> -			 * collating sequence.  So add the EP_Collate marker on X to cause
> -			 * it to be searched first.
> -			 */
> -			pExpr->pLeft->flags |= EP_Collate;
> +		} else {
> +			bool found;
> +			sql_expr_coll(pParse, pExpr->pLeft, &found);
> +			if (found) {
> +				/* Neither X nor Y have COLLATE
> +				 * operators, but X has a
> +				 * non-default collating sequence.
> +				 * So add the EP_Collate marker on
> +				 * X to cause it to be searched
> +				 * first.
> +				 */
> +				pExpr->pLeft->flags |= EP_Collate;
> +			}
>   		}
>   	}
>   	SWAP(pExpr->pRight, pExpr->pLeft);
> @@ -822,7 +829,6 @@ static int
>   termIsEquivalence(Parse * pParse, Expr * pExpr)
>   {
>   	char aff1, aff2;
> -	struct coll *pColl;
>   	const char *zColl1, *zColl2;
>   	if (!OptimizationEnabled(pParse->db, SQLITE_Transitive))
>   		return 0;
> @@ -838,14 +844,16 @@ termIsEquivalence(Parse * pParse, Expr * pExpr)
>   	    ) {
>   		return 0;
>   	}
> -	pColl =
> +	struct coll *coll;
> +	coll =
>   	    sqlite3BinaryCompareCollSeq(pParse, pExpr->pLeft, pExpr->pRight);
> -	if (pColl == 0 || sqlite3StrICmp(pColl->name, "BINARY") == 0)
> +	if (coll == NULL || sqlite3StrICmp(coll->name, "BINARY") == 0)
>   		return 1;
> -	pColl = sqlite3ExprCollSeq(pParse, pExpr->pLeft);
> -	zColl1 = pColl ? pColl->name : 0;
> -	pColl = sqlite3ExprCollSeq(pParse, pExpr->pRight);
> -	zColl2 = pColl ? pColl->name : 0;
> +	bool found;
> +	coll = sql_expr_coll(pParse, pExpr->pLeft, &found);
> +	zColl1 = coll ? coll->name : NULL;
> +	coll = sql_expr_coll(pParse, pExpr->pRight, &found);
> +	zColl2 = coll ? coll->name : NULL;
>   	return sqlite3_stricmp(zColl1, zColl2) == 0;
>   }
>   
> 




More information about the Tarantool-patches mailing list