[Tarantool-patches] [PATCH v5 20/52] sql: introduce mem_compare()

Mergen Imeev imeevma at tarantool.org
Tue Apr 13 21:33:04 MSK 2021


Thank you for the review! My answers, diff and new patch below. The patch was
almost completely rewritten.

On Sun, Apr 11, 2021 at 08:16:30PM +0200, Vladislav Shpilevoy wrote:
> Nice fixes!
> 
> This is the last email for today, I will continue the review of
> the patchset tomorrow.
> 
> See 6 comments below.
> 
> > diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
> > index 859e337aa..eee72a7fe 100644
> > --- a/src/box/sql/mem.c
> > +++ b/src/box/sql/mem.c
> > @@ -624,6 +624,211 @@ mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
> >  	return 0;
> >  }
> >  
> > +static int
> > +compare_blobs(const struct Mem *a, const struct Mem *b, int *result)
> > +{
> 
> 1. Would be good to have an assertion here that both types are MEM_Blob.
> 
Instead of assert I added 'if' here. Also, now this function is not static. See
the last comment.

> > +	int an = a->n;
> > +	int bn = b->n;
> > +	int minlen = MIN(an, bn);
> > +
> > +	/*
> > +	 * It is possible to have a Blob value that has some non-zero content
> > +	 * followed by zero content.  But that only comes up for Blobs formed
> > +	 * by the OP_MakeRecord opcode, and such Blobs never get passed into
> > +	 * mem_compare().
> > +	 */
> > +	assert((a->flags & MEM_Zero) == 0 || an == 0);
> > +	assert((b->flags & MEM_Zero) == 0 || bn == 0);
> > +
> > +	if ((a->flags & b->flags & MEM_Zero) != 0) {
> > +		*result = a->u.nZero - b->u.nZero;
> > +		return 0;
> > +	}
> > +	if ((a->flags & MEM_Zero) != 0) {
> > +		for (int i = 0; i < minlen; ++i) {
> > +			if (b->z[i] != 0) {
> > +				*result = -1;
> > +				return 0;
> > +			}
> > +		}
> > +		*result = a->u.nZero - bn;
> > +		return 0;
> > +	}
> > +	if ((b->flags & MEM_Zero) != 0) {
> > +		for (int i = 0; i < minlen; ++i) {
> > +			if (a->z[i] != 0){
> > +				*result = 1;
> > +				return 0;
> > +			}
> > +		}
> > +		*result = b->u.nZero - an;
> > +		return 0;
> > +	}
> > +	*result = memcmp(a->z, b->z, minlen);
> > +	if (*result != 0)
> > +		return 0;
> > +	*result = an - bn;
> > +	return 0;
> 
> 2. compare_blobs never fails. So you can drop result out argument
> and return the comparison result as 'return'.
> 
See the last comment.

> > +}
> > +
> > +static int
> > +compare_numbers(const struct Mem *left, const struct Mem *right, int *result)
> > +{
> > +	struct sql_num a, b;
> > +	/* TODO: Here should be check for right value type. */
> 
> 3. What if 'b' is a string, which can't be converted to a number?
> 
Then string will be greater than number. See issue #5891.

> > +	if (get_number(right, &b) != 0) {
> > +		*result = -1;
> > +		return 0;
> > +	}
> > +	if (get_number(left, &a) != 0) {
> > +		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(left),
> > +			 "numeric");
> > +		return -1;
> > +	}
> > +	if (a.type == MEM_Real) {
> > +		if (b.type == MEM_Real) {
> > +			if (a.d > b.d)
> > +				*result = 1;
> > +			else if (a.d < b.d)
> > +				*result = -1;
> > +			else
> > +				*result = 0;
> > +			return 0;
> > +		}
> > +		if (b.type == MEM_Int)
> > +			*result = double_compare_nint64(a.d, b.i, 1);
> > +		else
> > +			*result = double_compare_uint64(a.d, b.u, 1);
> > +		return 0;
> > +	}
> > +	if (a.type == MEM_Int) {
> > +		if (b.type == MEM_Int) {
> > +			if (a.i > b.i)
> > +				*result = 1;
> > +			else if (a.i < b.i)
> > +				*result = -1;
> > +			else
> > +				*result = 0;
> > +			return 0;
> > +		}
> > +		if (b.type == MEM_UInt)
> > +			*result = -1;
> > +		else
> > +			*result = double_compare_nint64(b.d, a.i, -1);
> > +		return 0;
> > +	}
> > +	assert(a.type == MEM_UInt);
> > +	if (b.type == MEM_UInt) {
> > +		if (a.u > b.u)
> > +			*result = 1;
> > +		else if (a.u < b.u)
> > +			*result = -1;
> > +		else
> > +			*result = 0;
> > +		return 0;
> > +	}
> > +	if (b.type == MEM_Int)
> > +		*result = 1;
> > +	else
> > +		*result = double_compare_uint64(b.d, a.u, -1);
> > +	return 0;
> > +}
> > +
> > +static int
> > +compare_strings(const struct Mem *left, const struct Mem *right, int *result,
> > +		const struct coll *coll)
> > +{
> > +	char *a;
> > +	uint32_t an;
> > +	char bufl[BUF_SIZE];
> > +	if ((left->flags & MEM_Str) != 0) {
> > +		a = left->z;
> > +		an = left->n;
> > +	} else {
> > +		assert((left->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
> > +		a = &bufl[0];
> > +		if ((left->flags & MEM_Int) != 0)
> > +			sql_snprintf(BUF_SIZE, a, "%lld", left->u.i);
> > +		else if ((left->flags & MEM_UInt) != 0)
> > +			sql_snprintf(BUF_SIZE, a, "%llu", left->u.u);
> > +		else
> > +			sql_snprintf(BUF_SIZE, a, "%!.15g", left->u.r);
> > +		an = strlen(a);
> > +	}
> > +
> > +	char *b;
> > +	uint32_t bn;
> > +	char bufr[BUF_SIZE];
> > +	if ((right->flags & MEM_Str) != 0) {
> > +		b = right->z;
> > +		bn = right->n;
> > +	} else {
> > +		assert((right->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
> > +		b = &bufr[0];
> > +		if ((right->flags & MEM_Int) != 0)
> > +			sql_snprintf(BUF_SIZE, b, "%lld", right->u.i);
> > +		else if ((right->flags & MEM_UInt) != 0)
> > +			sql_snprintf(BUF_SIZE, b, "%llu", right->u.u);
> > +		else
> > +			sql_snprintf(BUF_SIZE, b, "%!.15g", right->u.r);
> > +		bn = strlen(b);
> > +	}
> > +	if (coll) {
> > +		*result = coll->cmp(a, an, b, bn, coll);
> > +		return 0;
> > +	}
> > +	uint32_t minlen = MIN(an, bn);
> > +	*result = memcmp(a, b, minlen);
> > +	if (*result != 0)
> > +		return 0;
> > +	*result = an - bn;
> > +	return 0;
> 
> 4. It can't fail either. You can return result as 'return'.
> 
True, but I left it as it is since the function is not static now. See the last comment.

> > +}
> > +
> > +int
> > +mem_compare(const struct Mem *left, const struct Mem *right, int *result,
> > +	    enum field_type type, struct coll *coll)
> > +{
> > +	assert(((left->flags | right->flags) & MEM_Null) == 0);
> > +	int flags_any = left->flags | right->flags;
> 
> 5. 'any' isn't needed until the next branch. You can move it right above
> 
Dropped this function.

> 	if ((flags_any & MEM_Bool) != 0) {
> 
> > +	int flags_all = left->flags & right->flags;
> > +
> > +	if ((flags_all & MEM_Bool) != 0) {
> > +		if (left->u.b == right->u.b)
> > +			*result = 0;
> > +		else if (left->u.b)
> > +			*result = 1;
> > +		else
> > +			*result = -1;
> > +		return 0;
> > +	}
> > +	if ((flags_any & MEM_Bool) != 0) {
> > +		char *str = (left->flags & MEM_Bool) == 0 ?
> > +			    mem_type_to_str(left) : mem_type_to_str(right);
> > +		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str, "boolean");
> > +		return -1;
> > +	}
> > +
> > +	if ((flags_all & MEM_Blob) != 0)
> > +		return compare_blobs(left, right, result);
> > +	if ((flags_any & MEM_Blob) != 0) {
> > +		char *str = (left->flags & MEM_Blob) == 0 ?
> > +			    mem_type_to_str(left) : mem_type_to_str(right);
> > +		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str, "varbinary");
> > +		return -1;
> > +	}
> > +
> > +	if (type == FIELD_TYPE_STRING)
> > +		return compare_strings(left, right, result, coll);
> > +
> > +	if (sql_type_is_numeric(type) ||
> > +	    (flags_any & (MEM_Int | MEM_UInt | MEM_Real)) != 0)
> > +		return compare_numbers(left, right, result);
> > +
> > +	assert((left->flags & MEM_Str) != 0 && (right->flags & MEM_Str) != 0);
> > +	return compare_strings(left, right, result, coll);
> > +}
> > diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
> > index 69a7d9f7a..6c022d8d8 100644
> > --- a/src/box/sql/mem.h
> > +++ b/src/box/sql/mem.h
> > @@ -226,6 +226,11 @@ mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
> >  int
> >  mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
> >  
> > +/** Compare two non-NULL MEMs and return the result of comparison. */
> > +int
> > +mem_compare(const struct Mem *left, const struct Mem *right, int *result,
> > +	    enum field_type type, struct coll *coll);
> 
> 6. What is 'type'?
> 
I believe it is the field type used during '... a IN (...);'. I am not sure that
it is the only place where a type is given to the opcodes.

> Can you keep it out of the comparator somehow? For example, make it
> 3 functions: mem_cmp_as_str, mem_cmp_as_num, and just a generic mem_cmp
> without any types calling the first 2 comparators. Otherwise the type
> thing is super ugly. It leaks the details of opcodes into mem.c.
Thank you for the suggestion. I was bothered by this too, but couldn't think of a
solution. I dropped mem_compare() and now have a set of mem_cmp_*() functions
instead. In their descriptions I wrote that they convert their values according
to implicit cast rules, but this is not exactly true, since our implicit cast
rules are currently a mess. Though, in the end we should come to some conclusion,
I believe. And then we should fix all comparison opcodes. I hope.


Diff:


diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 5116af519..9de57bcb4 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -478,9 +478,25 @@ mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
 	return 0;
 }
 
-static int
-compare_blobs(const struct Mem *a, const struct Mem *b, int *result)
+int
+mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result)
 {
+	if ((a->flags & MEM_Bool) == 0 || (b->flags & MEM_Bool) == 0)
+		return -1;
+	if (a->u.b == b->u.b)
+		*result = 0;
+	else if (a->u.b)
+		*result = 1;
+	else
+		*result = -1;
+	return 0;
+}
+
+int
+mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result)
+{
+	if ((a->flags & MEM_Blob) == 0 || (b->flags & MEM_Blob) == 0)
+		return -1;
 	int an = a->n;
 	int bn = b->n;
 	int minlen = MIN(an, bn);
@@ -525,8 +541,8 @@ compare_blobs(const struct Mem *a, const struct Mem *b, int *result)
 	return 0;
 }
 
