[Tarantool-patches] [PATCH v5 19/52] sql: introduce arithmetic operations for MEM

Mergen Imeev imeevma at tarantool.org
Thu Apr 15 02:33:24 MSK 2021


Thank you for the review! My answers are below. I also included the whole patch
here, since there were merge conflicts due to a function name change in the last patch.

On Thu, Apr 15, 2021 at 01:10:42AM +0200, Vladislav Shpilevoy wrote:
> Thanks for the fixes!
> 
> >>> diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
> >>> index 2d76ef88d..859e337aa 100644
> >>> --- a/src/box/sql/mem.c
> >>> +++ b/src/box/sql/mem.c
> >>> @@ -390,6 +390,240 @@ mem_concat(struct Mem *a, struct Mem *b, struct Mem *result)
> >>> +
> >>> +static int
> >>> +get_number(const struct Mem *mem, struct sql_num *number)
> >>> +{
> >>> + if ((mem->flags & MEM_Real) != 0) {
> >>> +   number->d = mem->u.r;
> >>> +   number->type = MEM_Real;
> >>> +   return 0;
> >>> + }
> >>> + if ((mem->flags & MEM_Int) != 0) {
> >>> +   number->i = mem->u.i;
> >>> +   number->type = MEM_Int;
> >>> +   number->is_neg = true;
> >>> +   return 0;
> >>> + }
> >>> + if ((mem->flags & MEM_UInt) != 0) {
> >>> +   number->u = mem->u.u;
> >>> +   number->type = MEM_UInt;
> >>> +   number->is_neg = false;
> >>> +   return 0;
> >>> + }
> >>> + if ((mem->flags & (MEM_Str | MEM_Blob)) == 0)
> >>> +   return -1;
> >>> + if ((mem->flags & MEM_Subtype) != 0)
> >>> +   return -1;
> >>> + if (sql_atoi64(mem->z, &number->i, &number->is_neg, mem->n) == 0) {
> >>> +   number->type = number->is_neg ? MEM_Int : MEM_UInt;
> >>> +   /*
> >>> +    * The next line should be removed along with the is_neg field
> >>> +    * of struct sql_num. The integer type tells us about the sign.
> >>> +    * However, if it is removed, the behavior of arithmetic
> >>> +    * operations will change.
> >>> +    */
> >>> +   number->is_neg = (mem->flags & MEM_Int) != 0;
> >>
> >> I don't understand that. How is it possible it mismatches the
> >> value returned from sql_atoi64()? And why isn't it just 'false' then?
> >> Because a few lines above you already checked (mem->flags & MEM_Int) != 0
> >> and it was false.
> >>
> > Not exactly right. For example:
> > 
> > tarantool> box.execute([[SELECT '-5' + 2;]])
> > ---
> > - metadata:
> >   - name: COLUMN_1
> >     type: integer
> >   rows:
> >   - [18446744073709551613]
> > ...
> > 
> > As you see, this is wrong. This is due to the fact, that MEM of type string do
> > not have MEM_Int set. Even though this is wrong, it is expected behaviour. I
> > created an issue for this: #5756. Since I didn't want to change this behaviour,
> > I added is_neg field to struct sql_num. This is clearly a hack and should be
> > fixed.
> 
> But that does not answer the second part of my question - why can't
> I set it to false here always?
> 
You are right, I forgot about this.

> ====================
> @@ -286,7 +286,7 @@ get_number(const struct Mem *mem, struct sql_num *number)
>  		 * However, if it is removed, the behavior of arithmetic
>  		 * operations will change.
>  		 */
> -		number->is_neg = (mem->flags & MEM_Int) != 0;
> +		number->is_neg = false;
>  		return 0;
>  	}
> ====================
> 
Thank you. I applied this diff and tested.

> Because (mem->flags & MEM_Int) == 0, otherwise it would return earlier above.
> 
> Also 'is_neg' is not used at all now in all places where get_number() is called.
> At least in this commit. I would propose to add it in the commit which needs it
> or remove it then now.
No, it is used: it is passed to the functions sql_add_int(), sql_sub_int(), etc.
Actually, this is the only patch that uses this field.


New patch:


commit 184a2407e92466657dc44d4b7f9ff80599141010
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Sun Mar 14 11:51:52 2021 +0300

    sql: introduce arithmetic operations for MEM
    
    This patch introduces mem_add(), mem_sub(), mem_mul(), mem_div() and
    mem_rem(), which perform arithmetic operations on two MEMs. Operands
    must contain values of numeric types or values that can be converted
    to a number according to implicit casting rules.
    
    Part of #5818

diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 2f2f859e3..a8bbfd3ea 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -245,6 +245,240 @@ mem_concat(struct Mem *a, struct Mem *b, struct Mem *result)
 	return 0;
 }
 
