[Tarantool-patches] [PATCH v5 19/52] sql: introduce arithmetic operations for MEM
Mergen Imeev
imeevma at tarantool.org
Thu Apr 15 02:33:24 MSK 2021
Thank you for the review! My answers are below. I also included the whole patch
here, since there were merge conflicts due to a function name change in the last patch.
On Thu, Apr 15, 2021 at 01:10:42AM +0200, Vladislav Shpilevoy wrote:
> Thanks for the fixes!
>
> >>> diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
> >>> index 2d76ef88d..859e337aa 100644
> >>> --- a/src/box/sql/mem.c
> >>> +++ b/src/box/sql/mem.c
> >>> @@ -390,6 +390,240 @@ mem_concat(struct Mem *a, struct Mem *b, struct Mem *result)
> >>> +
> >>> +static int
> >>> +get_number(const struct Mem *mem, struct sql_num *number)
> >>> +{
> >>> + if ((mem->flags & MEM_Real) != 0) {
> >>> + number->d = mem->u.r;
> >>> + number->type = MEM_Real;
> >>> + return 0;
> >>> + }
> >>> + if ((mem->flags & MEM_Int) != 0) {
> >>> + number->i = mem->u.i;
> >>> + number->type = MEM_Int;
> >>> + number->is_neg = true;
> >>> + return 0;
> >>> + }
> >>> + if ((mem->flags & MEM_UInt) != 0) {
> >>> + number->u = mem->u.u;
> >>> + number->type = MEM_UInt;
> >>> + number->is_neg = false;
> >>> + return 0;
> >>> + }
> >>> + if ((mem->flags & (MEM_Str | MEM_Blob)) == 0)
> >>> + return -1;
> >>> + if ((mem->flags & MEM_Subtype) != 0)
> >>> + return -1;
> >>> + if (sql_atoi64(mem->z, &number->i, &number->is_neg, mem->n) == 0) {
> >>> + number->type = number->is_neg ? MEM_Int : MEM_UInt;
> >>> + /*
> >>> + * The next line should be removed along with the is_neg field
> >>> + * of struct sql_num. The integer type tells us about the sign.
> >>> + * However, if it is removed, the behavior of arithmetic
> >>> + * operations will change.
> >>> + */
> >>> + number->is_neg = (mem->flags & MEM_Int) != 0;
> >>
> >> I don't understand that. How is it possible it mismatches the
> >> value returned from sql_atoi64()? And why isn't it just 'false' then?
> >> Because a few lines above you already checked (mem->flags & MEM_Int) != 0
> >> and it was false.
> >>
> > Not exactly right. For example:
> >
> > tarantool> box.execute([[SELECT '-5' + 2;]])
> > ---
> > - metadata:
> > - name: COLUMN_1
> > type: integer
> > rows:
> > - [18446744073709551613]
> > ...
> >
> > As you can see, this is wrong. This is due to the fact that a MEM of type
> > string does not have MEM_Int set. Even though this is wrong, it is the
> > expected behaviour. I created an issue for this: #5756. Since I didn't want
> > to change this behaviour, I added the is_neg field to struct sql_num. This is
> > clearly a hack and should be fixed.
>
> But that does not answer the second part of my question - why can't
> I set it to false here always?
>
You are right, I forgot about this.
> ====================
> @@ -286,7 +286,7 @@ get_number(const struct Mem *mem, struct sql_num *number)
> * However, if it is removed, the behavior of arithmetic
> * operations will change.
> */
> - number->is_neg = (mem->flags & MEM_Int) != 0;
> + number->is_neg = false;
> return 0;
> }
> ====================
>
Thank you. I applied this diff and tested.
> Because (mem->flags & MEM_Int) == 0, otherwise it would return earlier above.
>
> Also, 'is_neg' is not used at all now in any of the places where get_number()
> is called. At least in this commit. I would propose to either add it in the
> commit which needs it or remove it now.
No, it is used in the functions sql_add_int(), sql_sub_int(), etc. Actually, this
is the only patch that uses this field.
New patch:
commit 184a2407e92466657dc44d4b7f9ff80599141010
Author: Mergen Imeev <imeevma at gmail.com>
Date: Sun Mar 14 11:51:52 2021 +0300
sql: introduce arithmetic operations for MEM
This patch introduces mem_add(), mem_sub(), mem_mul(), mem_div() and
mem_rem(), which perform arithmetic operations on two MEMs. Operands
must contain values of numeric types or values that can be converted
to a number according to implicit casting rules.
Part of #5818
diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 2f2f859e3..a8bbfd3ea 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -245,6 +245,240 @@ mem_concat(struct Mem *a, struct Mem *b, struct Mem *result)
return 0;
}
+struct sql_num {
+ union {
+ int64_t i;
+ uint64_t u;
+ double d;
+ };
+ int type;
+ bool is_neg;
+};
+
+static int
+get_number(const struct Mem *mem, struct sql_num *number)
+{
+ if ((mem->flags & MEM_Real) != 0) {
+ number->d = mem->u.r;
+ number->type = MEM_Real;
+ return 0;
+ }
+ if ((mem->flags & MEM_Int) != 0) {
+ number->i = mem->u.i;
+ number->type = MEM_Int;
+ number->is_neg = true;
+ return 0;
+ }
+ if ((mem->flags & MEM_UInt) != 0) {
+ number->u = mem->u.u;
+ number->type = MEM_UInt;
+ number->is_neg = false;
+ return 0;
+ }
+ if ((mem->flags & (MEM_Str | MEM_Blob)) == 0)
+ return -1;
+ if ((mem->flags & MEM_Subtype) != 0)
+ return -1;
+ if (sql_atoi64(mem->z, &number->i, &number->is_neg, mem->n) == 0) {
+ number->type = number->is_neg ? MEM_Int : MEM_UInt;
+ /*
+ * The next line should be removed along with the is_neg field
+ * of struct sql_num. The integer type tells us about the sign.
+ * However, if it is removed, the behavior of arithmetic
+ * operations will change.
+ */
+ number->is_neg = false;
+ return 0;
+ }
+ if (sqlAtoF(mem->z, &number->d, mem->n) != 0) {
+ number->type = MEM_Real;
+ return 0;
+ }
+ return -1;
+}
+
+static int
+arithmetic_prepare(const struct Mem *left, const struct Mem *right,
+ struct sql_num *a, struct sql_num *b)
+{
+ if (get_number(right, b) != 0) {
+ diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(right),
+ "numeric");
+ return -1;
+ }
+ if (get_number(left, a) != 0) {
+ diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(left),
+ "numeric");
+ return -1;
+ }
+ assert(a->type != 0 && b->type != 0);
+ if (a->type == MEM_Real && b->type != MEM_Real) {
+ b->d = b->type == MEM_Int ? (double)b->i : (double)b->u;
+ b->type = MEM_Real;
+ return 0;
+ }
+ if (a->type != MEM_Real && b->type == MEM_Real) {
+ a->d = a->type == MEM_Int ? (double)a->i : (double)a->u;
+ a->type = MEM_Real;
+ return 0;
+ }
+ return 0;
+}
+
+int
+mem_add(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+ return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+ return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+ result->u.r = a.d + b.d;
+ result->flags = MEM_Real;
+ return 0;
+ }
+
+ int64_t res;
+ bool is_neg;
+ if (sql_add_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+ return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_sub(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+ return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+ return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+ result->u.r = a.d - b.d;
+ result->flags = MEM_Real;
+ return 0;
+ }
+
+ int64_t res;
+ bool is_neg;
+ if (sql_sub_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+ return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_mul(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+ return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+ return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+ result->u.r = a.d * b.d;
+ result->flags = MEM_Real;
+ return 0;
+ }
+
+ int64_t res;
+ bool is_neg;
+ if (sql_mul_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+ return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+ return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+ return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+ if (b.d == 0.) {
+ diag_set(ClientError, ER_SQL_EXECUTE,
+ "division by zero");
+ return -1;
+ }
+ result->u.r = a.d / b.d;
+ result->flags = MEM_Real;
+ return 0;
+ }
+
+ if (b.i == 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
+ return -1;
+ }
+ int64_t res;
+ bool is_neg;
+ if (sql_div_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+ return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (try_return_null(left, right, result, FIELD_TYPE_NUMBER))
+ return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+ return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ /*
+ * TODO: This operation works wrong when double d > INT64_MAX and
+ * d < UINT64_MAX. Also, there may be precision losses due to
+ * conversion integer to double and back.
+ */
+ a.i = a.type == MEM_Real ? (int64_t)a.d : a.i;
+ b.i = b.type == MEM_Real ? (int64_t)b.d : b.i;
+ if (b.i == 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
+ return -1;
+ }
+ int64_t res;
+ bool is_neg;
+ if (sql_rem_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+ diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+ return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
static inline bool
mem_has_msgpack_subtype(struct Mem *mem)
{
@@ -449,44 +683,6 @@ sql_value_type(sql_value *pVal)
return mem_mp_type(mem);
}
-
-/*
- * pMem currently only holds a string type (or maybe a BLOB that we can
- * interpret as a string if we want to). Compute its corresponding
- * numeric type, if has one. Set the pMem->u.r and pMem->u.i fields
- * accordingly.
- */
-static u16 SQL_NOINLINE
-computeNumericType(Mem *pMem)
-{
- assert((pMem->flags & (MEM_Int | MEM_UInt | MEM_Real)) == 0);
- assert((pMem->flags & (MEM_Str|MEM_Blob))!=0);
- if (sqlAtoF(pMem->z, &pMem->u.r, pMem->n)==0)
- return 0;
- bool is_neg;
- if (sql_atoi64(pMem->z, (int64_t *) &pMem->u.i, &is_neg, pMem->n) == 0)
- return is_neg ? MEM_Int : MEM_UInt;
- return MEM_Real;
-}
-
-/*
- * Return the numeric type for pMem, either MEM_Int or MEM_Real or both or
- * none.
- *
- * Unlike mem_apply_numeric_type(), this routine does not modify pMem->flags.
- * But it does set pMem->u.r and pMem->u.i appropriately.
- */
-u16
-numericType(Mem *pMem)
-{
- if ((pMem->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0)
- return pMem->flags & (MEM_Int | MEM_UInt | MEM_Real);
- if (pMem->flags & (MEM_Str|MEM_Blob)) {
- return computeNumericType(pMem);
- }
- return 0;
-}
-
/*
* The sqlValueBytes() routine returns the number of bytes in the
* sql_value object assuming that it uses the encoding "enc".
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index d17ed0593..9539fbbd1 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -330,6 +330,40 @@ mem_move(struct Mem *to, struct Mem *from);
int
mem_concat(struct Mem *left, struct Mem *right, struct Mem *result);
+/**
+ * Add the first MEM to the second MEM and write the result to the third MEM.
+ */
+int
+mem_add(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Subtract the second MEM from the first MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_sub(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Multiply the first MEM by the second MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_mul(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Divide the first MEM by the second MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Divide the first MEM by the second MEM and write the remainder of the
+ * division to the third MEM.
+ */
+int
+mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
/**
* Simple type to str convertor. It is used to simplify
* error reporting.
@@ -347,8 +381,6 @@ mem_mp_type(struct Mem *mem);
enum mp_type
sql_value_type(struct Mem *);
-u16
-numericType(Mem *pMem);
int sqlValueBytes(struct Mem *);
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index e78229581..fb2a5ccc1 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1076,6 +1076,15 @@ case OP_Concat: { /* same as TK_CONCAT, in1, in2, out3 */
* and store the result in register P3.
* If either input is NULL, the result is NULL.
*/
+case OP_Add: { /* same as TK_PLUS, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_add(pIn2, pIn1, pOut) != 0)
+ goto abort_due_to_error;
+ break;
+}
+
/* Opcode: Multiply P1 P2 P3 * *
* Synopsis: r[P3]=r[P1]*r[P2]
*
@@ -1084,6 +1093,15 @@ case OP_Concat: { /* same as TK_CONCAT, in1, in2, out3 */
* and store the result in register P3.
* If either input is NULL, the result is NULL.
*/
+case OP_Multiply: { /* same as TK_STAR, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_mul(pIn2, pIn1, pOut) != 0)
+ goto abort_due_to_error;
+ break;
+}
+
/* Opcode: Subtract P1 P2 P3 * *
* Synopsis: r[P3]=r[P2]-r[P1]
*
@@ -1091,6 +1109,15 @@ case OP_Concat: { /* same as TK_CONCAT, in1, in2, out3 */
* and store the result in register P3.
* If either input is NULL, the result is NULL.
*/
+case OP_Subtract: { /* same as TK_MINUS, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_sub(pIn2, pIn1, pOut) != 0)
+ goto abort_due_to_error;
+ break;
+}
+
/* Opcode: Divide P1 P2 P3 * *
* Synopsis: r[P3]=r[P2]/r[P1]
*
@@ -1099,6 +1126,15 @@ case OP_Concat: { /* same as TK_CONCAT, in1, in2, out3 */
* register P1 is zero, then the result is NULL. If either input is
* NULL, the result is NULL.
*/
+case OP_Divide: { /* same as TK_SLASH, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_div(pIn2, pIn1, pOut) != 0)
+ goto abort_due_to_error;
+ break;
+}
+
/* Opcode: Remainder P1 P2 P3 * *
* Synopsis: r[P3]=r[P2]%r[P1]
*
@@ -1107,120 +1143,13 @@ case OP_Concat: { /* same as TK_CONCAT, in1, in2, out3 */
* If the value in register P1 is zero the result is NULL.
* If either operand is NULL, the result is NULL.
*/
-case OP_Add: /* same as TK_PLUS, in1, in2, out3 */
-case OP_Subtract: /* same as TK_MINUS, in1, in2, out3 */
-case OP_Multiply: /* same as TK_STAR, in1, in2, out3 */
-case OP_Divide: /* same as TK_SLASH, in1, in2, out3 */
case OP_Remainder: { /* same as TK_REM, in1, in2, out3 */
- u16 type1; /* Numeric type of left operand */
- u16 type2; /* Numeric type of right operand */
- i64 iA; /* Integer value of left operand */
- i64 iB; /* Integer value of right operand */
- double rA; /* Real value of left operand */
- double rB; /* Real value of right operand */
-
pIn1 = &aMem[pOp->p1];
- type1 = numericType(pIn1);
pIn2 = &aMem[pOp->p2];
- type2 = numericType(pIn2);
- pOut = vdbe_prepare_null_out(p, pOp->p3);
- if (mem_is_any_null(pIn1, pIn2))
- goto arithmetic_result_is_null;
- if ((type1 & (MEM_Int | MEM_UInt)) != 0 &&
- (type2 & (MEM_Int | MEM_UInt)) != 0) {
- iA = pIn1->u.i;
- iB = pIn2->u.i;
- bool is_lhs_neg = mem_is_nint(pIn1);
- bool is_rhs_neg = mem_is_nint(pIn2);
- bool is_res_neg;
- switch( pOp->opcode) {
- case OP_Add: {
- if (sql_add_int(iA, is_lhs_neg, iB, is_rhs_neg,
- (int64_t *) &iB, &is_res_neg) != 0)
- goto integer_overflow;
- break;
- }
- case OP_Subtract: {
- if (sql_sub_int(iB, is_rhs_neg, iA, is_lhs_neg,
- (int64_t *) &iB, &is_res_neg) != 0)
- goto integer_overflow;
- break;
- }
- case OP_Multiply: {
- if (sql_mul_int(iA, is_lhs_neg, iB, is_rhs_neg,
- (int64_t *) &iB, &is_res_neg) != 0)
- goto integer_overflow;
- break;
- }
- case OP_Divide: {
- if (iA == 0)
- goto division_by_zero;
- if (sql_div_int(iB, is_rhs_neg, iA, is_lhs_neg,
- (int64_t *) &iB, &is_res_neg) != 0)
- goto integer_overflow;
- break;
- }
- default: {
- if (iA == 0)
- goto division_by_zero;
- if (iA==-1) iA = 1;
- if (sql_rem_int(iB, is_rhs_neg, iA, is_lhs_neg,
- (int64_t *) &iB, &is_res_neg) != 0)
- goto integer_overflow;
- break;
- }
- }
- mem_set_int(pOut, iB, is_res_neg);
- } else {
- if (sqlVdbeRealValue(pIn1, &rA) != 0) {
- diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
- mem_str(pIn1), "numeric");
- goto abort_due_to_error;
- }
- if (sqlVdbeRealValue(pIn2, &rB) != 0) {
- diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
- mem_str(pIn2), "numeric");
- goto abort_due_to_error;
- }
- assert(((type1 | type2) & MEM_Real) != 0);
- switch( pOp->opcode) {
- case OP_Add: rB += rA; break;
- case OP_Subtract: rB -= rA; break;
- case OP_Multiply: rB *= rA; break;
- case OP_Divide: {
- if (rA == (double)0)
- goto division_by_zero;
- rB /= rA;
- break;
- }
- default: {
- iA = (i64)rA;
- iB = (i64)rB;
- if (iA == 0)
- goto division_by_zero;
- if (iA==-1) iA = 1;
- rB = (double)(iB % iA);
- break;
- }
- }
- if (sqlIsNaN(rB)) {
- goto arithmetic_result_is_null;
- }
- mem_set_double(pOut, rB);
- }
- break;
-
-arithmetic_result_is_null:
- /* Force NULL be of type NUMBER. */
- pOut->field_type = FIELD_TYPE_NUMBER;
+ pOut = &aMem[pOp->p3];
+ if (mem_rem(pIn2, pIn1, pOut) != 0)
+ goto abort_due_to_error;
break;
-
-division_by_zero:
- diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
- goto abort_due_to_error;
-integer_overflow:
- diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
- goto abort_due_to_error;
}
/* Opcode: CollSeq P1 * * P4
More information about the Tarantool-patches
mailing list