[Tarantool-patches] [PATCH v5 19/52] sql: introduce arithmetic operations for MEM

imeevma at tarantool.org imeevma at tarantool.org
Fri Apr 9 20:57:23 MSK 2021


Thank you for the review! My answers and new patch below.


On 30.03.2021 02:02, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> See 3 comments below.
>
> On 23.03.2021 10:35, Mergen Imeev via Tarantool-patches wrote:
>> This patch introduces mem_arithmetic(). Function mem_arithmetic()
>> executes arithmetic operations on the first and the second MEMs and
>> writes the result to the third MEM.
>>
>> Part of #5818
>> ---
>>  src/box/sql/mem.c  | 224 +++++++++++++++++++++++++++++++++++++--------
>>  src/box/sql/mem.h  |   6 +-
>>  src/box/sql/vdbe.c | 111 +---------------------
>>  3 files changed, 193 insertions(+), 148 deletions(-)
>>
>> diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
>> index f160439c9..6120939d8 100644
>> --- a/src/box/sql/mem.c
>> +++ b/src/box/sql/mem.c
>> @@ -370,6 +370,192 @@ mem_concat(struct Mem *left, struct Mem *right, struct Mem *result)
>>    return 0;
>>  }
>>  
>> +int
>> +mem_arithmetic(const struct Mem *left, const struct Mem *right,
>> +         struct Mem *result, int op)
>> +{
>> +  sqlVdbeMemSetNull(result);
>
> 1. Lets use mem_clear() directly. You delete sqlVdbeMemSetNull() later
> anyway.
>
Fixed.

>> +  result->field_type = FIELD_TYPE_NUMBER;
>> +  if (((left->flags | right->flags) & MEM_Null) != 0)
>> +    return 0;
>> +
>> +  int64_t il;
>> +  bool is_l_neg;
>> +  double dl;
>> +  uint16_t type_left = 0;
>
> 2. Looks surprising when you use 'l' in 3 variables before
> and suddenly 'left' here.
>

> Tbh, the old names with A and B looked easier to read. Up to you,
> I can live with both.
>
Fixed, I think.

>> +  if ((left->flags & MEM_Real) != 0) {
>> +    dl = left->u.r;
>> +    type_left = MEM_Real;
>> +  } else if ((left->flags & MEM_Int) != 0) {
>> +    il = left->u.i;
>> +    type_left = MEM_Int;
>> +    is_l_neg = true;
>> +  } else if ((left->flags & MEM_UInt) != 0) {
>> +    il = left->u.i;
>> +    type_left = MEM_UInt;
>> +    is_l_neg = false;
>> +  } else if ((left->flags & (MEM_Str | MEM_Blob)) != 0) {
>> +    if (sql_atoi64(left->z, &il, &is_l_neg, left->n) == 0)
>> +      type_left = is_l_neg ? MEM_Int : MEM_UInt;
>> +    else if (sqlAtoF(left->z, &dl, left->n) != 0)
>> +      type_left = MEM_Real;
>> +  }
>> +
>> +  int64_t ir;
>> +  bool is_r_neg;
>> +  double dr;
>> +  uint16_t type_right = 0;
>> +  if ((right->flags & MEM_Real) != 0) {
>> +    dr = right->u.r;
>> +    type_right = MEM_Real;
>> +  } else if ((right->flags & MEM_Int) != 0) {
>> +    ir = right->u.i;
>> +    type_right = MEM_Int;
>> +    is_r_neg = true;
>> +  } else if ((right->flags & MEM_UInt) != 0) {
>> +    ir = right->u.i;
>> +    type_right = MEM_UInt;
>> +    is_r_neg = false;
>> +  } else if ((right->flags & (MEM_Str | MEM_Blob)) != 0) {
>> +    if (sql_atoi64(right->z, &ir, &is_r_neg, right->n) == 0)
>> +      type_right = is_r_neg ? MEM_Int : MEM_UInt;
>> +    else if (sqlAtoF(right->z, &dr, right->n) != 0)
>> +      type_right = MEM_Real;
>> +  }
>> +
>> +  if ((type_right & (MEM_Int | MEM_UInt | MEM_Real)) == 0) {
>> +    diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
>> +       mem_str(right), "numeric");
>> +    return -1;
>> +  }
>> +  if ((type_left & (MEM_Int | MEM_UInt | MEM_Real)) == 0) {
>> +    diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
>> +       mem_str(left), "numeric");
>> +    return -1;
>> +  }
>> +  if (((type_left | type_right) & MEM_Real) != 0) {
>> +    if (type_left == MEM_Int)
>> +      dl = (double)il;
>> +    else if (type_left == MEM_UInt)
>> +      dl = (double)(uint64_t)il;
>> +
>> +    if (type_right == MEM_Int)
>> +      dr = (double)ir;
>> +    else if (type_right == MEM_UInt)
>> +      dr = (double)(uint64_t)ir;
>> +
>> +    double dres;
>> +    switch(op) {
>> +    case OP_Add:
>> +      dres = dl + dr;
>> +      break;
>> +    case OP_Subtract:
>> +      dres = dl - dr;
>> +      break;
>> +    case OP_Multiply:
>> +      dres = dl * dr;
>> +      break;
>> +    case OP_Divide:
>> +      if (dr == 0.) {
>> +        diag_set(ClientError, ER_SQL_EXECUTE,
>> +           "division by zero");
>> +        return -1;
>> +      }
>> +      dres = dl / dr;
>> +      break;
>> +    case OP_Remainder: {
>> +      int64_t il = (int64_t)dl;
>> +      int64_t ir = (int64_t)dr;
>> +      if (ir == 0) {
>> +        diag_set(ClientError, ER_SQL_EXECUTE,
>> +           "division by zero");
>> +        return -1;
>> +      }
>> +      if (ir == -1)
>> +        ir = 1;
>> +      dres = (double)(il % ir);
>> +      break;
>> +    }
>> +    default:
>> +      unreachable();
>> +    }
>> +    if (sqlIsNaN(dres))
>> +      return 0;
>> +    result->u.r = dres;
>> +    result->flags = MEM_Real;
>> +    return 0;
>> +  }
>> +  int64_t ires;
>> +  /*
>> +   * TODO: This is wrong. Both these flags should already be set. This
>> +   * assignment done to not change behaviour of the function, which
>> +   * is currently bugged.
>> +   */
>> +  is_l_neg = (left->flags & MEM_Int) != 0;
>> +  is_r_neg = (right->flags & MEM_Int) != 0;
>> +  bool is_res_neg;
>> +  switch(op) {
>> +  case OP_Add:
>> +    if (sql_add_int(il, is_l_neg, ir, is_r_neg, &ires,
>> +        &is_res_neg) != 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "integer is overflowed");
>> +      return -1;
>> +    }
>> +    break;
>> +  case OP_Subtract:
>> +    if (sql_sub_int(il, is_l_neg, ir, is_r_neg, &ires,
>> +        &is_res_neg) != 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "integer is overflowed");
>> +      return -1;
>> +    }
>> +    break;
>> +  case OP_Multiply:
>> +    if (sql_mul_int(il, is_l_neg, ir, is_r_neg, &ires,
>> +        &is_res_neg) != 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "integer is overflowed");
>> +      return -1;
>> +    }
>> +    break;
>> +  case OP_Divide:
>> +    if (ir == 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "division by zero");
>> +      return -1;
>> +    }
>> +    if (sql_div_int(il, is_l_neg, ir, is_r_neg, &ires,
>> +        &is_res_neg) != 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "integer is overflowed");
>> +      return -1;
>> +    }
>> +    break;
>> +  case OP_Remainder: {
>> +    if (ir == 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "division by zero");
>> +      return -1;
>> +    }
>> +    if (ir == -1)
>> +      ir = 1;
>> +    if (sql_rem_int(il, is_l_neg, ir, is_r_neg, &ires,
>> +        &is_res_neg) != 0) {
>> +      diag_set(ClientError, ER_SQL_EXECUTE,
>> +         "integer is overflowed");
>> +      return -1;
>> +    }
>> +    break;
>> +  }
>> +  default:
>> +    unreachable();
>> +  }
>> +  result->u.i = ires;
>> +  result->flags = is_res_neg ? MEM_Int : MEM_UInt;
>> +  return 0;
>
> 3. The original code looked shorter and easier to read. Why did you
> pad it out so much?
>
> Additionally, since you now have a function for doing arith, you
> could make one function for each opcode with common parts as a
> static functions in mem.c, and call them right from 'case's in
> vdbe.c. So instead of one big case,case,case,case: arith which
> also has a switch inside, we would have proper case: plus,
> case: multiply, etc. Would be faster and would make each
> individual function simpler I think.
>
> The names could be mem_arith_plus(), mem_arith_mul(), mem_arith_minus(),
> etc.
Fixed. I named new functions mem_add(), mem_sub(), mem_mul(), mem_div() and
mem_rem(). Each of them simpler than this function.


