[Tarantool-patches] [PATCH v5 20/52] sql: introduce mem_compare()
imeevma at tarantool.org
imeevma at tarantool.org
Fri Apr 9 20:57:26 MSK 2021
Thank you for the review! My answers and new patch below.
On 30.03.2021 02:03, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> I don't understand. Why are there still sqlMemCompare(), sqlBlobCompare()
> if you also have mem_compare(), compare_blobs(), compare_numbers()?
>
Functions compare_blobs(), compare_numbers() and compare_strings() are used to
simplify mem_compare(). Also, I now use them in sqlMemCompare(). Function
mem_compare() is used to perform comparison of two values in opcodes OP_Ge,
OP_Le, etc. Function sqlMemCompare() is used for comparison in other parts.
These two functions have slightly different ways of comparing MEMs, which is
wrong, I believe. Still, removing either of them would change behaviour, so I
decided to leave them both for now. Function sqlBlobCompare() is the same as
compare_blobs() (actually it is vice versa), so I removed this function.
>> diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
>> index 6120939d8..8119644ed 100644
>> --- a/src/box/sql/mem.c
>> +++ b/src/box/sql/mem.c
>> @@ -556,6 +556,247 @@ mem_arithmetic(const struct Mem *left, const struct Mem *right,
>> return 0;
>> }
>>
>> +static int
>> +compare_blobs(const struct Mem *left, const struct Mem *right, int *result)
>> +{
>> + int nl = left->n;
>> + int nr = right->n;
>> + int minlen = MIN(nl, nr);
>> +
>> + /*
>> + * It is possible to have a Blob value that has some non-zero content
>> + * followed by zero content. But that only comes up for Blobs formed
>> + * by the OP_MakeRecord opcode, and such Blobs never get passed into
>> + * mem_compare().
>> + */
>> + assert((left->flags & MEM_Zero) == 0 || nl == 0);
>> + assert((right->flags & MEM_Zero) == 0 || nr == 0);
>> +
>> + if (left->flags & right->flags & MEM_Zero) {
>
> Please, use explicit != 0, in the other places below too.
Fixed.
New patch:
commit 5a71515faac5f305fc3bd1ebbc20d1dd65bf027c
Author: Mergen Imeev <imeevma at gmail.com>
Date: Sun Mar 14 13:13:34 2021 +0300
sql: introduce mem_compare()
This patch introduces mem_compare(). Function mem_compare() compares the
first and the second MEMs and returns result of comparison.
Part of #5818
diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 859e337aa..eee72a7fe 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -624,6 +624,211 @@ mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
return 0;
}
+static int
+compare_blobs(const struct Mem *a, const struct Mem *b, int *result)
+{
+ int an = a->n;
+ int bn = b->n;
+ int minlen = MIN(an, bn);
+
+ /*
+ * It is possible to have a Blob value that has some non-zero content
+ * followed by zero content. But that only comes up for Blobs formed
+ * by the OP_MakeRecord opcode, and such Blobs never get passed into
+ * mem_compare().
+ */
+ assert((a->flags & MEM_Zero) == 0 || an == 0);
+ assert((b->flags & MEM_Zero) == 0 || bn == 0);
+
+ if ((a->flags & b->flags & MEM_Zero) != 0) {
+ *result = a->u.nZero - b->u.nZero;
+ return 0;
+ }
+ if ((a->flags & MEM_Zero) != 0) {
+ for (int i = 0; i < minlen; ++i) {
+ if (b->z[i] != 0) {
+ *result = -1;
+ return 0;
+ }
+ }
+ *result = a->u.nZero - bn;
+ return 0;
+ }
+ if ((b->flags & MEM_Zero) != 0) {
+ for (int i = 0; i < minlen; ++i) {
+ if (a->z[i] != 0){
+ *result = 1;
+ return 0;
+ }
+ }
+ *result = b->u.nZero - an;
+ return 0;
+ }
+ *result = memcmp(a->z, b->z, minlen);
+ if (*result != 0)
+ return 0;
+ *result = an - bn;
+ return 0;
+}
+
+static int
+compare_numbers(const struct Mem *left, const struct Mem *right, int *result)
+{
+ struct sql_num a, b;
+ /* TODO: Here should be check for right value type. */
+ if (get_number(right, &b) != 0) {
+ *result = -1;
+ return 0;
+ }
+ if (get_number(left, &a) != 0) {
+ diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(left),
+ "numeric");
+ return -1;
+ }
+ if (a.type == MEM_Real) {
+ if (b.type == MEM_Real) {
+ if (a.d > b.d)
+ *result = 1;
+ else if (a.d < b.d)
+ *result = -1;
+ else
+ *result = 0;
+ return 0;
+ }
+ if (b.type == MEM_Int)
+ *result = double_compare_nint64(a.d, b.i, 1);
+ else
+ *result = double_compare_uint64(a.d, b.u, 1);
+ return 0;
+ }
+ if (a.type == MEM_Int) {
+ if (b.type == MEM_Int) {
+ if (a.i > b.i)
+ *result = 1;
+ else if (a.i < b.i)
+ *result = -1;
+ else
+ *result = 0;
+ return 0;
+ }
+ if (b.type == MEM_UInt)
+ *result = -1;
+ else
+ *result = double_compare_nint64(b.d, a.i, -1);
+ return 0;
+ }
+ assert(a.type == MEM_UInt);
+ if (b.type == MEM_UInt) {
+ if (a.u > b.u)
+ *result = 1;
+ else if (a.u < b.u)
+ *result = -1;
+ else
+ *result = 0;
+ return 0;
+ }
+ if (b.type == MEM_Int)
+ *result = 1;
+ else
+ *result = double_compare_uint64(b.d, a.u, -1);
+ return 0;
+}
+
+static int
+compare_strings(const struct Mem *left, const struct Mem *right, int *result,
+ const struct coll *coll)
+{
+ char *a;
+ uint32_t an;
+ char bufl[BUF_SIZE];
+ if ((left->flags & MEM_Str) != 0) {
+ a = left->z;
+ an = left->n;
+ } else {
+ assert((left->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+ a = &bufl[0];
+ if ((left->flags & MEM_Int) != 0)
+ sql_snprintf(BUF_SIZE, a, "%lld", left->u.i);
+ else if ((left->flags & MEM_UInt) != 0)
+ sql_snprintf(BUF_SIZE, a, "%llu", left->u.u);
+ else
+ sql_snprintf(BUF_SIZE, a, "%!.15g", left->u.r);
+ an = strlen(a);
+ }
+
+ char *b;
+ uint32_t bn;
+ char bufr[BUF_SIZE];
+ if ((right->flags & MEM_Str) != 0) {
+ b = right->z;
+ bn = right->n;
+ } else {
+ assert((right->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+ b = &bufr[0];
+ if ((right->flags & MEM_Int) != 0)
+ sql_snprintf(BUF_SIZE, b, "%lld", right->u.i);
+ else if ((right->flags & MEM_UInt) != 0)
+ sql_snprintf(BUF_SIZE, b, "%llu", right->u.u);
+ else
+ sql_snprintf(BUF_SIZE, b, "%!.15g", right->u.r);
+ bn = strlen(b);
+ }
+ if (coll) {
+ *result = coll->cmp(a, an, b, bn, coll);
+ return 0;
+ }
+ uint32_t minlen = MIN(an, bn);
+ *result = memcmp(a, b, minlen);
+ if (*result != 0)
+ return 0;
+ *result = an - bn;
+ return 0;
+}
+
+int
+mem_compare(const struct Mem *left, const struct Mem *right, int *result,
+ enum field_type type, struct coll *coll)
+{
+ assert(((left->flags | right->flags) & MEM_Null) == 0);
+ int flags_any = left->flags | right->flags;
+ int flags_all = left->flags & right->flags;
+
+ if ((flags_all & MEM_Bool) != 0) {
+ if (left->u.b == right->u.b)
+ *result = 0;
+ else if (left->u.b)
+ *result = 1;
+ else
+ *result = -1;
+ return 0;
+ }
+ if ((flags_any & MEM_Bool) != 0) {
+ char *str = (left->flags & MEM_Bool) == 0 ?
+ mem_type_to_str(left) : mem_type_to_str(right);
+ diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str, "boolean");
+ return -1;
+ }
+
+ if ((flags_all & MEM_Blob) != 0)
+ return compare_blobs(left, right, result);
+ if ((flags_any & MEM_Blob) != 0) {
+ char *str = (left->flags & MEM_Blob) == 0 ?
+ mem_type_to_str(left) : mem_type_to_str(right);
+ diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str, "varbinary");
+ return -1;
+ }
+
+ if (type == FIELD_TYPE_STRING)
+ return compare_strings(left, right, result, coll);
+
+ if (sql_type_is_numeric(type) ||
+ (flags_any & (MEM_Int | MEM_UInt | MEM_Real)) != 0)
+ return compare_numbers(left, right, result);
+
+ assert((left->flags & MEM_Str) != 0 && (right->flags & MEM_Str) != 0);
+ return compare_strings(left, right, result, coll);
+}
+
static inline bool
mem_has_msgpack_subtype(struct Mem *mem)
{
@@ -2071,45 +2276,6 @@ sqlVdbeMemTooBig(Mem * p)
return 0;
}
-/*
- * Compare two blobs. Return negative, zero, or positive if the first
- * is less than, equal to, or greater than the second, respectively.
- * If one blob is a prefix of the other, then the shorter is the lessor.
- */
-static SQL_NOINLINE int
-sqlBlobCompare(const Mem * pB1, const Mem * pB2)
-{
- int c;
- int n1 = pB1->n;
- int n2 = pB2->n;
-
- /* It is possible to have a Blob value that has some non-zero content
- * followed by zero content. But that only comes up for Blobs formed
- * by the OP_MakeRecord opcode, and such Blobs never get passed into
- * sqlMemCompare().
- */
- assert((pB1->flags & MEM_Zero) == 0 || n1 == 0);
- assert((pB2->flags & MEM_Zero) == 0 || n2 == 0);
-
- if ((pB1->flags | pB2->flags) & MEM_Zero) {
- if (pB1->flags & pB2->flags & MEM_Zero) {
- return pB1->u.nZero - pB2->u.nZero;
- } else if (pB1->flags & MEM_Zero) {
- if (!isAllZero(pB2->z, pB2->n))
- return -1;
- return pB1->u.nZero - n2;
- } else {
- if (!isAllZero(pB1->z, pB1->n))
- return +1;
- return n1 - pB2->u.nZero;
- }
- }
- c = memcmp(pB1->z, pB2->z, n1 > n2 ? n2 : n1);
- if (c)
- return c;
- return n1 - n2;
-}
-
/*
* Compare the values contained by the two memory cells, returning
* negative, zero or positive if pMem1 is less than, equal to, or greater
@@ -2123,6 +2289,7 @@ int
sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
{
int f1, f2;
+ int res;
int combined_flags;
f1 = pMem1->flags;
@@ -2152,57 +2319,12 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
/* At least one of the two values is a number
*/
if ((combined_flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0) {
- if ((f1 & f2 & MEM_Int) != 0) {
- if (pMem1->u.i < pMem2->u.i)
- return -1;
- if (pMem1->u.i > pMem2->u.i)
- return +1;
- return 0;
- }
- if ((f1 & f2 & MEM_UInt) != 0) {
- if (pMem1->u.u < pMem2->u.u)
- return -1;
- if (pMem1->u.u > pMem2->u.u)
- return +1;
- return 0;
- }
- if ((f1 & f2 & MEM_Real) != 0) {
- if (pMem1->u.r < pMem2->u.r)
- return -1;
- if (pMem1->u.r > pMem2->u.r)
- return +1;
- return 0;
- }
- if ((f1 & MEM_Int) != 0) {
- if ((f2 & MEM_Real) != 0) {
- return double_compare_nint64(pMem2->u.r,
- pMem1->u.i, -1);
- } else {
- return -1;
- }
- }
- if ((f1 & MEM_UInt) != 0) {
- if ((f2 & MEM_Real) != 0) {
- return double_compare_uint64(pMem2->u.r,
- pMem1->u.u, -1);
- } else if ((f2 & MEM_Int) != 0) {
- return +1;
- } else {
- return -1;
- }
- }
- if ((f1 & MEM_Real) != 0) {
- if ((f2 & MEM_Int) != 0) {
- return double_compare_nint64(pMem1->u.r,
- pMem2->u.i, 1);
- } else if ((f2 & MEM_UInt) != 0) {
- return double_compare_uint64(pMem1->u.r,
- pMem2->u.u, 1);
- } else {
- return -1;
- }
- }
- return +1;
+ if ((f1 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
+ return +1;
+ if ((f2 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
+ return -1;
+ compare_numbers(pMem1, pMem2, &res);
+ return res;
}
/* If one value is a string and the other is a blob, the string is less.
@@ -2215,27 +2337,13 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
if ((f2 & MEM_Str) == 0) {
return -1;
}
- /* The collation sequence must be defined at this point, even if
- * the user deletes the collation sequence after the vdbe program is
- * compiled (this was not always the case).
- */
- if (pColl) {
- return vdbeCompareMemString(pMem1, pMem2, pColl);
- } else {
- size_t n = pMem1->n < pMem2->n ? pMem1->n : pMem2->n;
- int res;
- res = memcmp(pMem1->z, pMem2->z, n);
- if (res == 0)
- res = (int)pMem1->n - (int)pMem2->n;
- return res;
- }
- /* If a NULL pointer was passed as the collate function, fall through
- * to the blob case and use memcmp().
- */
+ compare_strings(pMem1, pMem2, &res, pColl);
+ return res;
}
/* Both values must be blobs. Compare using memcmp(). */
- return sqlBlobCompare(pMem1, pMem2);
+ compare_blobs(pMem1, pMem2, &res);
+ return res;
}
bool
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index 69a7d9f7a..6c022d8d8 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -226,6 +226,11 @@ mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
int
mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
+/** Compare two non-NULL MEMs and return the result of comparison. */
+int
+mem_compare(const struct Mem *left, const struct Mem *right, int *result,
+ enum field_type type, struct coll *coll);
+
/* One or more of the following flags are set to indicate the validOK
* representations of the value stored in the Mem struct.
*
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 67e1cc85a..6a923a8b6 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1607,15 +1607,9 @@ case OP_Le: /* same as TK_LE, jump, in1, in3 */
case OP_Gt: /* same as TK_GT, jump, in1, in3 */
case OP_Ge: { /* same as TK_GE, jump, in1, in3 */
int res, res2; /* Result of the comparison of pIn1 against pIn3 */
- u32 flags1; /* Copy of initial value of pIn1->flags */
- u32 flags3; /* Copy of initial value of pIn3->flags */
pIn1 = &aMem[pOp->p1];
pIn3 = &aMem[pOp->p3];
- flags1 = pIn1->flags;
- flags3 = pIn3->flags;
- enum field_type ft_p1 = pIn1->field_type;
- enum field_type ft_p3 = pIn3->field_type;
if (mem_is_any_null(pIn1, pIn3)) {
/* One or both operands are NULL */
if (pOp->p5 & SQL_NULLEQ) {
@@ -1649,82 +1643,11 @@ case OP_Ge: { /* same as TK_GE, jump, in1, in3 */
}
break;
}
- } else if (mem_is_bool(pIn1) || mem_is_bool(pIn3) ||
- mem_is_bin(pIn1) || mem_is_bin(pIn3)) {
- if (!mem_is_same_type(pIn1, pIn3)) {
- char *inconsistent_type = mem_is_bool(pIn1) ||
- mem_is_bin(pIn1) ?
- mem_type_to_str(pIn3) :
- mem_type_to_str(pIn1);
- char *expected_type = mem_is_bool(pIn1) ||
- mem_is_bin(pIn1) ?
- mem_type_to_str(pIn1) :
- mem_type_to_str(pIn3);
- diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
- inconsistent_type, expected_type);
- goto abort_due_to_error;
- }
- res = sqlMemCompare(pIn3, pIn1, NULL);
} else {
enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
- if (sql_type_is_numeric(type)) {
- if (mem_is_str(pIn1)) {
- mem_apply_numeric_type(pIn1);
- flags3 = pIn3->flags;
- }
- if (mem_is_str(pIn3)) {
- if (mem_apply_numeric_type(pIn3) != 0) {
- diag_set(ClientError,
- ER_SQL_TYPE_MISMATCH,
- mem_str(pIn3),
- "numeric");
- goto abort_due_to_error;
- }
- }
- /* Handle the common case of integer comparison here, as an
- * optimization, to avoid a call to sqlMemCompare()
- */
- if (mem_is_int(pIn1) && mem_is_int(pIn3)) {
- if (!mem_is_uint(pIn1) && !mem_is_uint(pIn3)) {
- if (pIn3->u.i > pIn1->u.i)
- res = +1;
- else if (pIn3->u.i < pIn1->u.i)
- res = -1;
- else
- res = 0;
- goto compare_op;
- }
- if (mem_is_uint(pIn1) && mem_is_uint(pIn3)) {
- if (pIn3->u.u > pIn1->u.u)
- res = +1;
- else if (pIn3->u.u < pIn1->u.u)
- res = -1;
- else
- res = 0;
- goto compare_op;
- }
- if (mem_is_uint(pIn1) && !mem_is_uint(pIn3)) {
- res = -1;
- goto compare_op;
- }
- res = 1;
- goto compare_op;
- }
- } else if (type == FIELD_TYPE_STRING) {
- if (!mem_is_str(pIn1) && mem_is_num(pIn1)) {
- sqlVdbeMemStringify(pIn1);
- flags1 = (pIn1->flags & ~MEM_TypeMask) | (flags1 & MEM_TypeMask);
- assert(pIn1!=pIn3);
- }
- if (!mem_is_str(pIn3) && mem_is_num(pIn3)) {
- sqlVdbeMemStringify(pIn3);
- flags3 = (pIn3->flags & ~MEM_TypeMask) | (flags3 & MEM_TypeMask);
- }
- }
- assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl==0);
- res = sqlMemCompare(pIn3, pIn1, pOp->p4.pColl);
+ if (mem_compare(pIn3, pIn1, &res, type, pOp->p4.pColl))
+ goto abort_due_to_error;
}
- compare_op:
switch( pOp->opcode) {
case OP_Eq: res2 = res==0; break;
case OP_Ne: res2 = res; break;
@@ -1734,14 +1657,6 @@ case OP_Ge: { /* same as TK_GE, jump, in1, in3 */
default: res2 = res>=0; break;
}
- /* Undo any changes made by mem_apply_type() to the input registers. */
- assert((pIn1->flags & MEM_Dyn) == (flags1 & MEM_Dyn));
- pIn1->flags = flags1;
- pIn1->field_type = ft_p1;
- assert((pIn3->flags & MEM_Dyn) == (flags3 & MEM_Dyn));
- pIn3->flags = flags3;
- pIn3->field_type = ft_p3;
-
if (pOp->p5 & SQL_STOREP2) {
iCompare = res;
res2 = res2!=0; /* For this path res2 must be exactly 0 or 1 */
More information about the Tarantool-patches
mailing list