+struct sql_num {
+	union {
+		int64_t i;
+		uint64_t u;
+		double d;
+	};
+	int type;
+	bool is_neg;
+};
+
+static int
+get_number(const struct Mem *mem, struct sql_num *number)
+{
+	if ((mem->flags & MEM_Real) != 0) {
+		number->d = mem->u.r;
+		number->type = MEM_Real;
+		return 0;
+	}
+	if ((mem->flags & MEM_Int) != 0) {
+		number->i = mem->u.i;
+		number->type = MEM_Int;
+		number->is_neg = true;
+		return 0;
+	}
+	if ((mem->flags & MEM_UInt) != 0) {
+		number->u = mem->u.u;
+		number->type = MEM_UInt;
+		number->is_neg = false;
+		return 0;
+	}
+	if ((mem->flags & (MEM_Str | MEM_Blob)) == 0)
+		return -1;
+	if ((mem->flags & MEM_Subtype) != 0)
+		return -1;
+	if (sql_atoi64(mem->z, &number->i, &number->is_neg, mem->n) == 0) {
+		number->type = number->is_neg ? MEM_Int : MEM_UInt;
+		/*
+		 * The next line should be removed along with the is_neg field
+		 * of struct sql_num. The integer type tells us about the sign.
+		 * However, if it is removed, the behavior of arithmetic
+		 * operations will change.
+		 */
+		number->is_neg = false;
+		return 0;
+	}
+	if (sqlAtoF(mem->z, &number->d, mem->n) != 0) {
+		number->type = MEM_Real;
+		return 0;
+	}
+	return -1;
+}
+
+static int
+arithmetic_prepare(const struct Mem *left, const struct Mem *right,
+		   struct sql_num *a, struct sql_num *b)
+{
+	if (get_number(right, b) != 0) {
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(right),
+			 "numeric");
+		return -1;
+	}
+	if (get_number(left, a) != 0) {
+		diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(left),
+			 "numeric");
+		return -1;
+	}
+	assert(a->type != 0 && b->type != 0);
+	if (a->type == MEM_Real && b->type != MEM_Real) {
+		b->d = b->type == MEM_Int ? (double)b->i : (double)b->u;
+		b->type = MEM_Real;
+		return 0;
+	}
+	if (a->type != MEM_Real && b->type == MEM_Real) {
+		a->d = a->type == MEM_Int ? (double)a->i : (double)a->u;
+		a->type = MEM_Real;
+		return 0;
+	}
+	return 0;
+}
+
+int
+mem_add(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+	if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+		return 0;
+
+	struct sql_num a, b;
+	if (arithmetic_prepare(left, right, &a, &b) != 0)
+		return -1;
+
+	assert(a.type != MEM_Real || a.type == b.type);
+	if (a.type == MEM_Real) {
+		result->u.r = a.d + b.d;
+		result->flags = MEM_Real;
+		return 0;
+	}
+
+	int64_t res;
+	bool is_neg;
+	if (sql_add_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+		return -1;
+	}
+	result->u.i = res;
+	result->flags = is_neg ? MEM_Int : MEM_UInt;
+	return 0;
+}
+
+int
+mem_sub(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+	if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+		return 0;
+
+	struct sql_num a, b;
+	if (arithmetic_prepare(left, right, &a, &b) != 0)
+		return -1;
+
+	assert(a.type != MEM_Real || a.type == b.type);
+	if (a.type == MEM_Real) {
+		result->u.r = a.d - b.d;
+		result->flags = MEM_Real;
+		return 0;
+	}
+
+	int64_t res;
+	bool is_neg;
+	if (sql_sub_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+		return -1;
+	}
+	result->u.i = res;
+	result->flags = is_neg ? MEM_Int : MEM_UInt;
+	return 0;
+}
+
+int
+mem_mul(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+	if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+		return 0;
+
+	struct sql_num a, b;
+	if (arithmetic_prepare(left, right, &a, &b) != 0)
+		return -1;
+
+	assert(a.type != MEM_Real || a.type == b.type);
+	if (a.type == MEM_Real) {
+		result->u.r = a.d * b.d;
+		result->flags = MEM_Real;
+		return 0;
+	}
+
+	int64_t res;
+	bool is_neg;
+	if (sql_mul_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+		return -1;
+	}
+	result->u.i = res;
+	result->flags = is_neg ? MEM_Int : MEM_UInt;
+	return 0;
+}
+
+int
+mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+	if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+		return 0;
+
+	struct sql_num a, b;
+	if (arithmetic_prepare(left, right, &a, &b) != 0)
+		return -1;
+
+	assert(a.type != MEM_Real || a.type == b.type);
+	if (a.type == MEM_Real) {
+		if (b.d == 0.) {
+			diag_set(ClientError, ER_SQL_EXECUTE,
+				 "division by zero");
+			return -1;
+		}
+		result->u.r = a.d / b.d;
+		result->flags = MEM_Real;
+		return 0;
+	}
+
+	if (b.i == 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
+		return -1;
+	}
+	int64_t res;
+	bool is_neg;
+	if (sql_div_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+		return -1;
+	}
+	result->u.i = res;
+	result->flags = is_neg ? MEM_Int : MEM_UInt;
+	return 0;
+}
+
+int
+mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+	if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+		return 0;
+
+	struct sql_num a, b;
+	if (arithmetic_prepare(left, right, &a, &b) != 0)
+		return -1;
+
+	assert(a.type != MEM_Real || a.type == b.type);
+	/*
+	 * TODO: This operation works incorrectly when double d > INT64_MAX and
+	 * d < UINT64_MAX. Also, there may be precision losses due to
+	 * conversion from integer to double and back.
+	 */
+	a.i = a.type == MEM_Real ? (int64_t)a.d : a.i;
+	b.i = b.type == MEM_Real ? (int64_t)b.d : b.i;
+	if (b.i == 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
+		return -1;
+	}
+	int64_t res;
+	bool is_neg;
+	if (sql_rem_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+		diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+		return -1;
+	}
+	result->u.i = res;
+	result->flags = is_neg ? MEM_Int : MEM_UInt;
+	return 0;
+}
+
 static inline bool
 mem_has_msgpack_subtype(struct Mem *mem)
 {
@@ -449,44 +683,6 @@ sql_value_type(sql_value *pVal)
 	return mem_mp_type(mem);
 }
 