-static int
-compare_numbers(const struct Mem *left, const struct Mem *right, int *result)
+int
+mem_cmp_num(const struct Mem *left, const struct Mem *right, int *result)
 {
 	struct sql_num a, b;
 	/* TODO: Here should be check for right value type. */
@@ -534,11 +550,8 @@ compare_numbers(const struct Mem *left, const struct Mem *right, int *result)
 		*result = -1;
 		return 0;
 	}
-	if (get_number(left, &a) != 0) {
-		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(left),
-			 "numeric");
+	if (get_number(left, &a) != 0)
 		return -1;
-	}
 	if (a.type == MEM_Real) {
 		if (b.type == MEM_Real) {
 			if (a.d > b.d)
@@ -588,9 +601,9 @@ compare_numbers(const struct Mem *left, const struct Mem *right, int *result)
 	return 0;
 }
 
-static int
-compare_strings(const struct Mem *left, const struct Mem *right, int *result,
-		const struct coll *coll)
+int
+mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
+	    const struct coll *coll)
 {
 	char *a;
 	uint32_t an;
@@ -639,50 +652,6 @@ compare_strings(const struct Mem *left, const struct Mem *right, int *result,
 	return 0;
 }
 
-int
-mem_compare(const struct Mem *left, const struct Mem *right, int *result,
-	    enum field_type type, struct coll *coll)
-{
-	assert(((left->flags | right->flags) & MEM_Null) == 0);
-	int flags_any = left->flags | right->flags;
-	int flags_all = left->flags & right->flags;
-
-	if ((flags_all & MEM_Bool) != 0) {
-		if (left->u.b == right->u.b)
-			*result = 0;
-		else if (left->u.b)
-			*result = 1;
-		else
-			*result = -1;
-		return 0;
-	}
-	if ((flags_any & MEM_Bool) != 0) {
-		char *str = (left->flags & MEM_Bool) == 0 ?
-			    mem_type_to_str(left) : mem_type_to_str(right);
-		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str, "boolean");
-		return -1;
-	}
-
-	if ((flags_all & MEM_Blob) != 0)
-		return compare_blobs(left, right, result);
-	if ((flags_any & MEM_Blob) != 0) {
-		char *str = (left->flags & MEM_Blob) == 0 ?
-			    mem_type_to_str(left) : mem_type_to_str(right);
-		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str, "varbinary");
-		return -1;
-	}
-
-	if (type == FIELD_TYPE_STRING)
-		return compare_strings(left, right, result, coll);
-
-	if (sql_type_is_numeric(type) ||
-	    (flags_any & (MEM_Int | MEM_UInt | MEM_Real)) != 0)
-		return compare_numbers(left, right, result);
-
-	assert((left->flags & MEM_Str) != 0 && (right->flags & MEM_Str) != 0);
-	return compare_strings(left, right, result, coll);
-}
-
 static inline bool
 mem_has_msgpack_subtype(struct Mem *mem)
 {
@@ -2177,7 +2146,7 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 			return +1;
 		if ((f2 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
 			return -1;
-		compare_numbers(pMem1, pMem2, &res);
+		mem_cmp_num(pMem1, pMem2, &res);
 		return res;
 	}
 
@@ -2191,12 +2160,12 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 		if ((f2 & MEM_Str) == 0) {
 			return -1;
 		}
-		compare_strings(pMem1, pMem2, &res, pColl);
+		mem_cmp_str(pMem1, pMem2, &res, pColl);
 		return res;
 	}
 
 	/* Both values must be blobs.  Compare using memcmp().  */
-	compare_blobs(pMem1, pMem2, &res);
+	mem_cmp_bin(pMem1, pMem2, &res);
 	return res;
 }
 
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index 80793ef29..7e498356b 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -364,10 +364,38 @@ mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
 int
 mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
 
