[patches] [PATCH v2 2/2] sql: like func: fix index optimization, add uncode supprot
AKhatskevich
avkhatskevich at tarantool.org
Tue Feb 6 15:53:14 MSK 2018
A like pattern of the form "x LIKE 'aBc%'" is optimized like
`x>='ABC' AND x<'abd' AND x LIKE 'aBc%'` if there is a "unicode_ci" index.
Other indexes are not supported yet.
This commit also implements unicode support in like and glob functions.
Case sensitive comparison is implemented by u_tolower (instead of upper)
to work with Turkish 'İ' letter.
Closes #3007
---
src/box/sql/func.c | 179 ++++++++++++++++++++++------------------
src/box/sql/whereexpr.c | 3 +-
test/sql-tap/collation.test.lua | 49 ++++++++++-
3 files changed, 148 insertions(+), 83 deletions(-)
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index fa4923403..4f33bf43b 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -38,6 +38,11 @@
#include "vdbeInt.h"
#include <unicode/ustring.h>
#include <unicode/ucasemap.h>
+#include <unicode/ucnv.h>
+#include <unicode/uchar.h>
+
+static UCaseMap *pUCaseMap;
+static UConverter* pUtf8conv;
/*
* Return the collating function associated with a function.
@@ -477,8 +482,6 @@ contextMalloc(sqlite3_context * context, i64 nByte)
* Implementation of the upper() and lower() SQL functions.
*/
-static UCaseMap *pUCaseMap;
-
#define ICU_CASE_CONVERT(case_type) \
static void \
case_type##ICUFunc(sqlite3_context *context, int argc, sqlite3_value **argv) \
@@ -642,7 +645,7 @@ struct compareInfo {
#define sqlite3Utf8Read(A) (*((*A)++))
#define Utf8Read(A) (*(A++))
#else
-#define Utf8Read(A) (A[0]<0x80?*(A++):sqlite3Utf8Read(&A))
+#define Utf8Read(s, e) ucnv_getNextUChar(pUtf8conv, &s, e, &status)
#endif
static const struct compareInfo globInfo = { '*', '?', '[', 0 };
@@ -703,36 +706,43 @@ static const struct compareInfo likeInfoAlt = { '%', '_', 0, 0 };
* This routine is usually quick, but can be N**2 in the worst case.
*/
static int
-patternCompare(const u8 * zPattern, /* The glob pattern */
- const u8 * zString, /* The string to compare against the glob */
+patternCompare(const char * pattern, /* The glob pattern */
+ const char * string, /* The string to compare against the glob */
const struct compareInfo *pInfo, /* Information about how to do the compare */
- u32 matchOther /* The escape char (LIKE) or '[' (GLOB) */
+ UChar32 matchOther /* The escape char (LIKE) or '[' (GLOB) */
)
{
- u32 c, c2; /* Next pattern and input string chars */
- u32 matchOne = pInfo->matchOne; /* "?" or "_" */
- u32 matchAll = pInfo->matchAll; /* "*" or "%" */
- u8 noCase = pInfo->noCase; /* True if uppercase==lowercase */
- const u8 *zEscaped = 0; /* One past the last escaped input char */
+ UChar32 c, c2; /* Next pattern and input string chars */
+ UChar32 matchOne = pInfo->matchOne; /* "?" or "_" */
+ UChar32 matchAll = pInfo->matchAll; /* "*" or "%" */
+ UChar32 noCase = pInfo->noCase; /* True if uppercase==lowercase */
+ const char *zEscaped = 0; /* One past the last escaped input char */
+ const char * pattern_end = pattern + strlen(pattern);
+ const char * string_end = string + strlen(string);
+ UErrorCode status = U_ZERO_ERROR;
- while ((c = Utf8Read(zPattern)) != 0) {
+ while (pattern < pattern_end){
+ c = Utf8Read(pattern, pattern_end);
if (c == matchAll) { /* Match "*" */
/* Skip over multiple "*" characters in the pattern. If there
* are also "?" characters, skip those as well, but consume a
* single character of the input string for each "?" skipped
*/
- while ((c = Utf8Read(zPattern)) == matchAll
- || c == matchOne) {
+ while (pattern < pattern_end){
+ c = Utf8Read(pattern, pattern_end);
+ if (c != matchAll && c != matchOne)
+ break;
if (c == matchOne
- && sqlite3Utf8Read(&zString) == 0) {
+ && Utf8Read(string, string_end) == 0) {
return SQLITE_NOWILDCARDMATCH;
}
}
- if (c == 0) {
- return SQLITE_MATCH; /* "*" at the end of the pattern matches */
- } else if (c == matchOther) {
+ /* "*" at the end of the pattern matches */
+ if (pattern == pattern_end)
+ return SQLITE_MATCH;
+ if (c == matchOther) {
if (pInfo->matchSet == 0) {
- c = sqlite3Utf8Read(&zPattern);
+ c = Utf8Read(pattern, pattern_end);
if (c == 0)
return SQLITE_NOWILDCARDMATCH;
} else {
@@ -740,16 +750,15 @@ patternCompare(const u8 * zPattern, /* The glob pattern */
* recursive search in this case, but it is an unusual case.
*/
assert(matchOther < 0x80); /* '[' is a single-byte character */
- while (*zString) {
+ while (string < string_end) {
int bMatch =
- patternCompare(&zPattern
- [-1],
- zString,
+ patternCompare(&pattern[-1],
+ string,
pInfo,
matchOther);
if (bMatch != SQLITE_NOMATCH)
return bMatch;
- SQLITE_SKIP_UTF8(zString);
+ Utf8Read(string, string_end);
}
return SQLITE_NOWILDCARDMATCH;
}
@@ -764,66 +773,63 @@ patternCompare(const u8 * zPattern, /* The glob pattern */
* c but in the other case and search the input string for either
* c or cx.
*/
- if (c <= 0x80) {
- u32 cx;
- int bMatch;
- if (noCase) {
- cx = sqlite3Toupper(c);
- c = sqlite3Tolower(c);
- } else {
- cx = c;
- }
- while ((c2 = *(zString++)) != 0) {
- if (c2 != c && c2 != cx)
- continue;
- bMatch =
- patternCompare(zPattern, zString,
- pInfo, matchOther);
- if (bMatch != SQLITE_NOMATCH)
- return bMatch;
- }
- } else {
- int bMatch;
- while ((c2 = Utf8Read(zString)) != 0) {
+
+ int bMatch;
+ if (noCase)
+ c = u_tolower(c);
+ while (string < string_end){
+ /**
+ * This loop could have been implemented
+ * without if converting c2 to lower case
+ * (by holding c_upper and c_lower), however
+ * it is implemented this way because lower
+ * works better with German and Turkish
+ * languages.
+ */
+ c2 = Utf8Read(string, string_end);
+ if (!noCase) {
if (c2 != c)
continue;
- bMatch =
- patternCompare(zPattern, zString,
- pInfo, matchOther);
- if (bMatch != SQLITE_NOMATCH)
- return bMatch;
+ } else {
+ if (c2 != c && u_tolower(c2) != c)
+ continue;
}
+ bMatch =
+ patternCompare(pattern, string,
+ pInfo, matchOther);
+ if (bMatch != SQLITE_NOMATCH)
+ return bMatch;
}
return SQLITE_NOWILDCARDMATCH;
}
if (c == matchOther) {
if (pInfo->matchSet == 0) {
- c = sqlite3Utf8Read(&zPattern);
+ c = Utf8Read(pattern, pattern_end);
if (c == 0)
return SQLITE_NOMATCH;
- zEscaped = zPattern;
+ zEscaped = pattern;
} else {
- u32 prior_c = 0;
+ UChar32 prior_c = 0;
int seen = 0;
int invert = 0;
- c = sqlite3Utf8Read(&zString);
- if (c == 0)
+ c = Utf8Read(string, string_end);
+ if (string == string_end)
return SQLITE_NOMATCH;
- c2 = sqlite3Utf8Read(&zPattern);
+ c2 = Utf8Read(pattern, pattern_end);
if (c2 == '^') {
invert = 1;
- c2 = sqlite3Utf8Read(&zPattern);
+ c2 = Utf8Read(pattern, pattern_end);
}
if (c2 == ']') {
if (c == ']')
seen = 1;
- c2 = sqlite3Utf8Read(&zPattern);
+ c2 = Utf8Read(pattern, pattern_end);
}
while (c2 && c2 != ']') {
- if (c2 == '-' && zPattern[0] != ']'
- && zPattern[0] != 0
+ if (c2 == '-' && pattern[0] != ']'
+ && pattern < pattern_end
&& prior_c > 0) {
- c2 = sqlite3Utf8Read(&zPattern);
+ c2 = Utf8Read(pattern, pattern_end);
if (c >= prior_c && c <= c2)
seen = 1;
prior_c = 0;
@@ -833,26 +839,34 @@ patternCompare(const u8 * zPattern, /* The glob pattern */
}
prior_c = c2;
}
- c2 = sqlite3Utf8Read(&zPattern);
+ c2 = Utf8Read(pattern, pattern_end);
}
- if (c2 == 0 || (seen ^ invert) == 0) {
+ if (pattern == pattern_end || (seen ^ invert) == 0) {
return SQLITE_NOMATCH;
}
continue;
}
}
- c2 = Utf8Read(zString);
+ c2 = Utf8Read(string, string_end);
if (c == c2)
continue;
- if (noCase && sqlite3Tolower(c) == sqlite3Tolower(c2)
- && c < 0x80 && c2 < 0x80) {
- continue;
+ if (noCase){
+ /**
+ * Small optimisation. Reduce number of calls
+ * to u_tolower function.
+ * SQL standards suggest use to_upper for symbol
+ * normalisation. However, using to_lower allows to
+ * respect Turkish 'İ' in default locale.
+ */
+ if (u_tolower(c) == c2 ||
+ c == u_tolower(c2))
+ continue;
}
- if (c == matchOne && zPattern != zEscaped && c2 != 0)
+ if (c == matchOne && pattern != zEscaped && c2 != 0)
continue;
return SQLITE_NOMATCH;
}
- return *zString == 0 ? SQLITE_MATCH : SQLITE_NOMATCH;
+ return string == string_end ? SQLITE_MATCH : SQLITE_NOMATCH;
}
/*
@@ -862,7 +876,7 @@ patternCompare(const u8 * zPattern, /* The glob pattern */
int
sqlite3_strglob(const char *zGlobPattern, const char *zString)
{
- return patternCompare((u8 *) zGlobPattern, (u8 *) zString, &globInfo,
+ return patternCompare(zGlobPattern, zString, &globInfo,
'[');
}
@@ -873,7 +887,7 @@ sqlite3_strglob(const char *zGlobPattern, const char *zString)
int
sqlite3_strlike(const char *zPattern, const char *zStr, unsigned int esc)
{
- return patternCompare((u8 *) zPattern, (u8 *) zStr, &likeInfoNorm, esc);
+ return patternCompare(zPattern, zStr, &likeInfoNorm, esc);
}
/*
@@ -900,7 +914,7 @@ int sqlite3_like_count = 0;
static void
likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
{
- const unsigned char *zA, *zB;
+ const char *zA, *zB;
u32 escape;
int nPat;
sqlite3 *db = sqlite3_context_db_handle(context);
@@ -916,8 +930,8 @@ likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
return;
}
#endif
- zB = sqlite3_value_text(argv[0]);
- zA = sqlite3_value_text(argv[1]);
+ zB = (const char *) sqlite3_value_text(argv[0]);
+ zA = (const char *) sqlite3_value_text(argv[1]);
/* Limit the length of the LIKE or GLOB pattern to avoid problems
* of deep recursion and N*N behavior in patternCompare().
@@ -930,7 +944,8 @@ likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
"LIKE or GLOB pattern too complex", -1);
return;
}
- assert(zB == sqlite3_value_text(argv[0])); /* Encoding did not change */
+ /* Encoding did not change */
+ assert(zB == (const char *) sqlite3_value_text(argv[0]));
if (argc == 3) {
/* The escape character string must consist of a single UTF-8 character.
@@ -949,14 +964,14 @@ likeFunc(sqlite3_context * context, int argc, sqlite3_value ** argv)
} else {
escape = pInfo->matchSet;
}
- if (zA && zB) {
+ if (!zA || !zB)
+ return;
#ifdef SQLITE_TEST
- sqlite3_like_count++;
+ sqlite3_like_count++;
#endif
- sqlite3_result_int(context,
- patternCompare(zB, zA, pInfo,
- escape) == SQLITE_MATCH);
- }
+ int res;
+ res = patternCompare(zB, zA, pInfo, escape);
+ sqlite3_result_int(context, res == SQLITE_MATCH);
}
/*
@@ -1839,6 +1854,8 @@ sqlite3RegisterBuiltinFunctions(void)
pUCaseMap = ucasemap_open(NULL, 0, &status);
assert(pUCaseMap);
+ pUtf8conv = ucnv_open("utf8", &status);
+ assert(pUtf8conv);
/*
* The following array holds FuncDef structures for all of the functions
* defined in this file.
diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c
index 737fbb9c8..e52951d0a 100644
--- a/src/box/sql/whereexpr.c
+++ b/src/box/sql/whereexpr.c
@@ -1211,7 +1211,8 @@ exprAnalyze(SrcList * pSrc, /* the FROM clause */
}
*pC = c + 1;
}
- zCollSeqName = noCase ? "NOCASE" : "BINARY";
+ /* Support only for unicode_ci indexes by now */
+ zCollSeqName = noCase ? "unicode_ci" : "BINARY";
pNewExpr1 = sqlite3ExprDup(db, pLeft, 0);
pNewExpr1 = sqlite3PExpr(pParse, TK_GE,
sqlite3ExprAddCollateString(pParse,
diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation.test.lua
index 85652dc7d..8a98de987 100755
--- a/test/sql-tap/collation.test.lua
+++ b/test/sql-tap/collation.test.lua
@@ -1,6 +1,6 @@
#!/usr/bin/env tarantool
test = require("sqltester")
-test:plan(162)
+test:plan(173)
local prefix = "collation-"
@@ -202,4 +202,51 @@ for _, data_collation in ipairs(data_collations) do
end
end
+-- Like uses collation (only for unicode_ci and binary)
+local like_testcases =
+{
+ {"2.0",
+ [[
+ CREATE TABLE tx1 (s1 CHAR(5) PRIMARY KEY);
+ CREATE INDEX I1 on tx1(s1 collate "unicode_ci");
+ INSERT INTO tx1 VALUES('aaa');
+ INSERT INTO tx1 VALUES('Aab');
+ INSERT INTO tx1 VALUES('İac');
+ INSERT INTO tx1 VALUES('iad');
+ ]], {0}},
+ {"2.1.1",
+ "SELECT * FROM tx1 WHERE s1 LIKE 'A%' order by s1;",
+ {0, {"Aab", "aaa"}} },
+ {"2.1.2",
+ "EXPLAIN QUERY PLAN SELECT * FROM tx1 WHERE s1 LIKE 'A%';",
+ {0, {0, 0, 0, "/USING COVERING INDEX I1/"}} },
+ {"2.2.0",
+ "PRAGMA case_sensitive_like = true;",
+ {0}},
+ {"2.2.1",
+ "SELECT * FROM tx1 WHERE s1 LIKE 'A%' order by s1;",
+ {0, {"Aab"}} },
+ {"2.2.2",
+ "EXPLAIN QUERY PLAN SELECT * FROM tx1 WHERE s1 LIKE 'A%';",
+ {0, {0, 0, 0, "/USING PRIMARY KEY/"}} },
+ {"2.3.0",
+ "PRAGMA case_sensitive_like = false;",
+ {0}},
+ {"2.3.1",
+ "SELECT * FROM tx1 WHERE s1 LIKE 'i%' order by s1;",
+ {0, {"iad", "İac"}}},
+ {"2.3.2",
+ "SELECT * FROM tx1 WHERE s1 LIKE 'İ%'order by s1;",
+ {0, {"iad", "İac"}} },
+ {"2.4.0",
+ [[
+ INSERT INTO tx1 VALUES('ЯЁЮ');
+ ]], {0} },
+ {"2.4.1",
+ "SELECT * FROM tx1 WHERE s1 LIKE 'яёю';",
+ {0, {"ЯЁЮ"}} },
+}
+
+test:do_catchsql_set_test(like_testcases, prefix)
+
test:finish_test()
--
2.14.1
More information about the Tarantool-patches
mailing list