[Tarantool-patches] [PATCH v4 19/53] sql: introduce mem_compare()

imeevma at tarantool.org imeevma at tarantool.org
Tue Mar 23 12:35:34 MSK 2021


This patch introduces mem_compare(). Function mem_compare() compares the
first and the second MEMs and returns result of comparison.

Part of #5818
---
 src/box/sql/mem.c  | 241 +++++++++++++++++++++++++++++++++++++++++++++
 src/box/sql/mem.h  |   4 +
 src/box/sql/vdbe.c |  92 +----------------
 3 files changed, 247 insertions(+), 90 deletions(-)

diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 6120939d8..8119644ed 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -556,6 +556,247 @@ mem_arithmetic(const struct Mem *left, const struct Mem *right,
 	return 0;
 }
 
+static int
+compare_blobs(const struct Mem *left, const struct Mem *right, int *result)
+{
+	int nl = left->n;
+	int nr = right->n;
+	int minlen = MIN(nl, nr);
+
+	/*
+	 * It is possible to have a Blob value that has some non-zero content
+	 * followed by zero content.  But that only comes up for Blobs formed
+	 * by the OP_MakeRecord opcode, and such Blobs never get passed into
+	 * mem_compare().
+	 */
+	assert((left->flags & MEM_Zero) == 0 || nl == 0);
+	assert((right->flags & MEM_Zero) == 0 || nr == 0);
+
+	if (left->flags & right->flags & MEM_Zero) {
+		*result = left->u.nZero - right->u.nZero;
+		return 0;
+	}
+	if (left->flags & MEM_Zero) {
+		for (int i = 0; i < minlen; ++i) {
+			if (right->z[i] != 0) {
+				*result = -1;
+				return 0;
+			}
+		}
+		*result = left->u.nZero - nr;
+		return 0;
+	}
+	if (right->flags & MEM_Zero) {
+		for (int i = 0; i < minlen; ++i) {
+			if (left->z[i] != 0){
+				*result = 1;
+				return 0;
+			}
+		}
+		*result = right->u.nZero - nl;
+		return 0;
+	}
+	*result = memcmp(left->z, right->z, minlen);
+	if (*result != 0)
+		return 0;
+	*result = nl - nr;
+	return 0;
+}
+
+static int
+compare_numbers(const struct Mem *left, const struct Mem *right, int *result)
+{
+	int64_t il;
+	double dl;
+	int left_type = left->flags & (MEM_Int | MEM_UInt | MEM_Real);
+	if (left_type == MEM_Real) {
+		dl = left->u.r;
+	} else if (left_type != 0) {
+		il = left->u.i;
+	} else if ((left->flags & MEM_Str) != 0) {
+		bool is_l_neg;
+		if (sql_atoi64(left->z, &il, &is_l_neg, left->n) == 0)
+			left_type = is_l_neg ? MEM_Int : MEM_UInt;
+		else if (sqlAtoF(left->z, &dl, left->n) != 0)
+			left_type = MEM_Real;
+	}
+	if (left_type == 0) {
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
+			 mem_str(left), "numeric");
+		return -1;
+	}
+
+	int64_t ir;
+	double dr;
+	int right_type = right->flags & (MEM_Int | MEM_UInt | MEM_Real);
+	if (right_type == MEM_Real) {
+		dr = right->u.r;
+	} else if (right_type != 0) {
+		ir = right->u.i;
+	} else if ((right->flags & MEM_Str) != 0) {
+		bool is_r_neg;
+		if (sql_atoi64(right->z, &ir, &is_r_neg, right->n) == 0)
+			right_type = is_r_neg ? MEM_Int : MEM_UInt;
+		else if (sqlAtoF(right->z, &dr, right->n) != 0)
+			right_type = MEM_Real;
+	}
+	/* TODO: There should be check for rvalue type. */
+	if (right_type == 0) {
+		*result = -1;
+		return 0;
+	}
+	if (left_type == right_type) {
+		if (left_type == MEM_Real) {
+			if (dl > dr)
+				*result = 1;
+			else if (dl < dr)
+				*result = -1;
+			else
+				*result = 0;
+			return 0;
+		}
+		if (left_type == MEM_Int) {
+			if (il > ir)
+				*result = 1;
+			else if (il < ir)
+				*result = -1;
+			else
+				*result = 0;
+			return 0;
+		}
+		uint64_t ul = (uint64_t)il;
+		uint64_t ur = (uint64_t)ir;
+		if (ul > ur)
+			*result = 1;
+		else if (ul < ur)
+			*result = -1;
+		else
+			*result = 0;
+		return 0;
+	}
+	if (left_type == MEM_Real) {
+		if (right_type == MEM_Int) {
+			*result = double_compare_nint64(dl, ir, 1);
+			return 0;
+		}
+		*result = double_compare_uint64(dl, (uint64_t)ir, 1);
+		return 0;
+	}
+	if (right_type == MEM_Real) {
+		if (left_type == MEM_Int) {
+			*result = double_compare_nint64(dr, il, -1);
+			return 0;
+		}
+		*result = double_compare_uint64(dr, (uint64_t)il, -1);
+		return 0;
+	}
+	if (left_type == MEM_Int) {
+		*result = -1;
+		return 0;
+	}
+	assert(right_type == MEM_Int && left_type == MEM_UInt);
+	*result = 1;
+	return 0;
+}
+
+static int
+compare_strings(const struct Mem *left, const struct Mem *right, int *result,
+		struct coll *coll)
+{
+	char *sl;
+	uint32_t nl;
+	char bufl[BUF_SIZE];
+	if ((left->flags & MEM_Str) != 0) {
+		sl = left->z;
+		nl = left->n;
+	} else {
+		assert((left->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+		sl = &bufl[0];
+		if ((left->flags & MEM_Int) != 0)
+			sql_snprintf(BUF_SIZE, sl, "%lld", left->u.i);
+		else if ((left->flags & MEM_UInt) != 0)
+			sql_snprintf(BUF_SIZE, sl, "%llu", left->u.u);
+		else
+			sql_snprintf(BUF_SIZE, sl, "%!.15g", left->u.r);
+		nl = strlen(sl);
+	}
+
+	char *sr;
+	uint32_t nr;
+	char bufr[BUF_SIZE];
+	if ((right->flags & MEM_Str) != 0) {
+		sr = right->z;
+		nr = right->n;
+	} else {
+		assert((right->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0);
+		sr = &bufr[0];
+		if ((right->flags & MEM_Int) != 0)
+			sql_snprintf(BUF_SIZE, sr, "%lld", right->u.i);
+		else if ((right->flags & MEM_UInt) != 0)
+			sql_snprintf(BUF_SIZE, sr, "%llu", right->u.u);
+		else
+			sql_snprintf(BUF_SIZE, sr, "%!.15g", right->u.r);
+		nr = strlen(sr);
+	}
+	if (coll) {
+		*result = coll->cmp(sl, nl, sr, nr, coll);
+		return 0;
+	}
+	uint32_t minlen = MIN(nl, nr);
+	*result = memcmp(sl, sr, minlen);
+	if (*result != 0)
+		return 0;
+	*result = nl - nr;
+	return 0;
+}
+
+int
+mem_compare(const struct Mem *left, const struct Mem *right, int *result,
+	    enum field_type type, struct coll *coll)
+{
+	assert(((left->flags | right->flags) & MEM_Null) == 0);
+	if ((right->flags & MEM_Bool) != 0) {
+		if ((left->flags & MEM_Bool) != 0) {
+			if (left->u.b == right->u.b)
+				*result = 0;
+			else if (left->u.b)
+				*result = 1;
+			else
+				*result = -1;
+			return 0;
+		}
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
+			 mem_type_to_str(left), "boolean");
+		return -1;
+	}
+	if ((left->flags & MEM_Bool) != 0) {
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
+			 mem_type_to_str(right), "boolean");
+		return -1;
+	}
+	if ((right->flags & MEM_Blob) != 0) {
+		if ((left->flags & MEM_Blob) != 0)
+			return compare_blobs(left, right, result);
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
+			 mem_type_to_str(left), "varbinary");
+		return -1;
+	}
+	if ((left->flags & MEM_Blob) != 0) {
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
+			 mem_type_to_str(right), "varbinary");
+		return -1;
+	}
+	if (sql_type_is_numeric(type))
+		return compare_numbers(left, right, result);
+	if (type == FIELD_TYPE_STRING)
+		return compare_strings(left, right, result, coll);
+	if (((left->flags | right->flags) &
+	     (MEM_Int | MEM_UInt | MEM_Real)) != 0)
+		return compare_numbers(left, right, result);
+	assert((left->flags & MEM_Str) != 0 && (right->flags & MEM_Str) != 0);
+	return compare_strings(left, right, result, coll);
+}
+
 static inline bool
 mem_has_msgpack_subtype(struct Mem *mem)
 {
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index 7cea07dc9..096ce6533 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -196,6 +196,10 @@ int
 mem_arithmetic(const struct Mem *left, const struct Mem *right,
 	       struct Mem *result, int op);
 
+int
+mem_compare(const struct Mem *left, const struct Mem *right, int *result,
+	    enum field_type type, struct coll *coll);
+
 /* One or more of the following flags are set to indicate the validOK
  * representations of the value stored in the Mem struct.
  *
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 55cb42932..726066c17 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1543,15 +1543,9 @@ case OP_Le:               /* same as TK_LE, jump, in1, in3 */
 case OP_Gt:               /* same as TK_GT, jump, in1, in3 */
 case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 	int res, res2;      /* Result of the comparison of pIn1 against pIn3 */
-	u32 flags1;         /* Copy of initial value of pIn1->flags */
-	u32 flags3;         /* Copy of initial value of pIn3->flags */
 
 	pIn1 = &aMem[pOp->p1];
 	pIn3 = &aMem[pOp->p3];
-	flags1 = pIn1->flags;
-	flags3 = pIn3->flags;
-	enum field_type ft_p1 = pIn1->field_type;
-	enum field_type ft_p3 = pIn3->field_type;
 	if (mem_is_null(pIn1) || mem_is_null(pIn3)) {
 		/* One or both operands are NULL */
 		if (pOp->p5 & SQL_NULLEQ) {
@@ -1585,85 +1579,11 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 			}
 			break;
 		}
-	} else if (mem_is_boolean(pIn1) || mem_is_boolean(pIn3) ||
-		   mem_is_binary(pIn1) || mem_is_binary(pIn3)) {
-		if (!mem_is_same_type(pIn1, pIn3)) {
-			char *inconsistent_type = mem_is_boolean(pIn1) ||
-						  mem_is_binary(pIn1) ?
-						  mem_type_to_str(pIn3) :
-						  mem_type_to_str(pIn1);
-			char *expected_type = mem_is_boolean(pIn1) ||
-					      mem_is_binary(pIn1) ?
-					      mem_type_to_str(pIn1) :
-					      mem_type_to_str(pIn3);
-			diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-				 inconsistent_type, expected_type);
-			goto abort_due_to_error;
-		}
-		res = sqlMemCompare(pIn3, pIn1, NULL);
 	} else {
 		enum field_type type = pOp->p5 & FIELD_TYPE_MASK;
-		if (sql_type_is_numeric(type)) {
-			if (mem_is_string(pIn1)) {
-				mem_apply_numeric_type(pIn1);
-				flags3 = pIn3->flags;
-			}
-			if (mem_is_string(pIn3)) {
-				if (mem_apply_numeric_type(pIn3) != 0) {
-					diag_set(ClientError,
-						 ER_SQL_TYPE_MISMATCH,
-						 mem_str(pIn3),
-						 "numeric");
-					goto abort_due_to_error;
-				}
-			}
-			/* Handle the common case of integer comparison here, as an
-			 * optimization, to avoid a call to sqlMemCompare()
-			 */
-			if (mem_is_integer(pIn1) && mem_is_integer(pIn3)) {
-				if (!mem_is_unsigned(pIn1) &&
-				    !mem_is_unsigned(pIn3)) {
-					if (pIn3->u.i > pIn1->u.i)
-						res = +1;
-					else if (pIn3->u.i < pIn1->u.i)
-						res = -1;
-					else
-						res = 0;
-					goto compare_op;
-				}
-				if (mem_is_unsigned(pIn1) &&
-				    mem_is_unsigned(pIn3)) {
-					if (pIn3->u.u > pIn1->u.u)
-						res = +1;
-					else if (pIn3->u.u < pIn1->u.u)
-						res = -1;
-					else
-						res = 0;
-					goto compare_op;
-				}
-				if (mem_is_unsigned(pIn1) &&
-				    !mem_is_unsigned(pIn3)) {
-					res = -1;
-					goto compare_op;
-				}
-				res = 1;
-				goto compare_op;
-			}
-		} else if (type == FIELD_TYPE_STRING) {
-			if (!mem_is_string(pIn1) && mem_is_number(pIn1)) {
-				sqlVdbeMemStringify(pIn1);
-				flags1 = (pIn1->flags & ~MEM_TypeMask) | (flags1 & MEM_TypeMask);
-				assert(pIn1!=pIn3);
-			}
-			if (!mem_is_string(pIn3) && mem_is_number(pIn3)) {
-				sqlVdbeMemStringify(pIn3);
-				flags3 = (pIn3->flags & ~MEM_TypeMask) | (flags3 & MEM_TypeMask);
-			}
-		}
-		assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl==0);
-		res = sqlMemCompare(pIn3, pIn1, pOp->p4.pColl);
+		if (mem_compare(pIn3, pIn1, &res, type, pOp->p4.pColl))
+			goto abort_due_to_error;
 	}
-			compare_op:
 	switch( pOp->opcode) {
 	case OP_Eq:    res2 = res==0;     break;
 	case OP_Ne:    res2 = res;        break;
@@ -1673,14 +1593,6 @@ case OP_Ge: {             /* same as TK_GE, jump, in1, in3 */
 	default:       res2 = res>=0;     break;
 	}
 
-	/* Undo any changes made by mem_apply_type() to the input registers. */
-	assert((pIn1->flags & MEM_Dyn) == (flags1 & MEM_Dyn));
-	pIn1->flags = flags1;
-	pIn1->field_type = ft_p1;
-	assert((pIn3->flags & MEM_Dyn) == (flags3 & MEM_Dyn));
-	pIn3->flags = flags3;
-	pIn3->field_type = ft_p3;
-
 	if (pOp->p5 & SQL_STOREP2) {
 		iCompare = res;
 		res2 = res2!=0;  /* For this path res2 must be exactly 0 or 1 */
-- 
2.25.1



More information about the Tarantool-patches mailing list