From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 0A60825784 for ; Sat, 13 Jul 2019 18:51:12 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id X2w10i7HpR_9 for ; Sat, 13 Jul 2019 18:51:11 -0400 (EDT) Received: from smtp50.i.mail.ru (smtp50.i.mail.ru [94.100.177.110]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 8EDA62577D for ; Sat, 13 Jul 2019 18:51:11 -0400 (EDT) From: Roman Khabibov Subject: [tarantool-patches] [PATCH 2/2] sql: make LIKE predicate dependent on collation Date: Sun, 14 Jul 2019 01:51:08 +0300 Message-Id: <67ec7b5425d16078e45571c99ba9b58859b3c7b8.1563057282.git.roman.habibov@tarantool.org> In-Reply-To: References: MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: 8bit Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-Help: List-Unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-Subscribe: List-Owner: List-post: List-Archive: To: tarantool-patches@freelists.org Cc: korablev@tarantool.org According to the ANSI SQL standard, the LIKE predicate should match characters taking the passed collation into account.
ISO/IEC JTC 1/SC 32 2011, Part 2: Foundation, page 445 Closes #3589 --- src/box/sql/func.c | 97 +++++++++++-------- src/box/sql/sqlInt.h | 2 +- src/box/sql/whereexpr.c | 62 ++++-------- test/sql-tap/collation.test.lua | 35 +++---- ...gh-3251-string-pattern-comparison.test.lua | 90 ++++++++++++++++- test/sql-tap/like3.test.lua | 4 +- test/sql-tap/whereG.test.lua | 8 +- 7 files changed, 187 insertions(+), 111 deletions(-) diff --git a/src/box/sql/func.c b/src/box/sql/func.c index 4e4d14cf7..b660d05c5 100644 --- a/src/box/sql/func.c +++ b/src/box/sql/func.c @@ -44,6 +44,7 @@ #include #include #include +#include "box/coll_id_cache.h" static UConverter* pUtf8conv; @@ -673,6 +674,29 @@ enum pattern_match_status { INVALID_PATTERN = 3 }; +/** + * Read a UTF-8 character from @a str and advance @a str past + * it, storing the byte length of that character in @a char_len. + * + * @param[out] str Ptr to ptr to string. + * @param str_end Ptr to string last symbol. + * @param[out] char_ptr Ptr to ptr to the UTF-8 character. + * @param[out] char_len Ptr to length of the UTF-8 character in + * bytes. + * + * @retval UTF-8 character. + */ +static UChar32 +step_utf8_char(const char **str, const char *str_end, const char **char_ptr, + int *char_len) +{ + UErrorCode status = U_ZERO_ERROR; + *char_ptr = *str; + UChar32 next_utf8 = Utf8Read(*str, str_end); + *char_len = *str - *char_ptr; + return next_utf8; +} + /** * Compare two UTF-8 strings for equality where the first string * is a LIKE expression. @@ -699,7 +723,7 @@ enum pattern_match_status { * @param string String being compared. * @param pattern_end Ptr to pattern last symbol. * @param string_end Ptr to string last symbol. - * @param is_like_ci true if LIKE is case insensitive. + * @param coll Pointer to collation. * @param match_other The escape char for LIKE. * * @retval One of pattern_match_status values.
@@ -709,7 +733,7 @@ sql_utf8_pattern_compare(const char *pattern, const char *string, const char *pattern_end, const char *string_end, - const int is_like_ci, + struct coll *coll, UChar32 match_other) { /* Next pattern and input string chars. */ @@ -717,9 +741,13 @@ sql_utf8_pattern_compare(const char *pattern, /* One past the last escaped input char. */ const char *zEscaped = 0; UErrorCode status = U_ZERO_ERROR; + const char *pat_char_ptr; + const char *str_char_ptr; + int pat_char_len; + int str_char_len; while (pattern < pattern_end) { - c = Utf8Read(pattern, pattern_end); + c = step_utf8_char(&pattern, pattern_end, &pat_char_ptr, &pat_char_len); if (c == SQL_INVALID_UTF8_SYMBOL) return INVALID_PATTERN; if (c == MATCH_ALL_WILDCARD) { @@ -730,7 +758,9 @@ sql_utf8_pattern_compare(const char *pattern, * consume a single character of the * input string for each "_" skipped. */ - while ((c = Utf8Read(pattern, pattern_end)) != + while ((c = step_utf8_char(&pattern, pattern_end, + &pat_char_ptr, + &pat_char_len)) != SQL_END_OF_STRING) { if (c == SQL_INVALID_UTF8_SYMBOL) return INVALID_PATTERN; @@ -751,7 +781,9 @@ sql_utf8_pattern_compare(const char *pattern, return MATCH; } if (c == match_other) { - c = Utf8Read(pattern, pattern_end); + c = step_utf8_char(&pattern, pattern_end, + &pat_char_ptr, + &pat_char_len); if (c == SQL_INVALID_UTF8_SYMBOL) return INVALID_PATTERN; if (c == SQL_END_OF_STRING) @@ -773,8 +805,6 @@ sql_utf8_pattern_compare(const char *pattern, */ int bMatch; - if (is_like_ci) - c = u_tolower(c); while (string < string_end){ /* * This loop could have been @@ -786,21 +816,20 @@ sql_utf8_pattern_compare(const char *pattern, * lower works better with German * and Turkish languages. 
*/ - c2 = Utf8Read(string, string_end); + c2 = step_utf8_char(&string, string_end, + &str_char_ptr, + &str_char_len); if (c2 == SQL_INVALID_UTF8_SYMBOL) return NO_MATCH; - if (!is_like_ci) { - if (c2 != c) - continue; - } else { - if (c2 != c && u_tolower(c2) != c) - continue; - } + if (coll->cmp(pat_char_ptr, pat_char_len, + str_char_ptr, str_char_len, coll) + != 0) + continue; bMatch = sql_utf8_pattern_compare(pattern, string, pattern_end, string_end, - is_like_ci, + coll, match_other); if (bMatch != NO_MATCH) return bMatch; @@ -808,30 +837,21 @@ sql_utf8_pattern_compare(const char *pattern, return NO_WILDCARD_MATCH; } if (c == match_other) { - c = Utf8Read(pattern, pattern_end); + c = step_utf8_char(&pattern, pattern_end, &pat_char_ptr, + &pat_char_len); if (c == SQL_INVALID_UTF8_SYMBOL) return INVALID_PATTERN; if (c == SQL_END_OF_STRING) return NO_MATCH; zEscaped = pattern; } - c2 = Utf8Read(string, string_end); + c2 = step_utf8_char(&string, string_end, &str_char_ptr, + &str_char_len); if (c2 == SQL_INVALID_UTF8_SYMBOL) return NO_MATCH; - if (c == c2) + if (coll->cmp(pat_char_ptr, pat_char_len, str_char_ptr, + str_char_len, coll) == 0) continue; - if (is_like_ci) { - /* - * Small optimization. Reduce number of - * calls to u_tolower function. SQL - * standards suggest use to_upper for - * symbol normalisation. However, using - * to_lower allows to respect Turkish 'İ' - * in default locale. 
- */ - if (u_tolower(c) == c2 || c == u_tolower(c2)) - continue; - } if (c == MATCH_ONE_WILDCARD && pattern != zEscaped && c2 != SQL_END_OF_STRING) continue; @@ -847,9 +867,10 @@ sql_utf8_pattern_compare(const char *pattern, int sql_strlike_cs(const char *zPattern, const char *zStr, unsigned int esc) { + struct coll_id *p = coll_by_name("unicode", strlen("unicode")); return sql_utf8_pattern_compare(zPattern, zStr, zPattern + strlen(zPattern), - zStr + strlen(zStr), 0, esc); + zStr + strlen(zStr), p->coll, esc); } /** @@ -859,9 +880,10 @@ sql_strlike_cs(const char *zPattern, const char *zStr, unsigned int esc) int sql_strlike_ci(const char *zPattern, const char *zStr, unsigned int esc) { + struct coll_id *p = coll_by_name("unicode_ci", strlen("unicode_ci")); return sql_utf8_pattern_compare(zPattern, zStr, zPattern + strlen(zPattern), - zStr + strlen(zStr), 1, esc); + zStr + strlen(zStr), p->coll, esc); } /** @@ -883,7 +905,6 @@ likeFunc(sql_context *context, int argc, sql_value **argv) u32 escape = SQL_END_OF_STRING; int nPat; sql *db = sql_context_db_handle(context); - int is_like_ci = SQL_PTR_TO_INT(sql_user_data(context)); int rhs_type = sql_value_type(argv[0]); int lhs_type = sql_value_type(argv[1]); @@ -940,8 +961,9 @@ likeFunc(sql_context *context, int argc, sql_value **argv) if (!zA || !zB) return; int res; - res = sql_utf8_pattern_compare(zB, zA, zB_end, zA_end, - is_like_ci, escape); + struct coll *coll = sqlGetFuncCollSeq(context); + res = sql_utf8_pattern_compare(zB, zA, zB_end, zA_end, coll, escape); + if (res == INVALID_PATTERN) { diag_set(ClientError, ER_SQL_EXECUTE, "LIKE pattern can only "\ "contain UTF-8 characters"); @@ -1792,7 +1814,7 @@ sqlRegisterPerConnectionBuiltinFunctions(sql * db) } int -sql_is_like_func(struct sql *db, struct Expr *expr, int *is_like_ci) +sql_is_like_func(struct sql *db, struct Expr *expr) { if (expr->op != TK_FUNCTION || !expr->x.pList || expr->x.pList->nExpr != 2) @@ -1802,7 +1824,6 @@ sql_is_like_func(struct sql *db, 
struct Expr *expr, int *is_like_ci) assert(func != NULL); if ((func->funcFlags & SQL_FUNC_LIKE) == 0) return 0; - *is_like_ci = (func->funcFlags & SQL_FUNC_CASE) == 0; return 1; } diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h index 57da8ec5d..989499abe 100644 --- a/src/box/sql/sqlInt.h +++ b/src/box/sql/sqlInt.h @@ -1356,7 +1356,7 @@ enum trim_side_mask { {nArg, SQL_FUNC_SLOCHNG|(bNC*SQL_FUNC_NEEDCOLL), \ pArg, 0, xFunc, 0, #zName, {SQL_AFF_STRING, {0}}, false} #define LIKEFUNC(zName, nArg, arg, flags, type) \ - {nArg, SQL_FUNC_CONSTANT|flags, \ + {nArg, SQL_FUNC_NEEDCOLL|SQL_FUNC_CONSTANT|flags, \ (void *)(SQL_INT_TO_PTR(arg)), 0, likeFunc, 0, #zName, {0}, type, false } #define AGGREGATE(zName, nArg, arg, nc, xStep, xFinal, type) \ {nArg, (nc*SQL_FUNC_NEEDCOLL), \ diff --git a/src/box/sql/whereexpr.c b/src/box/sql/whereexpr.c index 0a0e900bd..a97e768cf 100644 --- a/src/box/sql/whereexpr.c +++ b/src/box/sql/whereexpr.c @@ -228,13 +228,11 @@ operatorMask(int op) * In order for the operator to be optimizible, the RHS must be a * string literal that does not begin with a wildcard. The LHS * must be a column that may only be NULL, a string, or a BLOB, - * never a number. The collating sequence for the column on the - * LHS must be appropriate for the operator. + * never a number. * * @param pParse Parsing and code generating context. * @param pExpr Test this expression. - * @param ppPrefix Pointer to TK_STRING expression with - * pattern prefix. + * @param ppPrefix Pointer to expression with pattern prefix. * @param pisComplete True if the only wildcard is '%' in the * last character. * @retval True if the given expr is a LIKE operator & is @@ -276,9 +274,20 @@ like_optimization_is_valid(Parse *pParse, Expr *pExpr, Expr **ppPrefix, */ return 0; } + + /* Only for "binary" and "unicode_ci" collations. 
*/ + struct field_def *field_def = pLeft->space_def->fields + pLeft->iColumn; + if (field_def->coll_id != 0 && field_def->coll_id != 2 && + field_def->coll_id != 3) + return 0; assert(pLeft->iColumn != (-1)); /* Because IPK never has AFF_TEXT */ pRight = sqlExprSkipCollate(pList->a[0].pExpr); + const char *coll_name = NULL; + if (pRight != pList->a[0].pExpr) { + assert(pList->a[0].pExpr->op == TK_COLLATE); + coll_name = pList->a[0].pExpr->u.zToken; + } op = pRight->op; if (op == TK_VARIABLE) { Vdbe *pReprepare = pParse->pReprepare; @@ -308,6 +317,10 @@ like_optimization_is_valid(Parse *pParse, Expr *pExpr, Expr **ppPrefix, pParse->is_aborted = true; else pPrefix->u.zToken[cnt] = 0; + if (coll_name != NULL) + pPrefix = sqlExprAddCollateString(pParse, + pPrefix, + coll_name); *ppPrefix = pPrefix; if (op == TK_VARIABLE) { Vdbe *v = pParse->pVdbe; @@ -977,8 +990,6 @@ exprAnalyze(SrcList * pSrc, /* the FROM clause */ Expr *pStr1 = 0; /* RHS of LIKE ends with wildcard. */ int isComplete = 0; - /* uppercase equivalent to lowercase. */ - int noCase = 0; /* Top-level operator. pExpr->op. */ int op; /* Parsing context. */ @@ -1165,59 +1176,22 @@ exprAnalyze(SrcList * pSrc, /* the FROM clause */ const u16 wtFlags = TERM_LIKEOPT | TERM_VIRTUAL | TERM_DYNAMIC; pLeft = pExpr->x.pList->a[1].pExpr; - pStr2 = sqlExprDup(db, pStr1, 0); - - /* - * Convert the lower bound to upper-case and the - * upper bound to lower-case (upper-case is less - * than lower-case in ASCII) so that the range - * constraints also work for BLOBs. - */ - if (noCase && !pParse->db->mallocFailed) { - int i; - char c; - pTerm->wtFlags |= TERM_LIKE; - for (i = 0; (c = pStr1->u.zToken[i]) != 0; i++) { - pStr1->u.zToken[i] = sqlToupper(c); - pStr2->u.zToken[i] = sqlTolower(c); - } - } + pStr2 = sqlExprDup(db, sqlExprSkipCollate(pStr1), 0); if (!db->mallocFailed) { u8 c, *pC; /* Last character before the first wildcard */ pC = (u8 *) & pStr2->u. 
zToken[sqlStrlen30(pStr2->u.zToken) - 1]; c = *pC; - if (noCase) { - /* The point is to increment the last character before the first - * wildcard. But if we increment '@', that will push it into the - * alphabetic range where case conversions will mess up the - * inequality. To avoid this, make sure to also run the full - * LIKE on all candidate expressions by clearing the isComplete flag - */ - if (c == 'A' - 1) - isComplete = 0; - c = sqlUpperToLower[c]; - } *pC = c + 1; } pNewExpr1 = sqlExprDup(db, pLeft, 0); - if (noCase) { - pNewExpr1 = - sqlExprAddCollateString(pParse, pNewExpr1, - "unicode_ci"); - } pNewExpr1 = sqlPExpr(pParse, TK_GE, pNewExpr1, pStr1); transferJoinMarkings(pNewExpr1, pExpr); idxNew1 = whereClauseInsert(pWC, pNewExpr1, wtFlags); testcase(idxNew1 == 0); exprAnalyze(pSrc, pWC, idxNew1); pNewExpr2 = sqlExprDup(db, pLeft, 0); - if (noCase) { - pNewExpr2 = - sqlExprAddCollateString(pParse, pNewExpr2, - "unicode_ci"); - } pNewExpr2 = sqlPExpr(pParse, TK_LT, pNewExpr2, pStr2); transferJoinMarkings(pNewExpr2, pExpr); idxNew2 = whereClauseInsert(pWC, pNewExpr2, wtFlags); diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation.test.lua index 79547361c..9e404282a 100755 --- a/test/sql-tap/collation.test.lua +++ b/test/sql-tap/collation.test.lua @@ -1,6 +1,6 @@ #!/usr/bin/env tarantool test = require("sqltester") -test:plan(174) +test:plan(173) local prefix = "collation-" @@ -477,13 +477,13 @@ for _, data_collation in ipairs(data_collations) do end end --- Like uses collation (only for unicode_ci and binary) +-- <LIKE> uses collation. If <LIKE> has explicit <COLLATE>, use it +-- instead of implicit.
local like_testcases = { {"2.0", [[ - CREATE TABLE tx1 (s1 VARCHAR(5) PRIMARY KEY); - CREATE INDEX I1 on tx1(s1 collate "unicode_ci"); + CREATE TABLE tx1 (s1 VARCHAR(5) PRIMARY KEY COLLATE "unicode_ci"); INSERT INTO tx1 VALUES('aaa'); INSERT INTO tx1 VALUES('Aab'); INSERT INTO tx1 VALUES('İac'); @@ -491,35 +491,32 @@ local like_testcases = ]], {0}}, {"2.1.1", "SELECT * FROM tx1 WHERE s1 LIKE 'A%' order by s1;", - {0, {"Aab", "aaa"}} }, + {0, {"aaa","Aab"}} }, {"2.1.2", "EXPLAIN QUERY PLAN SELECT * FROM tx1 WHERE s1 LIKE 'A%';", - {0, {0, 0, 0, "SEARCH TABLE TX1 USING COVERING INDEX I1 (S1>? AND S1? AND S1 1, "abc", 2, "ABX", 4, "abc", 5, "ABX" @@ -82,7 +82,7 @@ test:do_execsql_test( test:do_execsql_test( "like3-2.1", [[ - SELECT a, b FROM t2 WHERE +b LIKE 'ab%' ORDER BY +a; + SELECT a, b FROM t2 WHERE +b LIKE 'ab%' COLLATE "unicode_ci" ORDER BY +a; ]], { -- 1, "abc", 2, "ABX", 4, "abc", 5, "ABX" diff --git a/test/sql-tap/whereG.test.lua b/test/sql-tap/whereG.test.lua index 177d9d14e..590027023 100755 --- a/test/sql-tap/whereG.test.lua +++ b/test/sql-tap/whereG.test.lua @@ -89,7 +89,7 @@ test:do_execsql_test( [[ SELECT DISTINCT aname FROM album, composer, track - WHERE unlikely(cname LIKE '%bach%') + WHERE unlikely(cname LIKE '%bach%' COLLATE "unicode_ci") AND composer.cid=track.cid AND album.aid=track.aid; ]], { @@ -118,7 +118,7 @@ test:do_execsql_test( [[ SELECT DISTINCT aname FROM album, composer, track - WHERE likelihood(cname LIKE '%bach%', 0.5) + WHERE likelihood(cname LIKE '%bach%' COLLATE "unicode_ci", 0.5) AND composer.cid=track.cid AND album.aid=track.aid; ]], { @@ -146,7 +146,7 @@ test:do_execsql_test( [[ SELECT DISTINCT aname FROM album, composer, track - WHERE cname LIKE '%bach%' + WHERE cname LIKE '%bach%' COLLATE "unicode_ci" AND composer.cid=track.cid AND album.aid=track.aid; ]], { @@ -174,7 +174,7 @@ test:do_execsql_test( [[ SELECT DISTINCT aname FROM album, composer, track - WHERE cname LIKE '%bach%' + WHERE cname LIKE '%bach%' COLLATE "unicode_ci" 
AND unlikely(composer.cid=track.cid) AND unlikely(album.aid=track.aid); ]], { -- 2.20.1 (Apple Git-117)