-
-/*
- * pMem currently only holds a string type (or maybe a BLOB that we can
- * interpret as a string if we want to).  Compute its corresponding
- * numeric type, if has one.  Set the pMem->u.r and pMem->u.i fields
- * accordingly.
- */
-static u16 SQL_NOINLINE
-computeNumericType(Mem *pMem)
-{
-	assert((pMem->flags & (MEM_Int | MEM_UInt | MEM_Real)) == 0);
-	assert((pMem->flags & (MEM_Str|MEM_Blob))!=0);
-	if (sqlAtoF(pMem->z, &pMem->u.r, pMem->n)==0)
-		return 0;
-	bool is_neg;
-	if (sql_atoi64(pMem->z, (int64_t *) &pMem->u.i, &is_neg, pMem->n) == 0)
-		return is_neg ? MEM_Int : MEM_UInt;
-	return MEM_Real;
-}
-
-/*
- * Return the numeric type for pMem, either MEM_Int or MEM_Real or both or
- * none.
- *
- * Unlike mem_apply_numeric_type(), this routine does not modify pMem->flags.
- * But it does set pMem->u.r and pMem->u.i appropriately.
- */
-u16
-numericType(Mem *pMem)
-{
-	if ((pMem->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0)
-		return pMem->flags & (MEM_Int | MEM_UInt | MEM_Real);
-	if (pMem->flags & (MEM_Str|MEM_Blob)) {
-		return computeNumericType(pMem);
-	}
-	return 0;
-}
-
 /*
  * The sqlValueBytes() routine returns the number of bytes in the
  * sql_value object assuming that it uses the encoding "enc".
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index d17ed0593..9539fbbd1 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -330,6 +330,40 @@ mem_move(struct Mem *to, struct Mem *from);
 int
 mem_concat(struct Mem *left, struct Mem *right, struct Mem *result);
 
+/**
+ * Add the first MEM to the second MEM and write the result to the third MEM.
+ */
+int
+mem_add(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Subtract the second MEM from the first MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_sub(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Multiply the first MEM by the second MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_mul(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Divide the first MEM by the second MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Divide the first MEM by the second MEM and write the remainder of the
+ * division to the third MEM.
+ */
+int
+mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
 /**
  * Simple type to str convertor. It is used to simplify
  * error reporting.
@@ -347,8 +381,6 @@ mem_mp_type(struct Mem *mem);
 
 enum mp_type
 sql_value_type(struct Mem *);
-u16
-numericType(Mem *pMem);
 
 int sqlValueBytes(struct Mem *);
 
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index e78229581..fb2a5ccc1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1076,6 +1076,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * and store the result in register P3.
  * If either input is NULL, the result is NULL.
  */
+case OP_Add: {                 /* same as TK_PLUS, in1, in2, out3 */
+	pIn1 = &aMem[pOp->p1];
+	pIn2 = &aMem[pOp->p2];
+	pOut = &aMem[pOp->p3];
+	if (mem_add(pIn2, pIn1, pOut) != 0)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: Multiply P1 P2 P3 * *
  * Synopsis: r[P3]=r[P1]*r[P2]
  *
@@ -1084,6 +1093,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * and store the result in register P3.
  * If either input is NULL, the result is NULL.
  */
+case OP_Multiply: {            /* same as TK_STAR, in1, in2, out3 */
+	pIn1 = &aMem[pOp->p1];
+	pIn2 = &aMem[pOp->p2];
+	pOut = &aMem[pOp->p3];
+	if (mem_mul(pIn2, pIn1, pOut) != 0)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: Subtract P1 P2 P3 * *
  * Synopsis: r[P3]=r[P2]-r[P1]
  *
@@ -1091,6 +1109,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * and store the result in register P3.
  * If either input is NULL, the result is NULL.
  */
+case OP_Subtract: {           /* same as TK_MINUS, in1, in2, out3 */
+	pIn1 = &aMem[pOp->p1];
+	pIn2 = &aMem[pOp->p2];
+	pOut = &aMem[pOp->p3];
+	if (mem_sub(pIn2, pIn1, pOut) != 0)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: Divide P1 P2 P3 * *
  * Synopsis: r[P3]=r[P2]/r[P1]
  *
@@ -1099,6 +1126,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * register P1 is zero, then the result is NULL. If either input is
  * NULL, the result is NULL.
  */
+case OP_Divide: {             /* same as TK_SLASH, in1, in2, out3 */
+	pIn1 = &aMem[pOp->p1];
+	pIn2 = &aMem[pOp->p2];
+	pOut = &aMem[pOp->p3];
+	if (mem_div(pIn2, pIn1, pOut) != 0)
+		goto abort_due_to_error;
+	break;
+}
+
 /* Opcode: Remainder P1 P2 P3 * *
  * Synopsis: r[P3]=r[P2]%r[P1]
  *
@@ -1107,120 +1143,13 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * If the value in register P1 is zero the result is NULL.
  * If either operand is NULL, the result is NULL.
  */
-case OP_Add:                   /* same as TK_PLUS, in1, in2, out3 */
-case OP_Subtract:              /* same as TK_MINUS, in1, in2, out3 */
-case OP_Multiply:              /* same as TK_STAR, in1, in2, out3 */
-case OP_Divide:                /* same as TK_SLASH, in1, in2, out3 */
 case OP_Remainder: {           /* same as TK_REM, in1, in2, out3 */
-	u16 type1;      /* Numeric type of left operand */
-	u16 type2;      /* Numeric type of right operand */
-	i64 iA;         /* Integer value of left operand */
-	i64 iB;         /* Integer value of right operand */
-	double rA;      /* Real value of left operand */
-	double rB;      /* Real value of right operand */
-
 	pIn1 = &aMem[pOp->p1];
-	type1 = numericType(pIn1);
 	pIn2 = &aMem[pOp->p2];
-	type2 = numericType(pIn2);
-	pOut = vdbe_prepare_null_out(p, pOp->p3);
-	if (mem_is_any_null(pIn1, pIn2))
-		goto arithmetic_result_is_null;
-	if ((type1 & (MEM_Int | MEM_UInt)) != 0 &&
-	    (type2 & (MEM_Int | MEM_UInt)) != 0) {
-		iA = pIn1->u.i;
-		iB = pIn2->u.i;
-		bool is_lhs_neg = mem_is_nint(pIn1);
-		bool is_rhs_neg = mem_is_nint(pIn2);
-		bool is_res_neg;
-		switch( pOp->opcode) {
-		case OP_Add: {
-			if (sql_add_int(iA, is_lhs_neg, iB, is_rhs_neg,
-					(int64_t *) &iB, &is_res_neg) != 0)
-				goto integer_overflow;
-			break;
-		}
-		case OP_Subtract: {
-			if (sql_sub_int(iB, is_rhs_neg, iA, is_lhs_neg,
-					(int64_t *) &iB, &is_res_neg) != 0)
-				goto integer_overflow;
-			break;
-		}
-		case OP_Multiply: {
-			if (sql_mul_int(iA, is_lhs_neg, iB, is_rhs_neg,
-					(int64_t *) &iB, &is_res_neg) != 0)
-				goto integer_overflow;
-			break;
-		}
-		case OP_Divide: {
-			if (iA == 0)
-				goto division_by_zero;
-			if (sql_div_int(iB, is_rhs_neg, iA, is_lhs_neg,
-					(int64_t *) &iB, &is_res_neg) != 0)
-				goto integer_overflow;
-			break;
-		}
-		default: {
-			if (iA == 0)
-				goto division_by_zero;
-			if (iA==-1) iA = 1;
-			if (sql_rem_int(iB, is_rhs_neg, iA, is_lhs_neg,
-					(int64_t *) &iB, &is_res_neg) != 0)
-				goto integer_overflow;
-			break;
-		}
-		}
-		mem_set_int(pOut, iB, is_res_neg);
-	} else {
-		if (sqlVdbeRealValue(pIn1, &rA) != 0) {
-			diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-				 mem_str(pIn1), "numeric");
-			goto abort_due_to_error;
-		}
-		if (sqlVdbeRealValue(pIn2, &rB) != 0) {
-			diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-				 mem_str(pIn2), "numeric");
-			goto abort_due_to_error;
-		}
-		assert(((type1 | type2) & MEM_Real) != 0);
-		switch( pOp->opcode) {
-		case OP_Add:         rB += rA;       break;
-		case OP_Subtract:    rB -= rA;       break;
-		case OP_Multiply:    rB *= rA;       break;
-		case OP_Divide: {
-			if (rA == (double)0)
-				goto division_by_zero;
-			rB /= rA;
-			break;
-		}
-		default: {
-			iA = (i64)rA;
-			iB = (i64)rB;
-			if (iA == 0)
-				goto division_by_zero;
-			if (iA==-1) iA = 1;
-			rB = (double)(iB % iA);
-			break;
-		}
-		}
-		if (sqlIsNaN(rB)) {
-			goto arithmetic_result_is_null;
-		}
-		mem_set_double(pOut, rB);
-	}
-	break;
-
-arithmetic_result_is_null:
-	/* Force NULL be of type NUMBER. */
-	pOut->field_type = FIELD_TYPE_NUMBER;
+	pOut = &aMem[pOp->p3];
+	if (mem_rem(pIn2, pIn1, pOut) != 0)
+		goto abort_due_to_error;
 	break;
-
-division_by_zero:
-	diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
-	goto abort_due_to_error;
-integer_overflow:
-	diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
-	goto abort_due_to_error;
 }
 
 /* Opcode: CollSeq P1 * * P4


More information about the Tarantool-patches mailing list