-/** Compare two non-NULL MEMs and return the result of comparison. */
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * BOOLEAN type or their values are converted to BOOLEAN according to implicit
+ * cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * VARBINARY type or their values are converted to VARBINARY according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * STRING type or their values are converted to STRING according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
+	    const struct coll *coll);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * NUMBER type or their values are converted to NUMBER according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
 int
-mem_compare(const struct Mem *left, const struct Mem *right, int *result,
-	    enum field_type type, struct coll *coll);
+mem_cmp_num(const struct Mem *a, const struct Mem *b, int *result);
 
 /**
  * Simple type to str convertor. It is used to simplify
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 65fd63b2b..f917c7de9 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1610,6 +1610,7 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 
 	pIn1 = &aMem[pOp->p1];
 	pIn3 = &aMem[pOp->p3];
+	enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
 	if (mem_is_any_null(pIn1, pIn3)) {
 		/* One or both operands are NULL */
 		if (pOp->p5 & SQL_NULLEQ) {
@@ -1643,11 +1644,54 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 			}
 			break;
 		}
+	} else if (mem_is_bool(pIn3) || mem_is_bool(pIn1)) {
+		if (mem_cmp_bool(pIn3, pIn1, &res) != 0) {
+			char *str = !mem_is_bool(pIn3) ?
+				    mem_type_to_str(pIn3) :
+				    mem_type_to_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "boolean");
+			goto abort_due_to_error;
+		}
+	} else if (mem_is_bin(pIn3) || mem_is_bin(pIn1)) {
+		if (mem_cmp_bin(pIn3, pIn1, &res) != 0) {
+			char *str = !mem_is_bin(pIn3) ?
+				    mem_type_to_str(pIn3) :
+				    mem_type_to_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "varbinary");
+			goto abort_due_to_error;
+		}
+	} else if (type == FIELD_TYPE_STRING) {
+		if (mem_cmp_str(pIn3, pIn1, &res, pOp->p4.pColl) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "string");
+			goto abort_due_to_error;
+		}
+	} else if (sql_type_is_numeric(type) || mem_is_num(pIn3) ||
+		   mem_is_num(pIn1)) {
+		type = FIELD_TYPE_NUMBER;
+		if (mem_cmp_num(pIn3, pIn1, &res) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "numeric");
+			goto abort_due_to_error;
+		}
 	} else {
-		enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
-		if (mem_compare(pIn3, pIn1, &res, type, pOp->p4.pColl))
+		type = FIELD_TYPE_STRING;
+		assert(mem_is_str(pIn3) && mem_is_same_type(pIn3, pIn1));
+		if (mem_cmp_str(pIn3, pIn1, &res, pOp->p4.pColl) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "string");
 			goto abort_due_to_error;
+		}
 	}
