[patches] [PATCH v2 2/2] sql: like func: fix index optimization, add unicode support

AKhatskevich avkhatskevich at tarantool.org
Tue Feb 6 15:53:14 MSK 2018


A LIKE pattern of the form "x LIKE 'aBc%'" is rewritten as
`x>='ABC' AND x<'abd' AND x LIKE 'aBc%'` if there is a "unicode_ci" index.
Indexes with other collations are not supported yet.

This commit also implements unicode support in like and glob functions.

Case-insensitive comparison is implemented with u_tolower (instead of
u_toupper) so that it works with the Turkish 'İ' letter.

Closes #3007
---
 src/box/sql/func.c              | 179 ++++++++++++++++++++++------------------
 src/box/sql/whereexpr.c         |   3 +-
 test/sql-tap/collation.test.lua |  49 ++++++++++-
 3 files changed, 148 insertions(+), 83 deletions(-)

diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index fa4923403..4f33bf43b 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -38,6 +38,11 @@
 #include "vdbeInt.h"
 #include <unicode/ustring.h>
 #include <unicode/ucasemap.h>
+#include <unicode/ucnv.h>
+#include <unicode/uchar.h>
+
+static UCaseMap *pUCaseMap;
+static UConverter* pUtf8conv;
 
 /*
  * Return the collating function associated with a function.
@@ -477,8 +482,6 @@ contextMalloc(sqlite3_context * context, i64 nByte)
  * Implementation of the upper() and lower() SQL functions.
  */
 
-static UCaseMap *pUCaseMap;
-
 #define ICU_CASE_CONVERT(case_type)                                            \
 static void                                                                    \
 case_type##ICUFunc(sqlite3_context *context, int argc, sqlite3_value **argv)   \
