[Tarantool-patches] [PATCH v5 20/52] sql: introduce mem_compare()

Mergen Imeev imeevma at tarantool.org
Thu Apr 15 02:40:29 MSK 2021


Thank you for review and suggestion! I applied your diff and tested. New patch
below.

On Thu, Apr 15, 2021 at 01:20:21AM +0200, Vladislav Shpilevoy wrote:
> Nice fixes!
> 
> Consider mine below:
> 
> ====================
> diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
> index 9de57bcb4..a7c2c16cb 100644
> --- a/src/box/sql/mem.c
> +++ b/src/box/sql/mem.c
> @@ -481,7 +481,7 @@ mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
>  int
>  mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result)
>  {
> -	if ((a->flags & MEM_Bool) == 0 || (b->flags & MEM_Bool) == 0)
> +	if ((a->flags & b->flags & MEM_Bool) == 0)
>  		return -1;
>  	if (a->u.b == b->u.b)
>  		*result = 0;
> @@ -495,7 +495,7 @@ mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result)
>  int
>  mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result)
>  {
> -	if ((a->flags & MEM_Blob) == 0 || (b->flags & MEM_Blob) == 0)
> +	if ((a->flags & b->flags & MEM_Blob) == 0)
>  		return -1;
>  	int an = a->n;
>  	int bn = b->n;
> @@ -640,7 +640,7 @@ mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
>  			sql_snprintf(BUF_SIZE, b, "%!.15g", right->u.r);
>  		bn = strlen(b);
>  	}
> -	if (coll) {
> +	if (coll != NULL) {
>  		*result = coll->cmp(a, an, b, bn, coll);
>  		return 0;
>  	}


New patch:

commit 37f32ea303fe8c01683a4f77ac87350b15f4a82e
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Sun Mar 14 13:13:34 2021 +0300

    sql: introduce mem_cmp_*() functions
    
    This patch introduces set of mem_cmp_*() functions. These functions are
    used to compare MEMs.
    
    Part of #5818

diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index a8bbfd3ea..529982624 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -479,6 +479,180 @@ mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
 	return 0;
 }
 