+
 	switch( pOp->opcode) {
 	case OP_Eq:    res2 = res==0;     break;
 	case OP_Ne:    res2 = res;        break;



New patch:


commit ed2c851550f8d658dfea7f14f758222acef1665e
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Sun Mar 14 13:13:34 2021 +0300

    sql: introduce mem_cmp_*() functions
    
    This patch introduces a set of mem_cmp_*() functions. These functions
    are used to compare MEMs.
    
    Part of #5818

diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 9a4a9b92e..9de57bcb4 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -478,6 +478,180 @@ mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
 	return 0;
 }
 
+int
+mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result)
+{
+	if ((a->flags & MEM_Bool) == 0 || (b->flags & MEM_Bool) == 0)
+		return -1;
+	if (a->u.b == b->u.b)
+		*result = 0;
+	else if (a->u.b)
+		*result = 1;
+	else
+		*result = -1;
+	return 0;
+}
+
+int
+mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result)
+{
+	if ((a->flags & MEM_Blob) == 0 || (b->flags & MEM_Blob) == 0)
+		return -1;
+	int an = a->n;
+	int bn = b->n;
+	int minlen = MIN(an, bn);
+
+	/*
+	 * It is possible to have a Blob value that has some non-zero content
+	 * followed by zero content.  But that only comes up for Blobs formed
+	 * by the OP_MakeRecord opcode, and such Blobs never get passed into
+	 * mem_cmp_bin().
+	 */
+	assert((a->flags & MEM_Zero) == 0 || an == 0);
+	assert((b->flags & MEM_Zero) == 0 || bn == 0);
+
+	if ((a->flags & b->flags & MEM_Zero) != 0) {
+		*result = a->u.nZero - b->u.nZero;
+		return 0;
+	}
+	if ((a->flags & MEM_Zero) != 0) {
+		for (int i = 0; i < minlen; ++i) {
+			if (b->z[i] != 0) {
+				*result = -1;
+				return 0;
+			}
+		}
+		*result = a->u.nZero - bn;
+		return 0;
+	}
+	if ((b->flags & MEM_Zero) != 0) {
+		for (int i = 0; i < minlen; ++i) {
+			if (a->z[i] != 0){
+				*result = 1;
+				return 0;
+			}
+		}
+		*result = b->u.nZero - an;
+		return 0;
+	}
+	*result = memcmp(a->z, b->z, minlen);
+	if (*result != 0)
+		return 0;
+	*result = an - bn;
+	return 0;
+}
+
+int
+mem_cmp_num(const struct Mem *left, const struct Mem *right, int *result)
+{
+	struct sql_num a, b;
+	/* TODO: Here should be check for right value type. */
+	if (get_number(right, &b) != 0) {
+		*result = -1;
+		return 0;
+	}
+	if (get_number(left, &a) != 0)
+		return -1;
+	if (a.type == MEM_Real) {
+		if (b.type == MEM_Real) {
+			if (a.d > b.d)
+				*result = 1;
+			else if (a.d < b.d)
+				*result = -1;
+			else
+				*result = 0;
+			return 0;
+		}
+		if (b.type == MEM_Int)
+			*result = double_compare_nint64(a.d, b.i, 1);
+		else
+			*result = double_compare_uint64(a.d, b.u, 1);
+		return 0;
+	}
+	if (a.type == MEM_Int) {
+		if (b.type == MEM_Int) {
+			if (a.i > b.i)
+				*result = 1;
+			else if (a.i < b.i)
+				*result = -1;
+			else
+				*result = 0;
+			return 0;
+		}
+		if (b.type == MEM_UInt)
+			*result = -1;
+		else
+			*result = double_compare_nint64(b.d, a.i, -1);
+		return 0;
+	}
+	assert(a.type == MEM_UInt);
+	if (b.type == MEM_UInt) {
+		if (a.u > b.u)
+			*result = 1;
+		else if (a.u < b.u)
+			*result = -1;
+		else
+			*result = 0;
+		return 0;
+	}
+	if (b.type == MEM_Int)
+		*result = 1;
+	else
+		*result = double_compare_uint64(b.d, a.u, -1);
+	return 0;
+}
+
+int
+mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
+	    const struct coll *coll)
+{
+	char *a;
+	uint32_t an;
+	char bufl[BUF_SIZE];
+	if ((left->flags & MEM_Str) != 0) {
+		a = left->z;
+		an = left->n;
+	} else {
+		assert((left->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+		a = &bufl[0];
+		if ((left->flags & MEM_Int) != 0)
+			sql_snprintf(BUF_SIZE, a, "%lld", left->u.i);
+		else if ((left->flags & MEM_UInt) != 0)
+			sql_snprintf(BUF_SIZE, a, "%llu", left->u.u);
+		else
+			sql_snprintf(BUF_SIZE, a, "%!.15g", left->u.r);
+		an = strlen(a);
+	}
+
+	char *b;
+	uint32_t bn;
+	char bufr[BUF_SIZE];
+	if ((right->flags & MEM_Str) != 0) {
+		b = right->z;
+		bn = right->n;
+	} else {
+		assert((right->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+		b = &bufr[0];
+		if ((right->flags & MEM_Int) != 0)
+			sql_snprintf(BUF_SIZE, b, "%lld", right->u.i);
+		else if ((right->flags & MEM_UInt) != 0)
+			sql_snprintf(BUF_SIZE, b, "%llu", right->u.u);
+		else
+			sql_snprintf(BUF_SIZE, b, "%!.15g", right->u.r);
+		bn = strlen(b);
+	}
+	if (coll) {
+		*result = coll->cmp(a, an, b, bn, coll);
+		return 0;
+	}
+	uint32_t minlen = MIN(an, bn);
+	*result = memcmp(a, b, minlen);
+	if (*result != 0)
+		return 0;
+	*result = an - bn;
+	return 0;
+}
+
 static inline bool
 mem_has_msgpack_subtype(struct Mem *mem)
 {
@@ -1925,45 +2099,6 @@ sqlVdbeMemTooBig(Mem * p)
 	return 0;
 }
 
-/*
- * Compare two blobs.  Return negative, zero, or positive if the first
- * is less than, equal to, or greater than the second, respectively.
- * If one blob is a prefix of the other, then the shorter is the lessor.
- */
-static SQL_NOINLINE int
-sqlBlobCompare(const Mem * pB1, const Mem * pB2)
-{
-	int c;
-	int n1 = pB1->n;
-	int n2 = pB2->n;
-
-	/* It is possible to have a Blob value that has some non-zero content
-	 * followed by zero content.  But that only comes up for Blobs formed
-	 * by the OP_MakeRecord opcode, and such Blobs never get passed into
-	 * sqlMemCompare().
-	 */
-	assert((pB1->flags & MEM_Zero) == 0 || n1 == 0);
-	assert((pB2->flags & MEM_Zero) == 0 || n2 == 0);
-
-	if ((pB1->flags | pB2->flags) & MEM_Zero) {
-		if (pB1->flags & pB2->flags & MEM_Zero) {
-			return pB1->u.nZero - pB2->u.nZero;
-		} else if (pB1->flags & MEM_Zero) {
-			if (!isAllZero(pB2->z, pB2->n))
-				return -1;
-			return pB1->u.nZero - n2;
-		} else {
-			if (!isAllZero(pB1->z, pB1->n))
-				return +1;
-			return n1 - pB2->u.nZero;
-		}
-	}
-	c = memcmp(pB1->z, pB2->z, n1 > n2 ? n2 : n1);
-	if (c)
-		return c;
-	return n1 - n2;
-}
-
 /*
  * Compare the values contained by the two memory cells, returning
  * negative, zero or positive if pMem1 is less than, equal to, or greater
@@ -1977,6 +2112,7 @@ int
 sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 {
 	int f1, f2;
+	int res;
 	int combined_flags;
 
 	f1 = pMem1->flags;
@@ -2006,57 +2142,12 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 	/* At least one of the two values is a number
 	 */
 	if ((combined_flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0) {
-		if ((f1 & f2 & MEM_Int) != 0) {
-			if (pMem1->u.i < pMem2->u.i)
-				return -1;
-			if (pMem1->u.i > pMem2->u.i)
-				return +1;
-			return 0;
-		}
-		if ((f1 & f2 & MEM_UInt) != 0) {
-			if (pMem1->u.u < pMem2->u.u)
-				return -1;
-			if (pMem1->u.u > pMem2->u.u)
-				return +1;
-			return 0;
-		}
-		if ((f1 & f2 & MEM_Real) != 0) {
-			if (pMem1->u.r < pMem2->u.r)
-				return -1;
-			if (pMem1->u.r > pMem2->u.r)
-				return +1;
-			return 0;
-		}
-		if ((f1 & MEM_Int) != 0) {
-			if ((f2 & MEM_Real) != 0) {
-				return double_compare_nint64(pMem2->u.r,
-							     pMem1->u.i, -1);
-			} else {
-				return -1;
-			}
-		}
-		if ((f1 & MEM_UInt) != 0) {
-			if ((f2 & MEM_Real) != 0) {
-				return double_compare_uint64(pMem2->u.r,
-							     pMem1->u.u, -1);
-			} else if ((f2 & MEM_Int) != 0) {
-				return +1;
-			} else {
-				return -1;
-			}
-		}
-		if ((f1 & MEM_Real) != 0) {
-			if ((f2 & MEM_Int) != 0) {
-				return double_compare_nint64(pMem1->u.r,
-							     pMem2->u.i, 1);
-			} else if ((f2 & MEM_UInt) != 0) {
-				return double_compare_uint64(pMem1->u.r,
-							     pMem2->u.u, 1);
-			} else {
-				return -1;
-			}
-		}
-		return +1;
+		if ((f1 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
+			return +1;
+		if ((f2 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
+			return -1;
+		mem_cmp_num(pMem1, pMem2, &res);
+		return res;
 	}
 
 	/* If one value is a string and the other is a blob, the string is less.
@@ -2069,27 +2160,13 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 		if ((f2 & MEM_Str) == 0) {
 			return -1;
 		}
-		/* The collation sequence must be defined at this point, even if
-		 * the user deletes the collation sequence after the vdbe program is
-		 * compiled (this was not always the case).
-		 */
-		if (pColl) {
-			return vdbeCompareMemString(pMem1, pMem2, pColl);
-		} else {
-			size_t n = pMem1->n < pMem2->n ? pMem1->n : pMem2->n;
-			int res;
-			res = memcmp(pMem1->z, pMem2->z, n);
-			if (res == 0)
-				res = (int)pMem1->n - (int)pMem2->n;
-			return res;
-		}
-		/* If a NULL pointer was passed as the collate function, fall through
-		 * to the blob case and use memcmp().
-		 */
+		mem_cmp_str(pMem1, pMem2, &res, pColl);
+		return res;
 	}
 
 	/* Both values must be blobs.  Compare using memcmp().  */
-	return sqlBlobCompare(pMem1, pMem2);
+	mem_cmp_bin(pMem1, pMem2, &res);
+	return res;
 }
 
 bool
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index 9539fbbd1..7e498356b 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -364,6 +364,39 @@ mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
 int
 mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
 
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * BOOLEAN type or their values are converted to BOOLEAN according to implicit
+ * cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * VARBINARY type or their values are converted to VARBINARY according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * STRING type or their values are converted to STRING according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
+	    const struct coll *coll);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * NUMBER type or their values are converted to NUMBER according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_num(const struct Mem *a, const struct Mem *b, int *result);
+
 /**
  * Simple type to str convertor. It is used to simplify
  * error reporting.
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 0849875e2..f917c7de9 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1607,15 +1607,10 @@ case OP_Le:               /* same as TK_LE, jump, in1, in3 */
 case OP_Gt:               /* same as TK_GT, jump, in1, in3 */
 case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 	int res, res2;      /* Result of the comparison of pIn1 against pIn3 */
-	u32 flags1;         /* Copy of initial value of pIn1->flags */
-	u32 flags3;         /* Copy of initial value of pIn3->flags */
 
 	pIn1 = &aMem[pOp->p1];
 	pIn3 = &aMem[pOp->p3];
-	flags1 = pIn1->flags;
-	flags3 = pIn3->flags;
-	enum field_type ft_p1 = pIn1->field_type;
-	enum field_type ft_p3 = pIn3->field_type;
+	enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
 	if (mem_is_any_null(pIn1, pIn3)) {
 		/* One or both operands are NULL */
 		if (pOp->p5 & SQL_NULLEQ) {
@@ -1649,82 +1644,54 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 			}
 			break;
 		}
-	} else if (mem_is_bool(pIn1) || mem_is_bool(pIn3) ||
-		   mem_is_bin(pIn1) || mem_is_bin(pIn3)) {
-		if (!mem_is_same_type(pIn1, pIn3)) {
-			char *inconsistent_type = mem_is_bool(pIn1) ||
-						  mem_is_bin(pIn1) ?
-						  mem_type_to_str(pIn3) :
-						  mem_type_to_str(pIn1);
-			char *expected_type = mem_is_bool(pIn1) ||
-					      mem_is_bin(pIn1) ?
-					      mem_type_to_str(pIn1) :
-					      mem_type_to_str(pIn3);
-			diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-				 inconsistent_type, expected_type);
+	} else if (mem_is_bool(pIn3) || mem_is_bool(pIn1)) {
+		if (mem_cmp_bool(pIn3, pIn1, &res) != 0) {
+			char *str = !mem_is_bool(pIn3) ?
+				    mem_type_to_str(pIn3) :
+				    mem_type_to_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "boolean");
+			goto abort_due_to_error;
+		}
+	} else if (mem_is_bin(pIn3) || mem_is_bin(pIn1)) {
+		if (mem_cmp_bin(pIn3, pIn1, &res) != 0) {
+			char *str = !mem_is_bin(pIn3) ?
+				    mem_type_to_str(pIn3) :
+				    mem_type_to_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "varbinary");
+			goto abort_due_to_error;
+		}
+	} else if (type == FIELD_TYPE_STRING) {
+		if (mem_cmp_str(pIn3, pIn1, &res, pOp->p4.pColl) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "string");
+			goto abort_due_to_error;
+		}
+	} else if (sql_type_is_numeric(type) || mem_is_num(pIn3) ||
+		   mem_is_num(pIn1)) {
+		type = FIELD_TYPE_NUMBER;
+		if (mem_cmp_num(pIn3, pIn1, &res) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "numeric");
 			goto abort_due_to_error;
 		}
-		res = sqlMemCompare(pIn3, pIn1, NULL);
 	} else {
-		enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
-		if (sql_type_is_numeric(type)) {
-			if (mem_is_str(pIn1)) {
-				mem_apply_numeric_type(pIn1);
-				flags3 = pIn3->flags;
-			}
-			if (mem_is_str(pIn3)) {
-				if (mem_apply_numeric_type(pIn3) != 0) {
-					diag_set(ClientError,
-						 ER_SQL_TYPE_MISMATCH,
-						 mem_str(pIn3),
-						 "numeric");
-					goto abort_due_to_error;
-				}
-			}
-			/* Handle the common case of integer comparison here, as an
-			 * optimization, to avoid a call to sqlMemCompare()
-			 */
-			if (mem_is_int(pIn1) && mem_is_int(pIn3)) {
-				if (mem_is_nint(pIn1) && mem_is_nint(pIn3)) {
-					if (pIn3->u.i > pIn1->u.i)
-						res = +1;
-					else if (pIn3->u.i < pIn1->u.i)
-						res = -1;
-					else
-						res = 0;
-					goto compare_op;
-				}
-				if (mem_is_uint(pIn1) && mem_is_uint(pIn3)) {
-					if (pIn3->u.u > pIn1->u.u)
-						res = +1;
-					else if (pIn3->u.u < pIn1->u.u)
-						res = -1;
-					else
-						res = 0;
-					goto compare_op;
-				}
-				if (mem_is_uint(pIn1) && mem_is_nint(pIn3)) {
-					res = -1;
-					goto compare_op;
-				}
-				res = 1;
-				goto compare_op;
-			}
-		} else if (type == FIELD_TYPE_STRING) {
-			if (!mem_is_str(pIn1) && mem_is_num(pIn1)) {
-				sqlVdbeMemStringify(pIn1);
-				flags1 = (pIn1->flags & ~MEM_TypeMask) | (flags1 & MEM_TypeMask);
-				assert(pIn1!=pIn3);
-			}
-			if (!mem_is_str(pIn3) && mem_is_num(pIn3)) {
-				sqlVdbeMemStringify(pIn3);
-				flags3 = (pIn3->flags & ~MEM_TypeMask) | (flags3 & MEM_TypeMask);
-			}
+		type = FIELD_TYPE_STRING;
+		assert(mem_is_str(pIn3) && mem_is_same_type(pIn3, pIn1));
+		if (mem_cmp_str(pIn3, pIn1, &res, pOp->p4.pColl) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "string");
+			goto abort_due_to_error;
 		}
-		assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl==0);
-		res = sqlMemCompare(pIn3, pIn1, pOp->p4.pColl);
 	}
-			compare_op:
+
 	switch( pOp->opcode) {
 	case OP_Eq:    res2 = res==0;     break;
 	case OP_Ne:    res2 = res;        break;
@@ -1734,14 +1701,6 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 	default:       res2 = res>=0;     break;
 	}
 
-	/* Undo any changes made by mem_apply_type() to the input registers. */
-	assert((pIn1->flags & MEM_Dyn) == (flags1 & MEM_Dyn));
-	pIn1->flags = flags1;
-	pIn1->field_type = ft_p1;
-	assert((pIn3->flags & MEM_Dyn) == (flags3 & MEM_Dyn));
-	pIn3->flags = flags3;
-	pIn3->field_type = ft_p3;
-
 	if (pOp->p5 & SQL_STOREP2) {
 		iCompare = res;
 		res2 = res2!=0;  /* For this path res2 must be exactly 0 or 1 */


More information about the Tarantool-patches mailing list