[tarantool-patches] [PATCH] sql: use collation pointers instead of names

Kirill Yukhin kyukhin at tarantool.org
Tue May 8 10:56:07 MSK 2018


Before this change the SQL front end referred to collations by
name, and its main data structures stored collation names as well.
This patch replaces the names with explicit pointers to Tarantool's
`struct coll`, both during code generation and inside the data
structures. The default "binary" collation is now represented by
a NULL pointer.

Closes #3205
---
Hello Vlad,
I've addressed all your review comments, thanks!

Branch: https://github.com/tarantool/tarantool/tree/kyukhin/gh-3205-use-coll-pointers
Issue: https://github.com/tarantool/tarantool/issues/3205

 src/box/sql.c           |  21 ++------
 src/box/sql/alter.c     |   2 +-
 src/box/sql/analyze.c   |   9 +---
 src/box/sql/build.c     | 133 ++++++++++++++++++++++--------------------------
 src/box/sql/callback.c  |  53 ++-----------------
 src/box/sql/expr.c      | 129 ++++++++++++++++++++++------------------------
 src/box/sql/fkey.c      |  23 +++++----
 src/box/sql/func.c      |   3 +-
 src/box/sql/insert.c    |  12 ++---
 src/box/sql/pragma.c    |   9 +++-
 src/box/sql/select.c    | 111 ++++++++++++++++++++++------------------
 src/box/sql/sqliteInt.h |  40 ++++++++++++---
 src/box/sql/vdbe.c      |   2 +-
 src/box/sql/vdbeaux.c   |   7 +++
 src/box/sql/vdbesort.c  |   4 +-
 src/box/sql/where.c     | 102 +++++++++++++++++--------------------
 src/box/sql/whereInt.h  |   7 ++-
 src/box/sql/whereexpr.c |  34 ++++++++-----
 18 files changed, 338 insertions(+), 363 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index a6713f1..614917d 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1465,12 +1465,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)
 
 	for (i = 0; i < n; i++) {
 		const char *t;
-		struct coll *coll = NULL;
+		struct coll *coll = aCol[i].coll;
 		struct Expr *def = aCol[i].pDflt;
-		if (aCol[i].zColl != NULL &&
-		    strcasecmp(aCol[i].zColl, "binary") != 0) {
-			coll = sqlite3FindCollSeq(aCol[i].zColl);
-		}
 		int base_len = 4;
 		if (coll != NULL)
 			base_len += 1;
@@ -1571,28 +1567,19 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
 	for (i = 0; i < n; i++) {
 		int col = pIndex->aiColumn[i];
 		const char *t;
-		struct coll * collation = NULL;
 		if (pk_forced_int == col)
 			t = "integer";
 		else
 			t = convertSqliteAffinity(aCol[col].affinity, aCol[col].notNull == 0);
 		/* do not decode default collation */
-		if (sqlite3StrICmp(pIndex->azColl[i], "binary") != 0){
-			collation = sqlite3FindCollSeq(pIndex->azColl[i]);
-			/* 
-			 * At this point, the collation has already been found 
-			 * once and the assert should not fire.
-			 */
-			assert(collation);
-		}
-		p = enc->encode_map(p, collation == NULL ? 4 : 5);
+		p = enc->encode_map(p, pIndex->coll_array[i] == NULL ? 4 : 5);
 		p = enc->encode_str(p, "type", sizeof("type")-1);
 		p = enc->encode_str(p, t, strlen(t));
 		p = enc->encode_str(p, "field", sizeof("field")-1);
 		p = enc->encode_uint(p, col);
-		if (collation != NULL){
+		if (pIndex->coll_array[i] != NULL) {
 			p = enc->encode_str(p, "collation", sizeof("collation")-1);
-			p = enc->encode_uint(p, collation->id);
+			p = enc->encode_uint(p, pIndex->coll_array[i]->id);
 		}
 		p = enc->encode_str(p, "is_nullable", 11);
 		p = enc->encode_bool(p, aCol[col].notNull == ON_CONFLICT_ACTION_NONE);
diff --git a/src/box/sql/alter.c b/src/box/sql/alter.c
index 129ef82..b30a973 100644
--- a/src/box/sql/alter.c
+++ b/src/box/sql/alter.c
@@ -296,7 +296,7 @@ sqlite3AlterBeginAddColumn(Parse * pParse, SrcList * pSrc)
 	for (i = 0; i < pNew->nCol; i++) {
 		Column *pCol = &pNew->aCol[i];
 		pCol->zName = sqlite3DbStrDup(db, pCol->zName);
-		pCol->zColl = 0;
+		pCol->coll = NULL;
 		pCol->pDflt = 0;
 	}
 	pNew->pSchema = db->pSchema;
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index 665bfbc..f0054c5 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -977,18 +977,13 @@ analyzeOneTable(Parse * pParse,	/* Parser context */
 				VdbeCoverage(v);
 			}
 			for (i = 0; i < nColTest; i++) {
-				const char *zCollName =
-					index_collation_name(pIdx, i);
-				char *pColl =
-				    (char *)sqlite3LocateCollSeq(pParse,
-								 pParse->db,
-								 zCollName);
+				struct coll *coll = sql_index_collation(pIdx, i);
 				sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
 				sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
 						  pIdx->aiColumn[i], regTemp);
 				aGotoChng[i] =
 				    sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
-						      regPrev + i, pColl,
+						      regPrev + i, (char *)coll,
 						      P4_COLLSEQ);
 				sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
 				VdbeCoverage(v);
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index c6185e4..74b231a 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -300,7 +300,6 @@ sqlite3DeleteColumnNames(sqlite3 * db, Table * pTable)
 		for (i = 0; i < pTable->nCol; i++, pCol++) {
 			sqlite3DbFree(db, pCol->zName);
 			sql_expr_free(db, pCol->pDflt, false);
-			sqlite3DbFree(db, pCol->zColl);
 		}
 		sqlite3DbFree(db, pTable->aCol);
 	}
@@ -952,6 +951,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
 	Table *p;
 	int i;
 	char *zColl;		/* Dequoted name of collation sequence */
+	struct coll *coll;
 	sqlite3 *db;
 
 	if ((p = pParse->pNewTable) == 0)
@@ -962,10 +962,10 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
 	if (!zColl)
 		return;
 
-	if (sqlite3LocateCollSeq(pParse, db, zColl)) {
+	coll =  sqlite3LocateCollSeq(pParse, db, zColl);
+	if (coll != NULL) {
 		Index *pIdx;
-		sqlite3DbFree(db, p->aCol[i].zColl);
-		p->aCol[i].zColl = zColl;
+		p->aCol[i].coll = coll;
 
 		/* If the column is declared as "<name> PRIMARY KEY COLLATE <type>",
 		 * then an index may have been created on this column before the
@@ -973,9 +973,8 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
 		 */
 		for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
 			assert(pIdx->nColumn == 1);
-			if (pIdx->aiColumn[0] == i) {
-				pIdx->azColl[0] = column_collation_name(p, i);
-			}
+			if (pIdx->aiColumn[0] == i)
+				pIdx->coll_array[0] = sql_column_collation(p, i);
 		}
 	} else {
 		sqlite3DbFree(db, zColl);
@@ -983,14 +982,14 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
 }
 
 /**
- * Return name of given column collation from table.
+ * Return collation of given column from table.
  *
  * @param table Table which is used to fetch column.
  * @param column Number of column.
- * @retval Pointer to collation's name.
+ * @retval Pointer to collation.
  */
-const char *
-column_collation_name(Table *table, uint32_t column)
+struct coll *
+sql_column_collation(Table *table, uint32_t column)
 {
 	assert(table != NULL);
 	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(table->tnum);
@@ -1007,15 +1006,12 @@ column_collation_name(Table *table, uint32_t column)
 	 * In cases mentioned above collation is fetched from
 	 * SQL specific structures.
 	 */
-	if (space == NULL || space_index(space, 0) == NULL)
-		return table->aCol[column].zColl;
-
-	/* "BINARY" is a name for default collation in SQL. */
-	char *coll_name = (char *)sqlite3StrBINARY;
-	if (space->format->fields[column].coll != NULL) {
-		coll_name = space->format->fields[column].coll->name;
+	if (space == NULL || space_index(space, 0) == NULL) {
+		assert(column < (uint32_t)table->nCol);
+		return table->aCol[column].coll;
 	}
-	return coll_name;
+
+	return space->format->fields[column].coll;
 }
 
 /**
@@ -1023,30 +1019,30 @@ column_collation_name(Table *table, uint32_t column)
  *
  * @param idx Index which is used to fetch column.
  * @param column Number of column.
- * @retval Pointer to collation's name.
+ * @retval Pointer to collation.
  */
-const char *
-index_collation_name(Index *idx, uint32_t column)
+struct coll *
+sql_index_collation(Index *idx, uint32_t column)
 {
 	assert(idx != NULL);
 	uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
 	struct space *space = space_by_id(space_id);
+
+	assert(column < idx->nColumn);
 	/*
 	 * If space is still under construction, or it is
 	 * an ephemeral space, then fetch collation from
 	 * SQL internal structure.
 	 */
-	if (space == NULL)
-		return (char *)idx->azColl[column];
+	if (space == NULL) {
+		assert(column < idx->nColumn);
+		return idx->coll_array[column];
+	}
 
 	uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
 	struct index *index = space_index(space, index_id);
 	assert(index != NULL && index->def->key_def->part_count >= column);
-	struct coll *coll = index->def->key_def->parts[column].coll;
-	/* "BINARY" is a name for default collation in SQL. */
-	if (coll == NULL)
-		return (char *)sqlite3StrBINARY;
-	return index->def->key_def->parts[column].coll->name;
+	return index->def->key_def->parts[column].coll;
 }
 
 /**
@@ -1117,6 +1113,9 @@ sqlite3LocateCollSeq(Parse * pParse, sqlite3 * db, const char *zName)
 	struct coll *pColl;
 	initbusy = db->init.busy;
 
+	if (sqlite3StrICmp(zName, "binary") == 0)
+		return NULL;
+
 	pColl = sqlite3FindCollSeq(zName);
 	if (!initbusy && (!pColl)) {
 		pColl = sqlite3GetCollSeq(pParse, pColl, zName);
@@ -2623,7 +2622,7 @@ sqlite3AllocateIndexObject(sqlite3 * db,	/* Database connection */
 	p = sqlite3DbMallocZero(db, nByte + nExtra);
 	if (p) {
 		char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
-		p->azColl = (const char **)pExtra;
+		p->coll_array = (struct coll **)pExtra;
 		pExtra += ROUND8(sizeof(char *) * nCol);
 		p->aiRowLogEst = (LogEst *) pExtra;
 		pExtra += sizeof(LogEst) * (nCol + 1);
@@ -2920,7 +2919,7 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 		goto exit_create_index;
 	}
 	assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
-	assert(EIGHT_BYTE_ALIGNMENT(pIndex->azColl));
+	assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
 	pIndex->zName = zExtra;
 	zExtra += nName + 1;
 	memcpy(pIndex->zName, zName, nName + 1);
@@ -2959,7 +2958,7 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 	for (i = 0, pListItem = pList->a; i < pList->nExpr; i++, pListItem++) {
 		Expr *pCExpr;	/* The i-th index expression */
 		int requestedSortOrder;	/* ASC or DESC on the i-th expression */
-		const char *zColl;	/* Collation sequence name */
+		struct coll *coll;
 		sqlite3ResolveSelfReference(pParse, pTab, NC_IdxExpr,
 					    pListItem->pExpr, 0);
 		if (pParse->nErr)
@@ -2978,25 +2977,21 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 			}
 			pIndex->aiColumn[i] = (i16) j;
 		}
-		zColl = 0;
+		coll = NULL;
+		const char *coll_name;
 		if (pListItem->pExpr->op == TK_COLLATE) {
-			int nColl;
-			zColl = pListItem->pExpr->u.zToken;
-			nColl = sqlite3Strlen30(zColl) + 1;
-			assert(nExtra >= nColl);
-			memcpy(zExtra, zColl, nColl);
-			zColl = zExtra;
-			zExtra += nColl;
-			nExtra -= nColl;
+			coll_name = pListItem->pExpr->u.zToken;
+			coll = sqlite3GetCollSeq(pParse, 0, coll_name);
+
+			if (coll == NULL &&
+			    sqlite3StrICmp(coll_name, "binary") != 0) {
+				goto exit_create_index;
+			}
 		} else if (j >= 0) {
-			zColl = column_collation_name(pTab, j);
-		}
-		if (!zColl)
-			zColl = sqlite3StrBINARY;
-		if (!db->init.busy && !sqlite3LocateCollSeq(pParse, db, zColl)) {
-			goto exit_create_index;
+			coll = sql_column_collation(pTab, j);
 		}
-		pIndex->azColl[i] = zColl;
+		pIndex->coll_array[i] = coll;
+
 		/* Tarantool: DESC indexes are not supported so far.
 		 * See gh-3016.
 		 */
@@ -3040,14 +3035,13 @@ sqlite3CreateIndex(Parse * pParse,	/* All information about this parse */
 			if (pIdx->nColumn != pIndex->nColumn)
 				continue;
 			for (k = 0; k < pIdx->nColumn; k++) {
-				const char *z1;
-				const char *z2;
 				assert(pIdx->aiColumn[k] >= 0);
 				if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
 					break;
-				z1 = index_collation_name(pIdx, k);
-				z2 = index_collation_name(pIndex, k);
-				if (strcmp(z1, z2))
+				struct coll *coll1, *coll2;
+				coll1 = sql_index_collation(pIdx, k);
+				coll2 = sql_index_collation(pIndex, k);
+				if (coll1 != coll2)
 					break;
 			}
 			if (k == pIdx->nColumn) {
@@ -3963,19 +3957,18 @@ sqlite3UniqueConstraint(Parse * pParse,	/* Parsing context */
  * true if it does and false if it does not.
  */
 #ifndef SQLITE_OMIT_REINDEX
-static int
-collationMatch(const char *zColl, Index * pIndex)
+static bool
+collationMatch(struct coll *coll, struct Index *index)
 {
 	int i;
-	assert(zColl != 0);
-	for (i = 0; i < pIndex->nColumn; i++) {
-		const char *z = index_collation_name(pIndex, i);
-		assert(z != 0 || pIndex->aiColumn[i] < 0);
-		if (pIndex->aiColumn[i] >= 0 && 0 == sqlite3StrICmp(z, zColl)) {
-			return 1;
-		}
+	assert(coll != 0);
+	for (i = 0; i < index->nColumn; i++) {
+		struct coll *idx_coll = sql_index_collation(index, i);
+		assert(idx_coll != 0 || index->aiColumn[i] < 0);
+		if (index->aiColumn[i] >= 0 && coll == idx_coll)
+			return true;
 	}
-	return 0;
+	return false;
 }
 #endif
 
@@ -3985,12 +3978,12 @@ collationMatch(const char *zColl, Index * pIndex)
  */
 #ifndef SQLITE_OMIT_REINDEX
 static void
-reindexTable(Parse * pParse, Table * pTab, char const *zColl)
+reindexTable(Parse * pParse, Table * pTab, struct coll *coll)
 {
 	Index *pIndex;		/* An index associated with pTab */
 
 	for (pIndex = pTab->pIndex; pIndex; pIndex = pIndex->pNext) {
-		if (zColl == 0 || collationMatch(zColl, pIndex)) {
+		if (coll == 0 || collationMatch(coll, pIndex)) {
 			sql_set_multi_write(pParse, false);
 			sqlite3RefillIndex(pParse, pIndex, -1);
 		}
@@ -4005,7 +3998,7 @@ reindexTable(Parse * pParse, Table * pTab, char const *zColl)
  */
 #ifndef SQLITE_OMIT_REINDEX
 static void
-reindexDatabases(Parse * pParse, char const *zColl)
+reindexDatabases(Parse * pParse, struct coll *coll)
 {
 	sqlite3 *db = pParse->db;	/* The database connection */
 	HashElem *k;		/* For looping over tables in pSchema */
@@ -4015,7 +4008,7 @@ reindexDatabases(Parse * pParse, char const *zColl)
 	for (k = sqliteHashFirst(&db->pSchema->tblHash); k;
 	     k = sqliteHashNext(k)) {
 		pTab = (Table *) sqliteHashData(k);
-		reindexTable(pParse, pTab, zColl);
+		reindexTable(pParse, pTab, coll);
 	}
 }
 #endif
@@ -4057,7 +4050,7 @@ sqlite3Reindex(Parse * pParse, Token * pName1, Token * pName2)
 			return;
 		pColl = sqlite3FindCollSeq(zColl);
 		if (pColl) {
-			reindexDatabases(pParse, zColl);
+			reindexDatabases(pParse, pColl);
 			sqlite3DbFree(db, zColl);
 			return;
 		}
@@ -4126,9 +4119,7 @@ sqlite3KeyInfoOfIndex(Parse * pParse, sqlite3 * db, Index * pIdx)
 	if (pKey) {
 		assert(sqlite3KeyInfoIsWriteable(pKey));
 		for (i = 0; i < nCol; i++) {
-			const char *zColl = index_collation_name(pIdx, i);
-			pKey->aColl[i] = zColl == sqlite3StrBINARY ? 0 :
-			    sqlite3LocateCollSeq(pParse, db, zColl);
+			pKey->aColl[i] = sql_index_collation(pIdx, i);
 			pKey->aSortOrder[i] = pIdx->aSortOrder[i];
 		}
 		if (pParse && pParse->nErr) {
diff --git a/src/box/sql/callback.c b/src/box/sql/callback.c
index b176931..0d34e6a 100644
--- a/src/box/sql/callback.c
+++ b/src/box/sql/callback.c
@@ -62,10 +62,9 @@ sqlite3GetCollSeq(Parse * pParse,	/* Parsing context */
 	struct coll *p;
 
 	p = pColl;
-	if (!p) {
+	if (p == NULL)
 		p = sqlite3FindCollSeq(zName);
-	}
-	if (p == 0) {
+	if (p == NULL && (sqlite3StrICmp(zName, "binary") != 0)) {
 		if (pParse)
 			sqlite3ErrorMsg(pParse,
 					"no such collation sequence: %s",
@@ -102,49 +101,6 @@ sqlite3CheckCollSeq(Parse * pParse, struct coll * pColl)
 	return SQLITE_OK;
 }
 
-/*
- * This is the default collating function named "BINARY" which is always
- * available.
- * It is hardcoded to support Tarantool's collation interface.
- */
-static int
-binCollFunc(const char *pKey1, size_t nKey1, const char *pKey2, size_t nKey2, const struct coll * collation)
-{
-	int rc;
-	size_t n;
-	(void) collation;
-	n = nKey1 < nKey2 ? nKey1 : nKey2;
-	/* EVIDENCE-OF: R-65033-28449 The built-in BINARY collation compares
-	 * strings byte by byte using the memcmp() function from the standard C
-	 * library.
-	 */
-	rc = memcmp(pKey1, pKey2, n);
-	if (rc == 0) {
-		rc = (int)nKey1 - (int)nKey2;
-	}
-	return rc;
-}
-
-/*
- * This hardcoded structure created just to be called the same way
- * as collations in Tarantool, to support binary collation easily.
- */
-
-struct coll_plus_name_struct{
-    struct coll collation;
-    char name[20]; /* max of possible name lengths */
-};
-static struct coll_plus_name_struct binary_coll_with_name =
-	{{0, 0, COLL_TYPE_ICU, {0}, binCollFunc, 0, sizeof("BINARY"), {}},
-		"BINARY"};
-static struct coll * binary_coll = (struct coll*)&binary_coll_with_name;
-
-struct coll *
-sql_default_coll()
-{
-	return binary_coll;
-}
-
 /**
  * Return the coll* pointer for the collation sequence named zName.
  *
@@ -158,9 +114,8 @@ sql_default_coll()
 struct coll *
 sqlite3FindCollSeq(const char *zName)
 {
-	if (zName == NULL || sqlite3StrICmp(zName, "binary")==0){
-		return binary_coll;
-	}
+	if (zName == NULL || sqlite3StrICmp(zName, "binary")==0)
+		return 0;
 	return coll_by_name(zName, strlen(zName));
 }
 
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index 8071314..9ac67fb 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -158,21 +158,13 @@ sqlite3ExprSkipCollate(Expr * pExpr)
 	return pExpr;
 }
 
-/*
- * Return the collation sequence for the expression pExpr. If
- * there is no defined collating sequence, return NULL.
- *
- * The collating sequence might be determined by a COLLATE operator
- * or by the presence of a column with a defined collating sequence.
- * COLLATE operators take first precedence.  Left operands take
- * precedence over right operands.
- */
 struct coll *
-sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr)
+sql_expr_coll(Parse *parse, Expr *expr, bool *found)
 {
-	struct coll *pColl = 0;
-	Expr *p = pExpr;
-	while (p) {
+	struct coll *coll = 0;
+	Expr *p = expr;
+	*found = false;
+	while (p != NULL) {
 		int op = p->op;
 		if (p->flags & EP_Generic)
 			break;
@@ -180,23 +172,22 @@ sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr)
 			p = p->pLeft;
 			continue;
 		}
-		if (op == TK_COLLATE
-		    || (op == TK_REGISTER && p->op2 == TK_COLLATE)) {
-			pColl =
-			    sqlite3GetCollSeq(pParse, 0, p->u.zToken);
+		if (op == TK_COLLATE ||
+		    (op == TK_REGISTER && p->op2 == TK_COLLATE)) {
+			coll = sqlite3GetCollSeq(parse, NULL, p->u.zToken);
+			*found = true;
 			break;
 		}
-		if ((op == TK_AGG_COLUMN || op == TK_COLUMN
-		     || op == TK_REGISTER || op == TK_TRIGGER)
-		    && p->pTab != 0) {
+		if ((op == TK_AGG_COLUMN || op == TK_COLUMN ||
+		     op == TK_REGISTER || op == TK_TRIGGER) &&
+		    p->pTab != 0) {
 			/* op==TK_REGISTER && p->pTab!=0 happens when pExpr was originally
 			 * a TK_COLUMN but was previously evaluated and cached in a register
 			 */
 			int j = p->iColumn;
 			if (j >= 0) {
-				const char *zColl =
-					column_collation_name(p->pTab, j);
-				pColl = sqlite3FindCollSeq(zColl);
+				coll = sql_column_collation(p->pTab, j);
+				*found = true;
 			}
 			break;
 		}
@@ -204,40 +195,37 @@ sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr)
 			if (p->pLeft && (p->pLeft->flags & EP_Collate) != 0) {
 				p = p->pLeft;
 			} else {
-				Expr *pNext = p->pRight;
+				Expr *next = p->pRight;
 				/* The Expr.x union is never used at the same time as Expr.pRight */
 				assert(p->x.pList == 0 || p->pRight == 0);
 				/* p->flags holds EP_Collate and p->pLeft->flags does not.  And
 				 * p->x.pSelect cannot.  So if p->x.pLeft exists, it must hold at
 				 * least one EP_Collate. Thus the following two ALWAYS.
 				 */
-				if (p->x.pList != 0
-				    &&
+				if (p->x.pList != 0 &&
 				    ALWAYS(!ExprHasProperty(p, EP_xIsSelect))) {
-					int i;
-					for (i = 0;
+					for (int i = 0;
 					     ALWAYS(i < p->x.pList->nExpr);
 					     i++) {
-						if (ExprHasProperty
-						    (p->x.pList->a[i].pExpr,
-						     EP_Collate)) {
-							pNext =
-							    p->x.pList->a[i].
-							    pExpr;
+						Expr *e;
+						e = p->x.pList->a[i].pExpr;
+						if (ExprHasProperty(e,
+								    EP_Collate)) {
+							next = e;
 							break;
 						}
 					}
 				}
-				p = pNext;
+				p = next;
 			}
 		} else {
 			break;
 		}
 	}
-	if (sqlite3CheckCollSeq(pParse, pColl)) {
-		pColl = 0;
-	}
-	return pColl;
+	if (sqlite3CheckCollSeq(parse, coll))
+		coll = NULL;
+
+	return coll;
 }
 
 /*
@@ -344,19 +332,19 @@ binaryCompareP5(Expr * pExpr1, Expr * pExpr2, int jumpIfNull)
 struct coll *
 sqlite3BinaryCompareCollSeq(Parse * pParse, Expr * pLeft, Expr * pRight)
 {
-	struct coll *pColl;
+	struct coll *coll;
+	bool found;
 	assert(pLeft);
 	if (pLeft->flags & EP_Collate) {
-		pColl = sqlite3ExprCollSeq(pParse, pLeft);
+		coll = sql_expr_coll(pParse, pLeft, &found);
 	} else if (pRight && (pRight->flags & EP_Collate) != 0) {
-		pColl = sqlite3ExprCollSeq(pParse, pRight);
+		coll = sql_expr_coll(pParse, pRight, &found);
 	} else {
-		pColl = sqlite3ExprCollSeq(pParse, pLeft);
-		if (!pColl) {
-			pColl = sqlite3ExprCollSeq(pParse, pRight);
-		}
+		coll = sql_expr_coll(pParse, pLeft, &found);
+		if (!found)
+			coll = sql_expr_coll(pParse, pRight, &found);
 	}
-	return pColl;
+	return coll;
 }
 
 /*
@@ -2520,14 +2508,15 @@ sqlite3FindInIndex(Parse * pParse,	/* Parsing context */
 						    (pParse, pLhs, pRhs);
 					int j;
 
-					assert(pReq != 0 || pParse->nErr);
 					for (j = 0; j < nExpr; j++) {
-						if (pIdx->aiColumn[j]
-						    != pRhs->iColumn)
+						if (pIdx->aiColumn[j] !=
+						    pRhs->iColumn) {
 							continue;
-						if (pReq != 0 && strcmp
-						    (pReq->name,
-						     index_collation_name(pIdx, j)) != 0) {
+						}
+						struct coll *idx_coll;
+						idx_coll = sql_index_collation(pIdx, j);
+						if (pReq != NULL &&
+						    pReq != idx_coll) {
 							continue;
 						}
 						break;
@@ -2879,11 +2868,13 @@ sqlite3CodeSubselect(Parse * pParse,	/* Parsing context */
 					affinity = SQLITE_AFF_BLOB;
 				}
 				if (pKeyInfo) {
+					bool found; /* Not Used.  */
 					assert(sqlite3KeyInfoIsWriteable
 					       (pKeyInfo));
 					pKeyInfo->aColl[0] =
-					    sqlite3ExprCollSeq(pParse,
-							       pExpr->pLeft);
+						sql_expr_coll(pParse,
+							      pExpr->pLeft,
+							      &found);
 				}
 
 				/* Loop through each expression in <exprlist>. */
@@ -3140,8 +3131,10 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
 	 * This is step (1) in the in-operator.md optimized algorithm.
 	 */
 	if (eType == IN_INDEX_NOOP) {
+		bool found; /* Not used. */
 		ExprList *pList = pExpr->x.pList;
-		struct coll *pColl = sqlite3ExprCollSeq(pParse, pExpr->pLeft);
+		struct coll *coll = sql_expr_coll(pParse, pExpr->pLeft,
+						   &found);
 		int labelOk = sqlite3VdbeMakeLabel(v);
 		int r2, regToFree;
 		int regCkNull = 0;
@@ -3161,14 +3154,14 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
 			}
 			if (ii < pList->nExpr - 1 || destIfNull != destIfFalse) {
 				sqlite3VdbeAddOp4(v, OP_Eq, rLhs, labelOk, r2,
-						  (void *)pColl, P4_COLLSEQ);
+						  (void *)coll, P4_COLLSEQ);
 				VdbeCoverageIf(v, ii < pList->nExpr - 1);
 				VdbeCoverageIf(v, ii == pList->nExpr - 1);
 				sqlite3VdbeChangeP5(v, zAff[0]);
 			} else {
 				assert(destIfNull == destIfFalse);
 				sqlite3VdbeAddOp4(v, OP_Ne, rLhs, destIfFalse,
-						  r2, (void *)pColl,
+						  r2, (void *)coll,
 						  P4_COLLSEQ);
 				VdbeCoverage(v);
 				sqlite3VdbeChangeP5(v,
@@ -3277,9 +3270,10 @@ sqlite3ExprCodeIN(Parse * pParse,	/* Parsing and code generating context */
 	for (i = 0; i < nVector; i++) {
 		Expr *p;
 		struct coll *pColl;
+		bool found;
 		int r3 = sqlite3GetTempReg(pParse);
 		p = sqlite3VectorFieldSubexpr(pLeft, i);
-		pColl = sqlite3ExprCollSeq(pParse, p);
+		pColl = sql_expr_coll(pParse, p, &found);
 		/* Tarantool: Replace i -> aiMap [i], since original order of columns
 		 * is preserved.
 		 */
@@ -4066,7 +4060,7 @@ sqlite3ExprCodeTarget(Parse * pParse, Expr * pExpr, int target)
 			u32 constMask = 0;	/* Mask of function arguments that are constant */
 			int i;	/* Loop counter */
 			sqlite3 *db = pParse->db;	/* The database connection */
-			struct coll *pColl = 0;	/* A collating sequence */
+			struct coll *coll = 0;	/* A collating sequence */
 
 			assert(!ExprHasProperty(pExpr, EP_xIsSelect));
 			if (ExprHasProperty(pExpr, EP_TokenOnly)) {
@@ -4133,11 +4127,12 @@ sqlite3ExprCodeTarget(Parse * pParse, Expr * pExpr, int target)
 					constMask |= MASKBIT32(i);
 				}
 				if ((pDef->funcFlags & SQLITE_FUNC_NEEDCOLL) !=
-				    0 && !pColl) {
-					pColl =
-					    sqlite3ExprCollSeq(pParse,
-							       pFarg->a[i].
-							       pExpr);
+				    0 && coll == NULL) {
+					bool found; /* Not used.  */
+					coll = sql_expr_coll(pParse,
+							      pFarg->a[i].
+							      pExpr,
+							      &found);
 				}
 			}
 			if (pFarg) {
@@ -4186,10 +4181,8 @@ sqlite3ExprCodeTarget(Parse * pParse, Expr * pExpr, int target)
 				r1 = 0;
 			}
 			if (pDef->funcFlags & SQLITE_FUNC_NEEDCOLL) {
-				if (!pColl)
-					pColl = sql_default_coll();
 				sqlite3VdbeAddOp4(v, OP_CollSeq, 0, 0, 0,
-						  (char *)pColl, P4_COLLSEQ);
+						  (char *)coll, P4_COLLSEQ);
 			}
 			sqlite3VdbeAddOp4(v, OP_Function0, constMask, r1,
 					  target, (char *)pDef, P4_FUNCDEF);
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index f56b6d9..a0a35e0 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -287,7 +287,6 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
 				int i, j;
 				for (i = 0; i < nCol; i++) {
 					i16 iCol = pIdx->aiColumn[i];	/* Index of column in parent tbl */
-					const char *zDfltColl;	/* Def. collation for column */
 					char *zIdxCol;	/* Name of indexed column */
 
 					if (iCol < 0)
@@ -297,11 +296,12 @@ sqlite3FkLocateIndex(Parse * pParse,	/* Parse context to store any error in */
 					 * the default collation sequence for the column, this index is
 					 * unusable. Bail out early in this case.
 					 */
-					zDfltColl =
-						column_collation_name(pParent,
-								      iCol);
-					if (strcmp
-					    (index_collation_name(pIdx, i), zDfltColl))
+					struct coll *def_coll;
+					def_coll = sql_column_collation(pParent,
+									iCol);
+					struct coll *coll;
+					coll = sql_index_collation(pIdx, i);
+					if (def_coll != coll)
 						break;
 
 					zIdxCol = pParent->aCol[iCol].zName;
@@ -526,7 +526,6 @@ exprTableRegister(Parse * pParse,	/* Parsing and code generating context */
 {
 	Expr *pExpr;
 	Column *pCol;
-	const char *zColl;
 	sqlite3 *db = pParse->db;
 
 	pExpr = sqlite3Expr(db, TK_REGISTER, 0);
@@ -535,9 +534,13 @@ exprTableRegister(Parse * pParse,	/* Parsing and code generating context */
 			pCol = &pTab->aCol[iCol];
 			pExpr->iTable = regBase + iCol + 1;
 			pExpr->affinity = pCol->affinity;
-			zColl = column_collation_name(pTab, iCol);
-			pExpr =
-			    sqlite3ExprAddCollateString(pParse, pExpr, zColl);
+			const char *coll_name;
+			if (pCol->coll != NULL)
+				coll_name = pCol->coll->name;
+			else
+				coll_name = "binary";
+			pExpr = sqlite3ExprAddCollateString(pParse, pExpr,
+							    coll_name);
 		} else {
 			pExpr->iTable = regBase;
 			pExpr->affinity = SQLITE_AFF_INTEGER;
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index 47b45de..dcac22c 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -54,7 +54,7 @@ sqlite3GetFuncCollSeq(sqlite3_context * context)
 	assert(context->pVdbe != 0);
 	pOp = &context->pVdbe->aOp[context->iOp - 1];
 	assert(pOp->opcode == OP_CollSeq);
-	assert(pOp->p4type == P4_COLLSEQ);
+	assert(pOp->p4type == P4_COLLSEQ || pOp->p4.pColl == NULL);
 	return pOp->p4.pColl;
 }
 
@@ -82,7 +82,6 @@ minmaxFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
 	assert(argc > 1);
 	mask = sqlite3_user_data(context) == 0 ? 0 : -1;
 	pColl = sqlite3GetFuncCollSeq(context);
-	assert(pColl);
 	assert(mask == -1 || mask == 0);
 	iBest = 0;
 	if (sqlite3_value_type(argv[0]) == SQLITE_NULL)
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index ae8dafb..f04496a 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -1419,9 +1419,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,		/* The parser context */
 					      regIdx : regR);
 
 				for (i = 0; i < nPkCol; i++) {
-					char *p4 = (char *)
-						sqlite3LocateCollSeq(pParse, db,
-								     index_collation_name(pPk, i));
+					char *p4 = (char *)sql_index_collation(pPk, i);
 					x = pPk->aiColumn[i];
 					assert(x >= 0);
 					if (i == (nPkCol - 1)) {
@@ -1666,8 +1664,8 @@ xferCompatibleIndex(Index * pDest, Index * pSrc)
 		if (pSrc->aSortOrder[i] != pDest->aSortOrder[i]) {
 			return 0;	/* Different sort orders */
 		}
-		if (strcasecmp(index_collation_name(pSrc, i),
-			       index_collation_name(pDest, i)) != 0) {
+		if (sql_index_collation(pSrc, i) !=
+		    sql_index_collation(pDest, i)) {
 			return 0;	/* Different collating sequences */
 		}
 	}
@@ -1806,8 +1804,8 @@ xferOptimization(Parse * pParse,	/* Parser context */
 		if (pDestCol->affinity != pSrcCol->affinity) {
 			return 0;	/* Affinity must be the same on all columns */
 		}
-		if (strcasecmp(column_collation_name(pDest, i),
-			       column_collation_name(pSrc, i)) != 0) {
+		if (sql_column_collation(pDest, i) !=
+		    sql_column_collation(pSrc, i)) {
 			return 0;	/* Collating sequence must be the same on all columns */
 		}
 		if (!table_column_is_nullable(pDest, i)
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index b724c98..a2a6391 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -452,13 +452,20 @@ sqlite3Pragma(Parse * pParse, Token * pId,	/* First part of [schema.]id field */
 								     aCol[cnum].
 								     zName);
 						if (pPragma->iArg) {
+							const char *c_n;
+							struct coll *coll;
+							coll = sql_index_collation(pIdx, i);
+							if (coll != NULL)
+								c_n = coll->name;
+							else
+								c_n = "BINARY";
 							sqlite3VdbeMultiLoad(v,
 									     4,
 									     "isi",
 									     pIdx->
 									     aSortOrder
 									     [i],
-									     index_collation_name(pIdx, i),
+									     c_n,
 									     i <
 									     mx);
 						}
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index d97e466..5a39435 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -908,10 +908,12 @@ selectInnerLoop(Parse * pParse,		/* The parser context */
 
 				iJump = sqlite3VdbeCurrentAddr(v) + nResultCol;
 				for (i = 0; i < nResultCol; i++) {
-					struct coll *pColl =
-					    sqlite3ExprCollSeq(pParse,
-							       pEList->a[i].
-							       pExpr);
+					bool found;
+					struct coll *coll =
+					    sql_expr_coll(pParse,
+							  pEList->a[i].
+							  pExpr,
+							  &found);
 					if (i < nResultCol - 1) {
 						sqlite3VdbeAddOp3(v, OP_Ne,
 								  regResult + i,
@@ -925,9 +927,11 @@ selectInnerLoop(Parse * pParse,		/* The parser context */
 								  regPrev + i);
 						VdbeCoverage(v);
 					}
-					sqlite3VdbeChangeP4(v, -1,
-							    (const char *)pColl,
-							    P4_COLLSEQ);
+					if (found) {
+						sqlite3VdbeChangeP4(v, -1,
+								    (const char *)coll,
+								    P4_COLLSEQ);
+					}
 					sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
 				}
 				assert(sqlite3VdbeCurrentAddr(v) == iJump
@@ -1288,11 +1292,12 @@ keyInfoFromExprList(Parse * pParse,	/* Parsing context */
 		assert(sqlite3KeyInfoIsWriteable(pInfo));
 		for (i = iStart, pItem = pList->a + iStart; i < nExpr;
 		     i++, pItem++) {
-			struct coll *pColl;
-			pColl = sqlite3ExprCollSeq(pParse, pItem->pExpr);
-			if (!pColl)
-				pColl = sql_default_coll();
-			pInfo->aColl[i - iStart] = pColl;
+			bool found; /* Not used.  */
+			struct coll *coll;
+			coll = sql_expr_coll(pParse,
+					     pItem->pExpr,
+					     &found);
+			pInfo->aColl[i - iStart] = coll;
 			pInfo->aSortOrder[i - iStart] = pItem->sortOrder;
 		}
 	}
@@ -1935,7 +1940,6 @@ sqlite3SelectAddColumnTypeAndCollation(Parse * pParse,		/* Parsing contexts */
 	sqlite3 *db = pParse->db;
 	NameContext sNC;
 	Column *pCol;
-	struct coll *pColl;
 	int i;
 	Expr *p;
 	struct ExprList_item *a;
@@ -1950,6 +1954,7 @@ sqlite3SelectAddColumnTypeAndCollation(Parse * pParse,		/* Parsing contexts */
 	sNC.pSrcList = pSelect->pSrc;
 	a = pSelect->pEList->a;
 	for (i = 0, pCol = pTab->aCol; i < pTab->nCol; i++, pCol++) {
+		struct coll *coll;
 		enum field_type type;
 		p = a[i].pExpr;
 		type = columnType(&sNC, p, 0, 0, 0, &pCol->szEst);
@@ -1959,10 +1964,10 @@ sqlite3SelectAddColumnTypeAndCollation(Parse * pParse,		/* Parsing contexts */
 
 		if (pCol->affinity == 0)
 			pCol->affinity = SQLITE_AFF_BLOB;
-		pColl = sqlite3ExprCollSeq(pParse, p);
-		if (pColl && pCol->zColl == 0) {
-			pCol->zColl = sqlite3DbStrDup(db, pColl->name);
-		}
+		bool found;
+		coll = sql_expr_coll(pParse, p, &found);
+		if (coll && pCol->coll == 0)
+			pCol->coll = coll;
 	}
 	pTab->szTabRow = sqlite3LogEst(szAll * 4);
 }
@@ -2123,23 +2128,24 @@ computeLimitRegisters(Parse * pParse, Select * p, int iBreak)
  * left-most term of the select that has a collating sequence.
  */
 static struct coll *
-multiSelectCollSeq(Parse * pParse, Select * p, int iCol)
+multiSelectCollSeq(Parse * pParse, Select * p, int iCol, bool *found)
 {
-	struct coll *pRet;
-	if (p->pPrior) {
-		pRet = multiSelectCollSeq(pParse, p->pPrior, iCol);
-	} else {
-		pRet = 0;
-	}
+	struct coll *coll;
+	if (p->pPrior)
+		coll = multiSelectCollSeq(pParse, p->pPrior, iCol, found);
+	else
+		coll = NULL;
 	assert(iCol >= 0);
 	/* iCol must be less than p->pEList->nExpr.  Otherwise an error would
 	 * have been thrown during name resolution and we would not have gotten
 	 * this far
 	 */
-	if (pRet == 0 && ALWAYS(iCol < p->pEList->nExpr)) {
-		pRet = sqlite3ExprCollSeq(pParse, p->pEList->a[iCol].pExpr);
+	if (!(*found) && ALWAYS(iCol < p->pEList->nExpr)) {
+		coll = sql_expr_coll(pParse,
+				     p->pEList->a[iCol].pExpr,
+				     found);
 	}
-	return pRet;
+	return coll;
 }
 
 /*
@@ -2163,22 +2169,29 @@ multiSelectOrderByKeyInfo(Parse * pParse, Select * p, int nExtra)
 		for (i = 0; i < nOrderBy; i++) {
 			struct ExprList_item *pItem = &pOrderBy->a[i];
 			Expr *pTerm = pItem->pExpr;
-			struct coll *pColl;
+			struct coll *coll;
 
 			if (pTerm->flags & EP_Collate) {
-				pColl = sqlite3ExprCollSeq(pParse, pTerm);
+				bool found; /* Not used.  */
+				coll = sql_expr_coll(pParse, pTerm, &found);
 			} else {
-				pColl =
+				bool found = false;
+				coll =
 				    multiSelectCollSeq(pParse, p,
-						       pItem->u.x.iOrderByCol - 1);
-				if (pColl == 0)
-					pColl = sql_default_coll();
-				pOrderBy->a[i].pExpr =
-				    sqlite3ExprAddCollateString(pParse, pTerm,
-								pColl->name);
+						       pItem->u.x.iOrderByCol - 1,
+						       &found);
+				if (coll != NULL) {
+					pOrderBy->a[i].pExpr =
+						sqlite3ExprAddCollateString(pParse, pTerm,
+									    coll->name);
+				} else {
+					pOrderBy->a[i].pExpr =
+						sqlite3ExprAddCollateString(pParse, pTerm,
+									    "BINARY");
+				}
 			}
 			assert(sqlite3KeyInfoIsWriteable(pRet));
-			pRet->aColl[i] = pColl;
+			pRet->aColl[i] = coll;
 			pRet->aSortOrder[i] = pOrderBy->a[i].sortOrder;
 		}
 	}
@@ -2827,10 +2840,8 @@ multiSelect(Parse * pParse,	/* Parsing context */
 			goto multi_select_end;
 		}
 		for (i = 0, apColl = pKeyInfo->aColl; i < nCol; i++, apColl++) {
-			*apColl = multiSelectCollSeq(pParse, p, i);
-			if (0 == *apColl) {
-				*apColl = sql_default_coll();
-			}
+			bool found = false; /* Not used.  */
+			*apColl = multiSelectCollSeq(pParse, p, i, &found);
 		}
 
 		for (pLoop = p; pLoop; pLoop = pLoop->pPrior) {
@@ -3260,8 +3271,9 @@ multiSelectOrderBy(Parse * pParse,	/* Parsing context */
 		if (pKeyDup) {
 			assert(sqlite3KeyInfoIsWriteable(pKeyDup));
 			for (i = 0; i < nExpr; i++) {
+				bool found = false; /* Not used. */
 				pKeyDup->aColl[i] =
-				    multiSelectCollSeq(pParse, p, i);
+					multiSelectCollSeq(pParse, p, i, &found);
 				pKeyDup->aSortOrder[i] = 0;
 			}
 		}
@@ -5241,22 +5253,21 @@ updateAccumulator(Parse * pParse, AggInfo * pAggInfo)
 				     regAgg);
 		}
 		if (pF->pFunc->funcFlags & SQLITE_FUNC_NEEDCOLL) {
-			struct coll *pColl = 0;
+			struct coll *coll = NULL;
 			struct ExprList_item *pItem;
 			int j;
 			assert(pList != 0);	/* pList!=0 if pF->pFunc has NEEDCOLL */
-			for (j = 0, pItem = pList->a; !pColl && j < nArg;
+			bool found = false;
+			for (j = 0, pItem = pList->a; !found && j < nArg;
 			     j++, pItem++) {
-				pColl =
-				    sqlite3ExprCollSeq(pParse, pItem->pExpr);
-			}
-			if (!pColl) {
-				pColl = sql_default_coll();
+				coll = sql_expr_coll(pParse,
+						     pItem->pExpr,
+						     &found);
 			}
 			if (regHit == 0 && pAggInfo->nAccumulator)
 				regHit = ++pParse->nMem;
 			sqlite3VdbeAddOp4(v, OP_CollSeq, regHit, 0, 0,
-					  (char *)pColl, P4_COLLSEQ);
+					  (char *)coll, P4_COLLSEQ);
 		}
 		sqlite3VdbeAddOp3(v, OP_AggStep0, 0, regAgg, pF->iMem);
 		sqlite3VdbeAppendP4(v, pF->pFunc, P4_FUNCDEF);
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 59662cf..3c59589 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -1878,10 +1878,13 @@ struct Column {
 	char *zName;		/* Name of this column */
 	enum field_type type;	/* Column type. */
 	Expr *pDflt;		/* Default value of this column */
-	char *zColl;		/* Collating sequence.  If NULL, use the default */
-	enum on_conflict_action notNull;  /* An ON_CONFLICT_ACTION code for
-					   * handling a NOT NULL constraint
-					   */
+	/** Collating sequence. */
+	struct coll *coll;
+	/**
+	 * An ON_CONFLICT_ACTION code for handling a NOT NULL
+	 * constraint.
+	 */
+	enum on_conflict_action notNull;
 	char affinity;		/* One of the SQLITE_AFF_... values */
 	u8 szEst;		/* Estimated size of value in this column. sizeof(INT)==1 */
 	u8 is_primkey;		/* Boolean propertie for being PK */
@@ -2148,7 +2151,8 @@ struct Index {
 	Index *pNext;		/* The next index associated with the same table */
 	Schema *pSchema;	/* Schema containing this index */
 	u8 *aSortOrder;		/* for each column: True==DESC, False==ASC */
-	const char **azColl;	/* Array of collation sequence names for index */
+	/** Array of collation sequences for index. */
+	struct coll **coll_array;
 	Expr *pPartIdxWhere;	/* WHERE clause for partial indices */
 	ExprList *aColExpr;	/* Column expressions */
 	int tnum;		/* DB Page containing root of this index */
@@ -3548,14 +3552,20 @@ void sqlite3AddPrimaryKey(Parse *, ExprList *, int, int, int);
 void sqlite3AddCheckConstraint(Parse *, Expr *);
 void sqlite3AddDefaultValue(Parse *, ExprSpan *);
 void sqlite3AddCollateType(Parse *, Token *);
+
 const char *
 column_collation_name(Table *, uint32_t);
+struct coll *
+sql_column_collation(Table *, uint32_t);
 const char *
 index_collation_name(Index *, uint32_t);
 struct coll *
+sql_index_collation(Index *idx, uint32_t column);
+struct coll *
 sql_default_coll();
 bool
 space_is_view(Table *);
+
 void sqlite3EndTable(Parse *, Token *, Token *, Select *);
 int
 emit_open_cursor(Parse *, int, int);
@@ -3845,7 +3855,25 @@ const char *sqlite3ErrName(int);
 const char *sqlite3ErrStr(int);
 struct coll *sqlite3FindCollSeq(const char *);
 struct coll *sqlite3LocateCollSeq(Parse * pParse, sqlite3 * db, const char *zName);
-struct coll *sqlite3ExprCollSeq(Parse * pParse, Expr * pExpr);
+
+/**
+ * Return the collation sequence for the expression pExpr. If
+ * there is no defined collating sequence, return NULL.
+ *
+ * The collating sequence might be determined by a COLLATE operator
+ * or by the presence of a column with a defined collating sequence.
+ * COLLATE operators take first precedence.  Left operands take
+ * precedence over right operands.
+ *
+ * @param pParse Parsing context.
+ * @param pExpr Expression to scan.
+ * @param found Flag set if collation was found.
+ *
+ * @retval Pointer to collation.
+ */
+struct coll *
+sql_expr_coll(Parse * pParse, Expr * pExpr, bool *found);
+
 Expr *sqlite3ExprAddCollateToken(Parse * pParse, Expr *, const Token *, int);
 Expr *sqlite3ExprAddCollateString(Parse *, Expr *, const char *);
 Expr *sqlite3ExprSkipCollate(Expr *);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 8237183..321ce2f 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1660,7 +1660,7 @@ case OP_Remainder: {           /* same as TK_REM, in1, in2, out3 */
  * publicly.  Only built-in functions have access to this feature.
  */
 case OP_CollSeq: {
-	assert(pOp->p4type==P4_COLLSEQ);
+	assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl == NULL);
 	if (pOp->p1) {
 		sqlite3VdbeMemSetInt64(&aMem[pOp->p1], 0);
 	}
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index bb121a3..e8a0917 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -3840,6 +3840,13 @@ sqlite3MemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pCol
 		 */
 		if (pColl) {
 			return vdbeCompareMemString(pMem1, pMem2, pColl, 0);
+		} else {
+			size_t n = pMem1->n < pMem2->n ? pMem1->n : pMem2->n;
+			int res;
+			res = memcmp(pMem1->z, pMem2->z, n);
+			if (res == 0)
+				res = (int)pMem1->n - (int)pMem2->n;
+			return res;
 		}
 		/* If a NULL pointer was passed as the collate function, fall through
 		 * to the blob case and use memcmp().
diff --git a/src/box/sql/vdbesort.c b/src/box/sql/vdbesort.c
index fc10ef6..be3cc4c 100644
--- a/src/box/sql/vdbesort.c
+++ b/src/box/sql/vdbesort.c
@@ -930,9 +930,7 @@ sqlite3VdbeSorterInit(sqlite3 * db,	/* Database connection (for malloc()) */
 		}
 
 		if ((pKeyInfo->nField + pKeyInfo->nXField) < 13
-		    && (pKeyInfo->aColl[0] == 0
-			|| pKeyInfo->aColl[0] == sql_default_coll())
-		    ) {
+		    && (pKeyInfo->aColl[0] == NULL)) {
 			pSorter->typeMask =
 			    SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
 		}
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index 2a26302..0b4ab47 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -298,26 +298,20 @@ whereScanNext(WhereScan * pScan)
 					if ((pTerm->eOperator & pScan->
 					     opMask) != 0) {
 						/* Verify the affinity and collating sequence match */
-						if (pScan->zCollName
-						    && (pTerm->eOperator & WO_ISNULL) == 0) {
-							struct coll *pColl;
-							Parse *pParse =
-							    pWC->pWInfo->pParse;
+						if ((pTerm->eOperator & WO_ISNULL) == 0) {
 							pX = pTerm->pExpr;
-							if (!sqlite3IndexAffinityOk(pX, pScan->idxaff)) {
-								continue;
-							}
-							assert(pX->pLeft);
-							pColl =
-							    sqlite3BinaryCompareCollSeq
-							    (pParse, pX->pLeft,
-							     pX->pRight);
-							if (pColl == 0)
-								pColl =
-									sql_default_coll();
-							if (strcmp(pColl->name,
-									   pScan->zCollName)) {
+							if (!sqlite3IndexAffinityOk(pX, pScan->idxaff))
 								continue;
+							if (pScan->column_seen) {
+								Parse *pParse =
+									pWC->pWInfo->pParse;
+								struct coll *coll;
+								assert(pX->pLeft);
+								coll = sqlite3BinaryCompareCollSeq
+									(pParse, pX->pLeft,
+									 pX->pRight);
+								if (coll != pScan->coll)
+									continue;
 							}
 						}
 						if ((pTerm->eOperator & (WO_EQ | WO_IS)) != 0
@@ -376,7 +370,8 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
 	pScan->pWC = pWC;
 	pScan->pIdxExpr = 0;
 	pScan->idxaff = 0;
-	pScan->zCollName = 0;
+	pScan->coll = NULL;
+	pScan->column_seen = false;
 	if (pIdx) {
 		int j = iColumn;
 		iColumn = pIdx->aiColumn[j];
@@ -384,7 +379,8 @@ whereScanInit(WhereScan * pScan,	/* The WhereScan object being initialized */
 			pScan->pIdxExpr = pIdx->aColExpr->a[j].pExpr;
 		} else if (iColumn >= 0) {
 			pScan->idxaff = pIdx->pTable->aCol[iColumn].affinity;
-			pScan->zCollName = index_collation_name(pIdx, j);
+			pScan->coll = sql_index_collation(pIdx, j);
+			pScan->column_seen = true;
 		}
 	} else if (iColumn == XN_EXPR) {
 		return 0;
@@ -465,16 +461,17 @@ findIndexCol(Parse * pParse,	/* Parse context */
 	     Index * pIdx,	/* Index to match column of */
 	     int iCol)		/* Column of index to match */
 {
-	int i;
-	const char *zColl = index_collation_name(pIdx, iCol);
-
-	for (i = 0; i < pList->nExpr; i++) {
+	for (int i = 0; i < pList->nExpr; i++) {
 		Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
-		if (p->op == TK_COLUMN && p->iColumn == pIdx->aiColumn[iCol]
-		    && p->iTable == iBase) {
-			struct coll *pColl =
-			    sqlite3ExprCollSeq(pParse, pList->a[i].pExpr);
-			if (pColl && 0 == strcmp(pColl->name, zColl)) {
+		if (p->op == TK_COLUMN &&
+		    p->iColumn == pIdx->aiColumn[iCol] &&
+		    p->iTable == iBase) {
+			bool found;
+			struct coll *coll = sql_expr_coll(pParse,
+							  pList->a[i].pExpr,
+							  &found);
+			if (found &&
+			    coll == sql_index_collation(pIdx, iCol)) {
 				return i;
 			}
 		}
@@ -1174,7 +1171,7 @@ whereRangeSkipScanEst(Parse * pParse,		/* Parsing & code generating context */
 	sqlite3_value *p2 = 0;	/* Value extracted from pUpper */
 	sqlite3_value *pVal = 0;	/* Value extracted from record */
 
-	pColl = sqlite3LocateCollSeq(pParse, db, index_collation_name(p, nEq));
+	pColl = sql_index_collation(p, nEq);
 	if (pLower) {
 		rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
 					       aff, &p1);
@@ -2246,8 +2243,7 @@ whereRangeVectorLen(Parse * pParse,	/* Parsing context */
 		pColl = sqlite3BinaryCompareCollSeq(pParse, pLhs, pRhs);
 		if (pColl == 0)
 			break;
-		const char *zCollName = index_collation_name(pIdx, i + nEq);
-		if (zCollName && strcmp(pColl->name, zCollName))
+	        if (sql_index_collation(pIdx, i + nEq) != pColl)
 			break;
 	}
 	return i;
@@ -3147,7 +3143,6 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 	WhereLoop *pLoop = 0;	/* Current WhereLoop being processed. */
 	WhereTerm *pTerm;	/* A single term of the WHERE clause */
 	Expr *pOBExpr;		/* An expression from the ORDER BY clause */
-	struct coll *pColl;		/* COLLATE function from an ORDER BY clause term */
 	Index *pIndex;		/* The index associated with pLoop */
 	sqlite3 *db = pWInfo->pParse->db;	/* Database connection */
 	Bitmask obSat = 0;	/* Mask of ORDER BY terms satisfied so far */
@@ -3235,20 +3230,15 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 			}
 			if ((pTerm->eOperator & (WO_EQ | WO_IS)) != 0
 			    && pOBExpr->iColumn >= 0) {
-				const char *z1, *z2;
-				pColl =
-				    sqlite3ExprCollSeq(pWInfo->pParse,
-						       pOrderBy->a[i].pExpr);
-				if (!pColl)
-					pColl = sql_default_coll();
-				z1 = pColl->name;
-				pColl =
-				    sqlite3ExprCollSeq(pWInfo->pParse,
-						       pTerm->pExpr);
-				if (!pColl)
-					pColl = sql_default_coll();
-				z2 = pColl->name;
-				if (strcmp(z1, z2) != 0)
+				struct coll *coll1, *coll2;
+				bool found; /* Not used. */
+				coll1 = sql_expr_coll(pWInfo->pParse,
+						      pOrderBy->a[i].pExpr,
+						      &found);
+				coll2 = sql_expr_coll(pWInfo->pParse,
+						      pTerm->pExpr,
+						      &found);
+				if (coll1 != coll2)
 					continue;
 				testcase(pTerm->pExpr->op == TK_IS);
 			}
@@ -3372,15 +3362,15 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,	/* The WHERE clause */
 						}
 					}
 					if (iColumn >= 0) {
-						pColl =
-						    sqlite3ExprCollSeq(pWInfo->pParse,
-								       pOrderBy->a[i].pExpr);
-						if (!pColl)
-							pColl = sql_default_coll();
-						const char *zCollName =
-							index_collation_name(pIndex, j);
-						if (strcmp(pColl->name,
-							   zCollName) != 0)
+						bool found;
+						struct coll *coll;
+						coll = sql_expr_coll(pWInfo->pParse,
+								     pOrderBy->a[i].pExpr,
+								     &found);
+						struct coll *idx_coll;
+						idx_coll = sql_index_collation(pIndex,
+									       j);
+						if (found && coll != idx_coll)
 							continue;
 					}
 					isMatch = 1;
diff --git a/src/box/sql/whereInt.h b/src/box/sql/whereInt.h
index 381a1d2..b85cd73 100644
--- a/src/box/sql/whereInt.h
+++ b/src/box/sql/whereInt.h
@@ -293,7 +293,12 @@ struct WhereTerm {
 struct WhereScan {
 	WhereClause *pOrigWC;	/* Original, innermost WhereClause */
 	WhereClause *pWC;	/* WhereClause currently being scanned */
-	const char *zCollName;	/* Required collating sequence, if not NULL */
+	/** Required collating sequence. */
+	struct coll *coll;
+	/** Explicitly specified BINARY collation. */
+	bool cool_binary;
+	/** Flag is set if actual column was encountered. */
+	bool column_seen;
 	Expr *pIdxExpr;		/* Search for this index expression */
 	char idxaff;		/* Must match this affinity, if zCollName!=NULL */
 	unsigned char nEquiv;	/* Number of entries in aEquiv[] */
diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c
index ccdff46..917c2dc 100644
--- a/src/box/sql/whereexpr.c
+++ b/src/box/sql/whereexpr.c
@@ -165,12 +165,19 @@ exprCommute(Parse * pParse, Expr * pExpr)
 			 * used by clearing the EP_Collate flag from Y.
 			 */
 			pExpr->pRight->flags &= ~EP_Collate;
-		} else if (sqlite3ExprCollSeq(pParse, pExpr->pLeft) != 0) {
-			/* Neither X nor Y have COLLATE operators, but X has a non-default
-			 * collating sequence.  So add the EP_Collate marker on X to cause
-			 * it to be searched first.
-			 */
-			pExpr->pLeft->flags |= EP_Collate;
+		} else {
+			bool found;
+			sql_expr_coll(pParse, pExpr->pLeft, &found);
+			if (found) {
+				/* Neither X nor Y have COLLATE
+				 * operators, but X has a
+				 * non-default collating sequence.
+				 * So add the EP_Collate marker on
+				 * X to cause it to be searched
+				 * first.
+				 */
+				pExpr->pLeft->flags |= EP_Collate;
+			}
 		}
 	}
 	SWAP(pExpr->pRight, pExpr->pLeft);
@@ -822,7 +829,6 @@ static int
 termIsEquivalence(Parse * pParse, Expr * pExpr)
 {
 	char aff1, aff2;
-	struct coll *pColl;
 	const char *zColl1, *zColl2;
 	if (!OptimizationEnabled(pParse->db, SQLITE_Transitive))
 		return 0;
@@ -838,14 +844,16 @@ termIsEquivalence(Parse * pParse, Expr * pExpr)
 	    ) {
 		return 0;
 	}
-	pColl =
+	struct coll *coll;
+	coll =
 	    sqlite3BinaryCompareCollSeq(pParse, pExpr->pLeft, pExpr->pRight);
-	if (pColl == 0 || sqlite3StrICmp(pColl->name, "BINARY") == 0)
+	if (coll == NULL || sqlite3StrICmp(coll->name, "BINARY") == 0)
 		return 1;
-	pColl = sqlite3ExprCollSeq(pParse, pExpr->pLeft);
-	zColl1 = pColl ? pColl->name : 0;
-	pColl = sqlite3ExprCollSeq(pParse, pExpr->pRight);
-	zColl2 = pColl ? pColl->name : 0;
+	bool found;
+	coll = sql_expr_coll(pParse, pExpr->pLeft, &found);
+	zColl1 = coll ? coll->name : NULL;
+	coll = sql_expr_coll(pParse, pExpr->pRight, &found);
+	zColl2 = coll ? coll->name : NULL;
 	return sqlite3_stricmp(zColl1, zColl2) == 0;
 }
 
-- 
2.11.0





More information about the Tarantool-patches mailing list