+int
+mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result)
+{
+	if ((a->flags & b->flags & MEM_Bool) == 0)
+		return -1;
+	if (a->u.b == b->u.b)
+		*result = 0;
+	else if (a->u.b)
+		*result = 1;
+	else
+		*result = -1;
+	return 0;
+}
+
+int
+mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result)
+{
+	if ((a->flags & b->flags & MEM_Blob) == 0)
+		return -1;
+	int an = a->n;
+	int bn = b->n;
+	int minlen = MIN(an, bn);
+
+	/*
+	 * It is possible to have a Blob value that has some non-zero content
+	 * followed by zero content.  But that only comes up for Blobs formed
+	 * by the OP_MakeRecord opcode, and such Blobs never get passed into
+	 * mem_compare().
+	 */
+	assert((a->flags & MEM_Zero) == 0 || an == 0);
+	assert((b->flags & MEM_Zero) == 0 || bn == 0);
+
+	if ((a->flags & b->flags & MEM_Zero) != 0) {
+		*result = a->u.nZero - b->u.nZero;
+		return 0;
+	}
+	if ((a->flags & MEM_Zero) != 0) {
+		for (int i = 0; i < minlen; ++i) {
+			if (b->z[i] != 0) {
+				*result = -1;
+				return 0;
+			}
+		}
+		*result = a->u.nZero - bn;
+		return 0;
+	}
+	if ((b->flags & MEM_Zero) != 0) {
+		for (int i = 0; i < minlen; ++i) {
+			if (a->z[i] != 0){
+				*result = 1;
+				return 0;
+			}
+		}
+		*result = b->u.nZero - an;
+		return 0;
+	}
+	*result = memcmp(a->z, b->z, minlen);
+	if (*result != 0)
+		return 0;
+	*result = an - bn;
+	return 0;
+}
+
+int
+mem_cmp_num(const struct Mem *left, const struct Mem *right, int *result)
+{
+	struct sql_num a, b;
+	/* TODO: Here should be check for right value type. */
+	if (get_number(right, &b) != 0) {
+		*result = -1;
+		return 0;
+	}
+	if (get_number(left, &a) != 0)
+		return -1;
+	if (a.type == MEM_Real) {
+		if (b.type == MEM_Real) {
+			if (a.d > b.d)
+				*result = 1;
+			else if (a.d < b.d)
+				*result = -1;
+			else
+				*result = 0;
+			return 0;
+		}
+		if (b.type == MEM_Int)
+			*result = double_compare_nint64(a.d, b.i, 1);
+		else
+			*result = double_compare_uint64(a.d, b.u, 1);
+		return 0;
+	}
+	if (a.type == MEM_Int) {
+		if (b.type == MEM_Int) {
+			if (a.i > b.i)
+				*result = 1;
+			else if (a.i < b.i)
+				*result = -1;
+			else
+				*result = 0;
+			return 0;
+		}
+		if (b.type == MEM_UInt)
+			*result = -1;
+		else
+			*result = double_compare_nint64(b.d, a.i, -1);
+		return 0;
+	}
+	assert(a.type == MEM_UInt);
+	if (b.type == MEM_UInt) {
+		if (a.u > b.u)
+			*result = 1;
+		else if (a.u < b.u)
+			*result = -1;
+		else
+			*result = 0;
+		return 0;
+	}
+	if (b.type == MEM_Int)
+		*result = 1;
+	else
+		*result = double_compare_uint64(b.d, a.u, -1);
+	return 0;
+}
+
+int
+mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
+	    const struct coll *coll)
+{
+	char *a;
+	uint32_t an;
+	char bufl[BUF_SIZE];
+	if ((left->flags & MEM_Str) != 0) {
+		a = left->z;
+		an = left->n;
+	} else {
+		assert((left->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+		a = &bufl[0];
+		if ((left->flags & MEM_Int) != 0)
+			sql_snprintf(BUF_SIZE, a, "%lld", left->u.i);
+		else if ((left->flags & MEM_UInt) != 0)
+			sql_snprintf(BUF_SIZE, a, "%llu", left->u.u);
+		else
+			sql_snprintf(BUF_SIZE, a, "%!.15g", left->u.r);
+		an = strlen(a);
+	}
+
+	char *b;
+	uint32_t bn;
+	char bufr[BUF_SIZE];
+	if ((right->flags & MEM_Str) != 0) {
+		b = right->z;
+		bn = right->n;
+	} else {
+		assert((right->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+		b = &bufr[0];
+		if ((right->flags & MEM_Int) != 0)
+			sql_snprintf(BUF_SIZE, b, "%lld", right->u.i);
+		else if ((right->flags & MEM_UInt) != 0)
+			sql_snprintf(BUF_SIZE, b, "%llu", right->u.u);
+		else
+			sql_snprintf(BUF_SIZE, b, "%!.15g", right->u.r);
+		bn = strlen(b);
+	}
+	if (coll != NULL) {
+		*result = coll->cmp(a, an, b, bn, coll);
+		return 0;
+	}
+	uint32_t minlen = MIN(an, bn);
+	*result = memcmp(a, b, minlen);
+	if (*result != 0)
+		return 0;
+	*result = an - bn;
+	return 0;
+}
+
 static inline bool
 mem_has_msgpack_subtype(struct Mem *mem)
 {
@@ -1926,45 +2100,6 @@ sqlVdbeMemTooBig(Mem * p)
 	return 0;
 }
 
-/*
- * Compare two blobs.  Return negative, zero, or positive if the first
- * is less than, equal to, or greater than the second, respectively.
- * If one blob is a prefix of the other, then the shorter is the lessor.
- */
-static SQL_NOINLINE int
-sqlBlobCompare(const Mem * pB1, const Mem * pB2)
-{
-	int c;
-	int n1 = pB1->n;
-	int n2 = pB2->n;
-
-	/* It is possible to have a Blob value that has some non-zero content
-	 * followed by zero content.  But that only comes up for Blobs formed
-	 * by the OP_MakeRecord opcode, and such Blobs never get passed into
-	 * sqlMemCompare().
-	 */
-	assert((pB1->flags & MEM_Zero) == 0 || n1 == 0);
-	assert((pB2->flags & MEM_Zero) == 0 || n2 == 0);
-
-	if ((pB1->flags | pB2->flags) & MEM_Zero) {
-		if (pB1->flags & pB2->flags & MEM_Zero) {
-			return pB1->u.nZero - pB2->u.nZero;
-		} else if (pB1->flags & MEM_Zero) {
-			if (!isAllZero(pB2->z, pB2->n))
-				return -1;
-			return pB1->u.nZero - n2;
-		} else {
-			if (!isAllZero(pB1->z, pB1->n))
-				return +1;
-			return n1 - pB2->u.nZero;
-		}
-	}
-	c = memcmp(pB1->z, pB2->z, n1 > n2 ? n2 : n1);
-	if (c)
-		return c;
-	return n1 - n2;
-}
-
 /*
  * Compare the values contained by the two memory cells, returning
  * negative, zero or positive if pMem1 is less than, equal to, or greater
@@ -1978,6 +2113,7 @@ int
 sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 {
 	int f1, f2;
+	int res;
 	int combined_flags;
 
 	f1 = pMem1->flags;
@@ -2007,57 +2143,12 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 	/* At least one of the two values is a number
 	 */
 	if ((combined_flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0) {
-		if ((f1 & f2 & MEM_Int) != 0) {
-			if (pMem1->u.i < pMem2->u.i)
-				return -1;
-			if (pMem1->u.i > pMem2->u.i)
-				return +1;
-			return 0;
-		}
-		if ((f1 & f2 & MEM_UInt) != 0) {
-			if (pMem1->u.u < pMem2->u.u)
-				return -1;
-			if (pMem1->u.u > pMem2->u.u)
-				return +1;
-			return 0;
-		}
-		if ((f1 & f2 & MEM_Real) != 0) {
-			if (pMem1->u.r < pMem2->u.r)
-				return -1;
-			if (pMem1->u.r > pMem2->u.r)
-				return +1;
-			return 0;
-		}
-		if ((f1 & MEM_Int) != 0) {
-			if ((f2 & MEM_Real) != 0) {
-				return double_compare_nint64(pMem2->u.r,
-							     pMem1->u.i, -1);
-			} else {
-				return -1;
-			}
-		}
-		if ((f1 & MEM_UInt) != 0) {
-			if ((f2 & MEM_Real) != 0) {
-				return double_compare_uint64(pMem2->u.r,
-							     pMem1->u.u, -1);
-			} else if ((f2 & MEM_Int) != 0) {
-				return +1;
-			} else {
-				return -1;
-			}
-		}
-		if ((f1 & MEM_Real) != 0) {
-			if ((f2 & MEM_Int) != 0) {
-				return double_compare_nint64(pMem1->u.r,
-							     pMem2->u.i, 1);
-			} else if ((f2 & MEM_UInt) != 0) {
-				return double_compare_uint64(pMem1->u.r,
-							     pMem2->u.u, 1);
-			} else {
-				return -1;
-			}
-		}
-		return +1;
+		if ((f1 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
+			return +1;
+		if ((f2 & (MEM_Real | MEM_Int | MEM_UInt)) == 0)
+			return -1;
+		mem_cmp_num(pMem1, pMem2, &res);
+		return res;
 	}
 
 	/* If one value is a string and the other is a blob, the string is less.
@@ -2070,27 +2161,13 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
 		if ((f2 & MEM_Str) == 0) {
 			return -1;
 		}
-		/* The collation sequence must be defined at this point, even if
-		 * the user deletes the collation sequence after the vdbe program is
-		 * compiled (this was not always the case).
-		 */
-		if (pColl) {
-			return vdbeCompareMemString(pMem1, pMem2, pColl);
-		} else {
-			size_t n = pMem1->n < pMem2->n ? pMem1->n : pMem2->n;
-			int res;
-			res = memcmp(pMem1->z, pMem2->z, n);
-			if (res == 0)
-				res = (int)pMem1->n - (int)pMem2->n;
-			return res;
-		}
-		/* If a NULL pointer was passed as the collate function, fall through
-		 * to the blob case and use memcmp().
-		 */
+		mem_cmp_str(pMem1, pMem2, &res, pColl);
+		return res;
 	}
 
 	/* Both values must be blobs.  Compare using memcmp().  */
-	return sqlBlobCompare(pMem1, pMem2);
+	mem_cmp_bin(pMem1, pMem2, &res);
+	return res;
 }
 
 bool
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index 9539fbbd1..7e498356b 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -364,6 +364,39 @@ mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
 int
 mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
 
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * BOOLEAN type or their values are converted to VARBINARY according to implicit
+ * cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_bool(const struct Mem *a, const struct Mem *b, int *result);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * VARBINARY type or their values are converted to VARBINARY according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_bin(const struct Mem *a, const struct Mem *b, int *result);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * STRING type or their values are converted to VARBINARY according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_str(const struct Mem *left, const struct Mem *right, int *result,
+	    const struct coll *coll);
+
+/**
+ * Compare two MEMs and return the result of comparison. MEMs should be of
+ * NUMBER type or their values are converted to NUMBER according to
+ * implicit cast rules. Original MEMs are not changed.
+ */
+int
+mem_cmp_num(const struct Mem *a, const struct Mem *b, int *result);
+
 /**
  * Simple type to str convertor. It is used to simplify
  * error reporting.
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index fb2a5ccc1..2fbbf32a0 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1607,15 +1607,10 @@ case OP_Le:               /* same as TK_LE, jump, in1, in3 */
 case OP_Gt:               /* same as TK_GT, jump, in1, in3 */
 case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 	int res, res2;      /* Result of the comparison of pIn1 against pIn3 */
-	u32 flags1;         /* Copy of initial value of pIn1->flags */
-	u32 flags3;         /* Copy of initial value of pIn3->flags */
 
 	pIn1 = &aMem[pOp->p1];
 	pIn3 = &aMem[pOp->p3];
-	flags1 = pIn1->flags;
-	flags3 = pIn3->flags;
-	enum field_type ft_p1 = pIn1->field_type;
-	enum field_type ft_p3 = pIn3->field_type;
+	enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
 	if (mem_is_any_null(pIn1, pIn3)) {
 		/* One or both operands are NULL */
 		if (pOp->p5 & SQL_NULLEQ) {
@@ -1649,82 +1644,54 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 			}
 			break;
 		}
-	} else if (mem_is_bool(pIn1) || mem_is_bool(pIn3) ||
-		   mem_is_bin(pIn1) || mem_is_bin(pIn3)) {
-		if (!mem_is_same_type(pIn1, pIn3)) {
-			char *inconsistent_type = mem_is_bool(pIn1) ||
-						  mem_is_bin(pIn1) ?
-						  mem_type_to_str(pIn3) :
-						  mem_type_to_str(pIn1);
-			char *expected_type = mem_is_bool(pIn1) ||
-					      mem_is_bin(pIn1) ?
-					      mem_type_to_str(pIn1) :
-					      mem_type_to_str(pIn3);
-			diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-				 inconsistent_type, expected_type);
+	} else if (mem_is_bool(pIn3) || mem_is_bool(pIn1)) {
+		if (mem_cmp_bool(pIn3, pIn1, &res) != 0) {
+			char *str = !mem_is_bool(pIn3) ?
+				    mem_type_to_str(pIn3) :
+				    mem_type_to_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "boolean");
+			goto abort_due_to_error;
+		}
+	} else if (mem_is_bin(pIn3) || mem_is_bin(pIn1)) {
+		if (mem_cmp_bin(pIn3, pIn1, &res) != 0) {
+			char *str = !mem_is_bin(pIn3) ?
+				    mem_type_to_str(pIn3) :
+				    mem_type_to_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "varbinary");
+			goto abort_due_to_error;
+		}
+	} else if (type == FIELD_TYPE_STRING) {
+		if (mem_cmp_str(pIn3, pIn1, &res, pOp->p4.pColl) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "string");
+			goto abort_due_to_error;
+		}
+	} else if (sql_type_is_numeric(type) || mem_is_num(pIn3) ||
+		   mem_is_num(pIn1)) {
+		type = FIELD_TYPE_NUMBER;
+		if (mem_cmp_num(pIn3, pIn1, &res) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "numeric");
 			goto abort_due_to_error;
 		}
-		res = sqlMemCompare(pIn3, pIn1, NULL);
 	} else {
-		enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
-		if (sql_type_is_numeric(type)) {
-			if (mem_is_str(pIn1)) {
-				mem_apply_numeric_type(pIn1);
-				flags3 = pIn3->flags;
-			}
-			if (mem_is_str(pIn3)) {
-				if (mem_apply_numeric_type(pIn3) != 0) {
-					diag_set(ClientError,
-						 ER_SQL_TYPE_MISMATCH,
-						 mem_str(pIn3),
-						 "numeric");
-					goto abort_due_to_error;
-				}
-			}
-			/* Handle the common case of integer comparison here, as an
-			 * optimization, to avoid a call to sqlMemCompare()
-			 */
-			if (mem_is_int(pIn1) && mem_is_int(pIn3)) {
-				if (mem_is_nint(pIn1) && mem_is_nint(pIn3)) {
-					if (pIn3->u.i > pIn1->u.i)
-						res = +1;
-					else if (pIn3->u.i < pIn1->u.i)
-						res = -1;
-					else
-						res = 0;
-					goto compare_op;
-				}
-				if (mem_is_uint(pIn1) && mem_is_uint(pIn3)) {
-					if (pIn3->u.u > pIn1->u.u)
-						res = +1;
-					else if (pIn3->u.u < pIn1->u.u)
-						res = -1;
-					else
-						res = 0;
-					goto compare_op;
-				}
-				if (mem_is_uint(pIn1) && mem_is_nint(pIn3)) {
-					res = -1;
-					goto compare_op;
-				}
-				res = 1;
-				goto compare_op;
-			}
-		} else if (type == FIELD_TYPE_STRING) {
-			if (!mem_is_str(pIn1) && mem_is_num(pIn1)) {
-				sqlVdbeMemStringify(pIn1);
-				flags1 = (pIn1->flags & ~MEM_TypeMask) | (flags1 & MEM_TypeMask);
-				assert(pIn1!=pIn3);
-			}
-			if (!mem_is_str(pIn3) && mem_is_num(pIn3)) {
-				sqlVdbeMemStringify(pIn3);
-				flags3 = (pIn3->flags & ~MEM_TypeMask) | (flags3 & MEM_TypeMask);
-			}
+		type = FIELD_TYPE_STRING;
+		assert(mem_is_str(pIn3) && mem_is_same_type(pIn3, pIn1));
+		if (mem_cmp_str(pIn3, pIn1, &res, pOp->p4.pColl) != 0) {
+			const char *str = mem_apply_type(pIn3, type) != 0 ?
+					  mem_str(pIn3) : mem_str(pIn1);
+			diag_set(ClientError, ER_SQL_TYPE_MISMATCH, str,
+				 "string");
+			goto abort_due_to_error;
 		}
-		assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl==0);
-		res = sqlMemCompare(pIn3, pIn1, pOp->p4.pColl);
 	}
-			compare_op:
+
 	switch( pOp->opcode) {
 	case OP_Eq:    res2 = res==0;     break;
 	case OP_Ne:    res2 = res;        break;
@@ -1734,14 +1701,6 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 	default:       res2 = res>=0;     break;
 	}
 
-	/* Undo any changes made by mem_apply_type() to the input registers. */
-	assert((pIn1->flags & MEM_Dyn) == (flags1 & MEM_Dyn));
-	pIn1->flags = flags1;
-	pIn1->field_type = ft_p1;
-	assert((pIn3->flags & MEM_Dyn) == (flags3 & MEM_Dyn));
-	pIn3->flags = flags3;
-	pIn3->field_type = ft_p3;
-
 	if (pOp->p5 & SQL_STOREP2) {
 		iCompare = res;
 		res2 = res2!=0;  /* For this path res2 must be exactly 0 or 1 */


More information about the Tarantool-patches mailing list