New patch:

commit 25bd849d8ebd45d3865770f1c4c6fc0c02d53d6a
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Sun Mar 14 11:51:52 2021 +0300

    sql: introduce arithmetic operations for MEM
    
    This patch introduces mem_add(), mem_sub(), mem_mul(), mem_div() and
    mem_rem(), which perform arithmetic operations on two MEMs. Operands
    must contain values of numeric types or values that can be converted
    to a number according to implicit casting rules.
    
    Part of #5818

diff --git a/src/box/sql/mem.c b/src/box/sql/mem.c
index 2d76ef88d..859e337aa 100644
--- a/src/box/sql/mem.c
+++ b/src/box/sql/mem.c
@@ -390,6 +390,240 @@ mem_concat(struct Mem *a, struct Mem *b, struct Mem *result)
  return 0;
 }
 
+struct sql_num {
+ union {
+   int64_t i;
+   uint64_t u;
+   double d;
+ };
+ int type;
+ bool is_neg;
+};
+
+static int
+get_number(const struct Mem *mem, struct sql_num *number)
+{
+ if ((mem->flags & MEM_Real) != 0) {
+   number->d = mem->u.r;
+   number->type = MEM_Real;
+   return 0;
+ }
+ if ((mem->flags & MEM_Int) != 0) {
+   number->i = mem->u.i;
+   number->type = MEM_Int;
+   number->is_neg = true;
+   return 0;
+ }
+ if ((mem->flags & MEM_UInt) != 0) {
+   number->u = mem->u.u;
+   number->type = MEM_UInt;
+   number->is_neg = false;
+   return 0;
+ }
+ if ((mem->flags & (MEM_Str | MEM_Blob)) == 0)
+   return -1;
+ if ((mem->flags & MEM_Subtype) != 0)
+   return -1;
+ if (sql_atoi64(mem->z, &number->i, &number->is_neg, mem->n) == 0) {
+   number->type = number->is_neg ? MEM_Int : MEM_UInt;
+   /*
+    * The next line should be removed along with the is_neg field
+    * of struct sql_num. The integer type tells us about the sign.
+    * However, if it is removed, the behavior of arithmetic
+    * operations will change.
+    */
+   number->is_neg = (mem->flags & MEM_Int) != 0;
+   return 0;
+ }
+ if (sqlAtoF(mem->z, &number->d, mem->n) != 0) {
+   number->type = MEM_Real;
+   return 0;
+ }
+ return -1;
+}
+
+static int
+arithmetic_prepare(const struct Mem *left, const struct Mem *right,
+      struct sql_num *a, struct sql_num *b)
+{
+ if (get_number(right, b) != 0) {
+   diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(right),
+      "numeric");
+   return -1;
+ }
+ if (get_number(left, a) != 0) {
+   diag_set(ClientError, ER_SQL_TYPE_MISMATCH, mem_str(left),
+      "numeric");
+   return -1;
+ }
+ assert(a->type != 0 && b->type != 0);
+ if (a->type == MEM_Real && b->type != MEM_Real) {
+   b->d = b->type == MEM_Int ? (double)b->i : (double)b->u;
+   b->type = MEM_Real;
+   return 0;
+ }
+ if (a->type != MEM_Real && b->type == MEM_Real) {
+   a->d = a->type == MEM_Int ? (double)a->i : (double)a->u;
+   a->type = MEM_Real;
+   return 0;
+ }
+ return 0;
+}
+
+int
+mem_add(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (is_result_null(left, right, result, FIELD_TYPE_NUMBER))
+   return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+   return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+   result->u.r = a.d + b.d;
+   result->flags = MEM_Real;
+   return 0;
+ }
+
+ int64_t res;
+ bool is_neg;
+ if (sql_add_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+   return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_sub(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (is_result_null(left, right, result, FIELD_TYPE_NUMBER))
+   return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+   return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+   result->u.r = a.d - b.d;
+   result->flags = MEM_Real;
+   return 0;
+ }
+
+ int64_t res;
+ bool is_neg;
+ if (sql_sub_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+   return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_mul(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (is_result_null(left, right, result, FIELD_TYPE_NUMBER))
+   return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+   return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+   result->u.r = a.d * b.d;
+   result->flags = MEM_Real;
+   return 0;
+ }
+
+ int64_t res;
+ bool is_neg;
+ if (sql_mul_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+   return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (is_result_null(left, right, result, FIELD_TYPE_NUMBER))
+   return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+   return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ if (a.type == MEM_Real) {
+   if (b.d == 0.) {
+     diag_set(ClientError, ER_SQL_EXECUTE,
+        "division by zero");
+     return -1;
+   }
+   result->u.r = a.d / b.d;
+   result->flags = MEM_Real;
+   return 0;
+ }
+
+ if (b.i == 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
+   return -1;
+ }
+ int64_t res;
+ bool is_neg;
+ if (sql_div_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+   return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
+int
+mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result)
+{
+ if (is_result_null(left, right, result, FIELD_TYPE_NUMBER))
+   return 0;
+
+ struct sql_num a, b;
+ if (arithmetic_prepare(left, right, &a, &b) != 0)
+   return -1;
+
+ assert(a.type != MEM_Real || a.type == b.type);
+ /*
+  * TODO: This operation works wrong when double d > INT64_MAX and
+  * d < UINT64_MAX. Also, there may be precision losses due to
+  * conversion integer to double and back.
+  */
+ a.i = a.type == MEM_Real ? (int64_t)a.d : a.i;
+ b.i = b.type == MEM_Real ? (int64_t)b.d : b.i;
+ if (b.i == 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
+   return -1;
+ }
+ int64_t res;
+ bool is_neg;
+ if (sql_rem_int(a.i, a.is_neg, b.i, b.is_neg, &res, &is_neg) != 0) {
+   diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
+   return -1;
+ }
+ result->u.i = res;
+ result->flags = is_neg ? MEM_Int : MEM_UInt;
+ return 0;
+}
+
 static inline bool
 mem_has_msgpack_subtype(struct Mem *mem)
 {
@@ -594,44 +828,6 @@ sql_value_type(sql_value *pVal)
  return mem_mp_type(mem);
 }
 
-
-/*
- * pMem currently only holds a string type (or maybe a BLOB that we can
- * interpret as a string if we want to).  Compute its corresponding
- * numeric type, if has one.  Set the pMem->u.r and pMem->u.i fields
- * accordingly.
- */
-static u16 SQL_NOINLINE
-computeNumericType(Mem *pMem)
-{
- assert((pMem->flags & (MEM_Int | MEM_UInt | MEM_Real)) == 0);
- assert((pMem->flags & (MEM_Str|MEM_Blob))!=0);
- if (sqlAtoF(pMem->z, &pMem->u.r, pMem->n)==0)
-   return 0;
- bool is_neg;
- if (sql_atoi64(pMem->z, (int64_t *) &pMem->u.i, &is_neg, pMem->n) == 0)
-   return is_neg ? MEM_Int : MEM_UInt;
- return MEM_Real;
-}
-
-/*
- * Return the numeric type for pMem, either MEM_Int or MEM_Real or both or
- * none.
- *
- * Unlike mem_apply_numeric_type(), this routine does not modify pMem->flags.
- * But it does set pMem->u.r and pMem->u.i appropriately.
- */
-u16
-numericType(Mem *pMem)
-{
- if ((pMem->flags & (MEM_Int | MEM_UInt | MEM_Real)) != 0)
-   return pMem->flags & (MEM_Int | MEM_UInt | MEM_Real);
- if (pMem->flags & (MEM_Str|MEM_Blob)) {
-   return computeNumericType(pMem);
- }
- return 0;
-}
-
 /*
  * The sqlValueBytes() routine returns the number of bytes in the
  * sql_value object assuming that it uses the encoding "enc".
diff --git a/src/box/sql/mem.h b/src/box/sql/mem.h
index df273026b..69a7d9f7a 100644
--- a/src/box/sql/mem.h
+++ b/src/box/sql/mem.h
@@ -192,6 +192,40 @@ mem_move(struct Mem *to, struct Mem *from);
 int
 mem_concat(struct Mem *left, struct Mem *right, struct Mem *result);
 
+/**
+ * Add the first MEM to the second MEM and write the result to the third MEM.
+ */
+int
+mem_add(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Subtract the second MEM from the first MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_sub(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Multiply the first MEM by the second MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_mul(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Divide the first MEM by the second MEM and write the result to the third
+ * MEM.
+ */
+int
+mem_div(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
+/**
+ * Divide the first MEM by the second MEM and write integer part of the result
+ * to the third MEM.
+ */
+int
+mem_rem(const struct Mem *left, const struct Mem *right, struct Mem *result);
+
 /* One or more of the following flags are set to indicate the validOK
  * representations of the value stored in the Mem struct.
  *
@@ -261,8 +295,6 @@ mem_mp_type(struct Mem *mem);
 
 enum mp_type
 sql_value_type(struct Mem *);
-u16
-numericType(Mem *pMem);
 
 int sqlValueBytes(struct Mem *);
 
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 6f3475147..67e1cc85a 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1076,6 +1076,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * and store the result in register P3.
  * If either input is NULL, the result is NULL.
  */
+case OP_Add: {                 /* same as TK_PLUS, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_add(pIn2, pIn1, pOut) != 0)
+   goto abort_due_to_error;
+ break;
+}
+
 /* Opcode: Multiply P1 P2 P3 * *
  * Synopsis: r[P3]=r[P1]*r[P2]
  *
@@ -1084,6 +1093,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * and store the result in register P3.
  * If either input is NULL, the result is NULL.
  */
+case OP_Multiply: {            /* same as TK_STAR, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_mul(pIn2, pIn1, pOut) != 0)
+   goto abort_due_to_error;
+ break;
+}
+
 /* Opcode: Subtract P1 P2 P3 * *
  * Synopsis: r[P3]=r[P2]-r[P1]
  *
@@ -1091,6 +1109,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * and store the result in register P3.
  * If either input is NULL, the result is NULL.
  */
+case OP_Subtract: {           /* same as TK_MINUS, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_sub(pIn2, pIn1, pOut) != 0)
+   goto abort_due_to_error;
+ break;
+}
+
 /* Opcode: Divide P1 P2 P3 * *
  * Synopsis: r[P3]=r[P2]/r[P1]
  *
@@ -1099,6 +1126,15 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * register P1 is zero, then the result is NULL. If either input is
  * NULL, the result is NULL.
  */
+case OP_Divide: {             /* same as TK_SLASH, in1, in2, out3 */
+ pIn1 = &aMem[pOp->p1];
+ pIn2 = &aMem[pOp->p2];
+ pOut = &aMem[pOp->p3];
+ if (mem_div(pIn2, pIn1, pOut) != 0)
+   goto abort_due_to_error;
+ break;
+}
+
 /* Opcode: Remainder P1 P2 P3 * *
  * Synopsis: r[P3]=r[P2]%r[P1]
  *
@@ -1107,120 +1143,13 @@ case OP_Concat: {           /* same as TK_CONCAT, in1, in2, out3 */
  * If the value in register P1 is zero the result is NULL.
  * If either operand is NULL, the result is NULL.
  */
-case OP_Add:                   /* same as TK_PLUS, in1, in2, out3 */
-case OP_Subtract:              /* same as TK_MINUS, in1, in2, out3 */
-case OP_Multiply:              /* same as TK_STAR, in1, in2, out3 */
-case OP_Divide:                /* same as TK_SLASH, in1, in2, out3 */
 case OP_Remainder: {           /* same as TK_REM, in1, in2, out3 */
- u16 type1;      /* Numeric type of left operand */
- u16 type2;      /* Numeric type of right operand */
- i64 iA;         /* Integer value of left operand */
- i64 iB;         /* Integer value of right operand */
- double rA;      /* Real value of left operand */
- double rB;      /* Real value of right operand */
-
  pIn1 = &aMem[pOp->p1];
- type1 = numericType(pIn1);
  pIn2 = &aMem[pOp->p2];
- type2 = numericType(pIn2);
- pOut = vdbe_prepare_null_out(p, pOp->p3);
- if (mem_is_any_null(pIn1, pIn2))
-   goto arithmetic_result_is_null;
- if ((type1 & (MEM_Int | MEM_UInt)) != 0 &&
-     (type2 & (MEM_Int | MEM_UInt)) != 0) {
-   iA = pIn1->u.i;
-   iB = pIn2->u.i;
-   bool is_lhs_neg = mem_is_int(pIn1) && !mem_is_uint(pIn1);
-   bool is_rhs_neg = mem_is_int(pIn2) && !mem_is_uint(pIn2);
-   bool is_res_neg;
-   switch( pOp->opcode) {
-   case OP_Add: {
-     if (sql_add_int(iA, is_lhs_neg, iB, is_rhs_neg,
-         (int64_t *) &iB, &is_res_neg) != 0)
-       goto integer_overflow;
-     break;
-   }
-   case OP_Subtract: {
-     if (sql_sub_int(iB, is_rhs_neg, iA, is_lhs_neg,
-         (int64_t *) &iB, &is_res_neg) != 0)
-       goto integer_overflow;
-     break;
-   }
-   case OP_Multiply: {
-     if (sql_mul_int(iA, is_lhs_neg, iB, is_rhs_neg,
-         (int64_t *) &iB, &is_res_neg) != 0)
-       goto integer_overflow;
-     break;
-   }
-   case OP_Divide: {
-     if (iA == 0)
-       goto division_by_zero;
-     if (sql_div_int(iB, is_rhs_neg, iA, is_lhs_neg,
-         (int64_t *) &iB, &is_res_neg) != 0)
-       goto integer_overflow;
-     break;
-   }
-   default: {
-     if (iA == 0)
-       goto division_by_zero;
-     if (iA==-1) iA = 1;
-     if (sql_rem_int(iB, is_rhs_neg, iA, is_lhs_neg,
-         (int64_t *) &iB, &is_res_neg) != 0)
-       goto integer_overflow;
-     break;
-   }
-   }
-   mem_set_int(pOut, iB, is_res_neg);
- } else {
-   if (sqlVdbeRealValue(pIn1, &rA) != 0) {
-     diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-        mem_str(pIn1), "numeric");
-     goto abort_due_to_error;
-   }
-   if (sqlVdbeRealValue(pIn2, &rB) != 0) {
-     diag_set(ClientError, ER_SQL_TYPE_MISMATCH,
-        mem_str(pIn2), "numeric");
-     goto abort_due_to_error;
-   }
-   assert(((type1 | type2) & MEM_Real) != 0);
-   switch( pOp->opcode) {
-   case OP_Add:         rB += rA;       break;
-   case OP_Subtract:    rB -= rA;       break;
-   case OP_Multiply:    rB *= rA;       break;
-   case OP_Divide: {
-     if (rA == (double)0)
-       goto division_by_zero;
-     rB /= rA;
-     break;
-   }
-   default: {
-     iA = (i64)rA;
-     iB = (i64)rB;
-     if (iA == 0)
-       goto division_by_zero;
-     if (iA==-1) iA = 1;
-     rB = (double)(iB % iA);
-     break;
-   }
-   }
-   if (sqlIsNaN(rB)) {
-     goto arithmetic_result_is_null;
-   }
-   mem_set_double(pOut, rB);
- }
- break;
-
-arithmetic_result_is_null:
- /* Force NULL be of type NUMBER. */
- pOut->field_type = FIELD_TYPE_NUMBER;
+ pOut = &aMem[pOp->p3];
+ if (mem_rem(pIn2, pIn1, pOut) != 0)
+   goto abort_due_to_error;
  break;
-
-division_by_zero:
- diag_set(ClientError, ER_SQL_EXECUTE, "division by zero");
- goto abort_due_to_error;
-integer_overflow:
- diag_set(ClientError, ER_SQL_EXECUTE, "integer is overflowed");
- goto abort_due_to_error;
 }
 
 /* Opcode: CollSeq P1 * * P4



More information about the Tarantool-patches mailing list