@@ -642,7 +645,7 @@ struct compareInfo {
 #define sqlite3Utf8Read(A)        (*((*A)++))
 #define Utf8Read(A)               (*(A++))
 #else
-#define Utf8Read(A)               (A[0]<0x80?*(A++):sqlite3Utf8Read(&A))
+#define Utf8Read(s, e)    ucnv_getNextUChar(pUtf8conv, &s, e, &status)
 #endif
 
 static const struct compareInfo globInfo = { '*', '?', '[', 0 };
@@ -703,36 +706,43 @@ static const struct compareInfo likeInfoAlt = { '%', '_', 0, 0 };
  * This routine is usually quick, but can be N**2 in the worst case.
  */
 static int
-patternCompare(const u8 * zPattern,	/* The glob pattern */
-	       const u8 * zString,	/* The string to compare against the glob */
+patternCompare(const char * pattern,	/* The glob pattern */
+	       const char * string,	/* The string to compare against the glob */
 	       const struct compareInfo *pInfo,	/* Information about how to do the compare */
-	       u32 matchOther	/* The escape char (LIKE) or '[' (GLOB) */
+	       UChar32 matchOther	/* The escape char (LIKE) or '[' (GLOB) */
     )
 {
-	u32 c, c2;		/* Next pattern and input string chars */
-	u32 matchOne = pInfo->matchOne;	/* "?" or "_" */
-	u32 matchAll = pInfo->matchAll;	/* "*" or "%" */
-	u8 noCase = pInfo->noCase;	/* True if uppercase==lowercase */
-	const u8 *zEscaped = 0;	/* One past the last escaped input char */
+	UChar32 c, c2;		/* Next pattern and input string chars */
+	UChar32 matchOne = pInfo->matchOne;	/* "?" or "_" */
+	UChar32 matchAll = pInfo->matchAll;	/* "*" or "%" */
+	UChar32 noCase = pInfo->noCase;	/* True if uppercase==lowercase */
+	const char *zEscaped = 0;	/* One past the last escaped input char */
+	const char * pattern_end = pattern + strlen(pattern);
+	const char * string_end = string + strlen(string);
+	UErrorCode status = U_ZERO_ERROR;
 
-	while ((c = Utf8Read(zPattern)) != 0) {
+	while (pattern < pattern_end){
+		c = Utf8Read(pattern, pattern_end);
 		if (c == matchAll) {	/* Match "*" */
 			/* Skip over multiple "*" characters in the pattern.  If there
 			 * are also "?" characters, skip those as well, but consume a
 			 * single character of the input string for each "?" skipped
 			 */
-			while ((c = Utf8Read(zPattern)) == matchAll
-			       || c == matchOne) {
+			while (pattern < pattern_end){
+				c = Utf8Read(pattern, pattern_end);
+				if (c != matchAll && c != matchOne)
+					break;
 				if (c == matchOne
-				    && sqlite3Utf8Read(&zString) == 0) {
+				    && Utf8Read(string, string_end) == 0) {
 					return SQLITE_NOWILDCARDMATCH;
 				}
 			}
-			if (c == 0) {
-				return SQLITE_MATCH;	/* "*" at the end of the pattern matches */
-			} else if (c == matchOther) {
+			/* "*" at the end of the pattern matches */
+			if (pattern == pattern_end)
+				return SQLITE_MATCH;
+			if (c == matchOther) {
 				if (pInfo->matchSet == 0) {
-					c = sqlite3Utf8Read(&zPattern);
+					c = Utf8Read(pattern, pattern_end);
 					if (c == 0)
 						return SQLITE_NOWILDCARDMATCH;
 				} else {
@@ -740,16 +750,15 @@ patternCompare(const u8 * zPattern,	/* The glob pattern */
 					 * recursive search in this case, but it is an unusual case.
 					 */
 					assert(matchOther < 0x80);	/* '[' is a single-byte character */
-					while (*zString) {
+					while (string < string_end) {
 						int bMatch =
-						    patternCompare(&zPattern
-								   [-1],
-								   zString,
+						    patternCompare(&pattern[-1],
+								   string,
 								   pInfo,
 								   matchOther);
 						if (bMatch != SQLITE_NOMATCH)
 							return bMatch;
-						SQLITE_SKIP_UTF8(zString);
+						Utf8Read(string, string_end);
 					}
 					return SQLITE_NOWILDCARDMATCH;
 				}
@@ -764,66 +773,63 @@ patternCompare(const u8 * zPattern,	/* The glob pattern */
 			 * c but in the other case and search the input string for either
 			 * c or cx.
 			 */
-			if (c <= 0x80) {
-				u32 cx;
-				int bMatch;
-				if (noCase) {
-					cx = sqlite3Toupper(c);
-					c = sqlite3Tolower(c);
-				} else {
-					cx = c;
-				}
-				while ((c2 = *(zString++)) != 0) {
-					if (c2 != c && c2 != cx)
-						continue;
-					bMatch =
-					    patternCompare(zPattern, zString,
-							   pInfo, matchOther);
-					if (bMatch != SQLITE_NOMATCH)
-						return bMatch;
-				}
-			} else {
-				int bMatch;
-				while ((c2 = Utf8Read(zString)) != 0) {
+
+			int bMatch;
+			if (noCase)
+				c = u_tolower(c);
+			while (string < string_end){
+				/**
+				 * This loop could have been implemented
+				 * without if converting c2 to lower case
+				 * (by holding c_upper and c_lower), however
+				 * it is implemented this way because lower
+				 * works better with German and Turkish
+				 * languages.
+				 */
+				c2 = Utf8Read(string, string_end);
+				if (!noCase) {
 					if (c2 != c)
 						continue;
-					bMatch =
-					    patternCompare(zPattern, zString,
-							   pInfo, matchOther);
-					if (bMatch != SQLITE_NOMATCH)
-						return bMatch;
+				} else {
+					if (c2 != c && u_tolower(c2) != c)
+						continue;
 				}
+				bMatch =
+				    patternCompare(pattern, string,
+						   pInfo, matchOther);
+				if (bMatch != SQLITE_NOMATCH)
+					return bMatch;
 			}
 			return SQLITE_NOWILDCARDMATCH;
 		}
 		if (c == matchOther) {
 			if (pInfo->matchSet == 0) {
-				c = sqlite3Utf8Read(&zPattern);
+				c = Utf8Read(pattern, pattern_end);
 				if (c == 0)
 					return SQLITE_NOMATCH;
-				zEscaped = zPattern;
+				zEscaped = pattern;
 			} else {
-				u32 prior_c = 0;
+				UChar32 prior_c = 0;
 				int seen = 0;
 				int invert = 0;
-				c = sqlite3Utf8Read(&zString);
-				if (c == 0)
+				c = Utf8Read(string, string_end);
+				if (string == string_end)
 					return SQLITE_NOMATCH;
-				c2 = sqlite3Utf8Read(&zPattern);
+				c2 = Utf8Read(pattern, pattern_end);
 				if (c2 == '^') {
 					invert = 1;
-					c2 = sqlite3Utf8Read(&zPattern);
+					c2 = Utf8Read(pattern, pattern_end);
 				}
 				if (c2 == ']') {
 					if (c == ']')
 						seen = 1;
-					c2 = sqlite3Utf8Read(&zPattern);
+					c2 = Utf8Read(pattern, pattern_end);
 				}
 				while (c2 && c2 != ']') {
-					if (c2 == '-' && zPattern[0] != ']'
-					    && zPattern[0] != 0
+					if (c2 == '-' && pattern[0] != ']'
+					    && pattern < pattern_end
 					    && prior_c > 0) {
-						c2 = sqlite3Utf8Read(&zPattern);
+						c2 = Utf8Read(pattern, pattern_end);
 						if (c >= prior_c && c <= c2)
 							seen = 1;
 						prior_c = 0;
@@ -833,26 +839,34 @@ patternCompare(const u8 * zPattern,	/* The glob pattern */
 						}
 						prior_c = c2;
 					}
-					c2 = sqlite3Utf8Read(&zPattern);
+					c2 = Utf8Read(pattern, pattern_end);
 				}
-				if (c2 == 0 || (seen ^ invert) == 0) {
+				if (pattern == pattern_end || (seen ^ invert) == 0) {
 					return SQLITE_NOMATCH;
 				}
 				continue;
 			}
 		}
-		c2 = Utf8Read(zString);
+		c2 = Utf8Read(string, string_end);
 		if (c == c2)
 			continue;
-		if (noCase && sqlite3Tolower(c) == sqlite3Tolower(c2)
-		    && c < 0x80 && c2 < 0x80) {
-			continue;
+		if (noCase){
+			/**
+			 * Small optimisation. Reduce number of calls
+			 * to u_tolower function.
+			 * SQL standards suggest use to_upper for symbol
+			 * normalisation. However, using to_lower allows to
+			 * respect Turkish 'İ' in default locale.
+			 */
+			if (u_tolower(c) == c2 ||
+			    c == u_tolower(c2))
+				continue;
 		}
-		if (c == matchOne && zPattern != zEscaped && c2 != 0)
+		if (c == matchOne && pattern != zEscaped && c2 != 0)
 			continue;
 		return SQLITE_NOMATCH;
 	}
-	return *zString == 0 ? SQLITE_MATCH : SQLITE_NOMATCH;
+	return string == string_end ? SQLITE_MATCH : SQLITE_NOMATCH;
 }
 
 /*
@@ -862,7 +876,7 @@ patternCompare(const u8 * zPattern,	/* The glob pattern */
 int
 sqlite3_strglob(const char *zGlobPattern, const char *zString)
 {
-	return patternCompare((u8 *) zGlobPattern, (u8 *) zString, &globInfo,
+	return patternCompare(zGlobPattern, zString, &globInfo,
 			      '[');
 }
 
@@ -873,7 +887,7 @@ sqlite3_strglob(const char *zGlobPattern, const char *zString)
 int
 sqlite3_strlike(const char *zPattern, const char *zStr, unsigned int esc)
 {
-	return patternCompare((u8 *) zPattern, (u8 *) zStr, &likeInfoNorm, esc);
+	return patternCompare(zPattern, zStr, &likeInfoNorm, esc);
 }
 
 /*
@@ -900,7 +914,7 @@ int sqlite3_like_count = 0;
 static void
 likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
 {
-	const unsigned char *zA, *zB;
+	const char *zA, *zB;
 	u32 escape;
 	int nPat;
 	sqlite3 *db = sqlite3_context_db_handle(context);
@@ -916,8 +930,8 @@ likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
 		return;
 	}
 #endif
-	zB = sqlite3_value_text(argv[0]);
-	zA = sqlite3_value_text(argv[1]);
+	zB = (const char *) sqlite3_value_text(argv[0]);
+	zA = (const char *) sqlite3_value_text(argv[1]);
 
 	/* Limit the length of the LIKE or GLOB pattern to avoid problems
 	 * of deep recursion and N*N behavior in patternCompare().
@@ -930,7 +944,8 @@ likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
 				     "LIKE or GLOB pattern too complex", -1);
 		return;
 	}
-	assert(zB == sqlite3_value_text(argv[0]));	/* Encoding did not change */
+	/* Encoding did not change */
+	assert(zB == (const char *) sqlite3_value_text(argv[0]));
 
 	if (argc == 3) {
 		/* The escape character string must consist of a single UTF-8 character.
@@ -949,14 +964,14 @@ likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
 	} else {
 		escape = pInfo->matchSet;
 	}
-	if (zA && zB) {
+	if (!zA || !zB)
+		return;
 #ifdef SQLITE_TEST
-		sqlite3_like_count++;
+	sqlite3_like_count++;
 #endif
-		sqlite3_result_int(context,
-				   patternCompare(zB, zA, pInfo,
-						  escape) == SQLITE_MATCH);
-	}
+	int res;
+	res = patternCompare(zB, zA, pInfo, escape);
+	sqlite3_result_int(context, res == SQLITE_MATCH);
 }
 
 /*
@@ -1839,6 +1854,8 @@ sqlite3RegisterBuiltinFunctions(void)
 
 	pUCaseMap = ucasemap_open(NULL, 0, &status);
 	assert(pUCaseMap);
+	pUtf8conv = ucnv_open("utf8", &status);
+	assert(pUtf8conv);
 	/*
 	 * The following array holds FuncDef structures for all of the functions
 	 * defined in this file.
diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c
index 737fbb9c8..e52951d0a 100644
--- a/src/box/sql/whereexpr.c
+++ b/src/box/sql/whereexpr.c
@@ -1211,7 +1211,8 @@ exprAnalyze(SrcList * pSrc,	/* the FROM clause */
 			}
 			*pC = c + 1;
 		}
-		zCollSeqName = noCase ? "NOCASE" : "BINARY";
+		/* Support only for unicode_ci indexes by now */
+		zCollSeqName = noCase ? "unicode_ci" : "BINARY";
 		pNewExpr1 = sqlite3ExprDup(db, pLeft, 0);
 		pNewExpr1 = sqlite3PExpr(pParse, TK_GE,
 					 sqlite3ExprAddCollateString(pParse,
diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation.test.lua
index 85652dc7d..8a98de987 100755
--- a/test/sql-tap/collation.test.lua
+++ b/test/sql-tap/collation.test.lua
@@ -1,6 +1,6 @@
 #!/usr/bin/env tarantool
 test = require("sqltester")
-test:plan(162)
+test:plan(173)
 
 local prefix = "collation-"
 
@@ -202,4 +202,51 @@ for _, data_collation in ipairs(data_collations) do
     end
 end
 
+-- Like uses collation (only for unicode_ci and binary)
+local like_testcases =
+{
+    {"2.0",
+    [[
+        CREATE TABLE tx1 (s1 CHAR(5) PRIMARY KEY);
+        CREATE INDEX I1 on tx1(s1 collate "unicode_ci");
+        INSERT INTO tx1 VALUES('aaa');
+        INSERT INTO tx1 VALUES('Aab');
+        INSERT INTO tx1 VALUES('İac');
+        INSERT INTO tx1 VALUES('iad');
+    ]], {0}},
+    {"2.1.1",
+        "SELECT * FROM tx1 WHERE s1 LIKE 'A%' order by s1;",
+        {0, {"Aab", "aaa"}} },
+    {"2.1.2",
+        "EXPLAIN QUERY PLAN SELECT * FROM tx1 WHERE s1 LIKE 'A%';",
+        {0, {0, 0, 0, "/USING COVERING INDEX I1/"}} },
+    {"2.2.0",
+        "PRAGMA case_sensitive_like = true;",
+        {0}},
+    {"2.2.1",
+        "SELECT * FROM tx1 WHERE s1 LIKE 'A%' order by s1;",
+        {0, {"Aab"}} },
+    {"2.2.2",
+        "EXPLAIN QUERY PLAN SELECT * FROM tx1 WHERE s1 LIKE 'A%';",
+        {0, {0, 0, 0, "/USING PRIMARY KEY/"}} },
+    {"2.3.0",
+        "PRAGMA case_sensitive_like = false;",
+        {0}},
+    {"2.3.1",
+        "SELECT * FROM tx1 WHERE s1 LIKE 'i%' order by s1;",
+        {0, {"iad", "İac"}}},
+    {"2.3.2",
+        "SELECT * FROM tx1 WHERE s1 LIKE 'İ%'order by s1;",
+        {0, {"iad", "İac"}} },
+    {"2.4.0",
+    [[
+        INSERT INTO tx1 VALUES('ЯЁЮ');
+    ]], {0} },
+    {"2.4.1",
+        "SELECT * FROM tx1 WHERE s1 LIKE 'яёю';",
+        {0, {"ЯЁЮ"}} },
+}
+
+test:do_catchsql_set_test(like_testcases, prefix)
+
 test:finish_test()
-- 
2.14.1




More information about the Tarantool-patches mailing list