From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Stanislav Zudin Subject: [PATCH 07/13] sql: arithmetic functions support big integers Date: Fri, 15 Mar 2019 18:45:36 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org, vdavydov.dev@gmail.com Cc: Stanislav Zudin List-ID: Makes arithmetic functions accept arguments with values in the range [2^63, 2^64). --- src/box/sql/func.c | 2 +- src/box/sql/sqlInt.h | 23 +++- src/box/sql/util.c | 236 ++++++++++++++++++++++++++++++++---------- src/box/sql/vdbe.c | 36 ++++--- src/box/sql/vdbeInt.h | 2 +- 5 files changed, 223 insertions(+), 76 deletions(-) diff --git a/src/box/sql/func.c b/src/box/sql/func.c index cf65bf2a2..8a8acc216 100644 --- a/src/box/sql/func.c +++ b/src/box/sql/func.c @@ -1437,7 +1437,7 @@ sumStep(sql_context * context, int argc, sql_value ** argv) i64 v = sql_value_int64(argv[0]); p->rSum += v; if ((p->approx | p->overflow) == 0 - && sqlAddInt64(&p->iSum, v)) { + && sqlAddInt64(&p->iSum, true, v, true) != ATHR_SIGNED) { p->overflow = 1; } } else { diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h index 9b1d7df9a..7f8e3f04e 100644 --- a/src/box/sql/sqlInt.h +++ b/src/box/sql/sqlInt.h @@ -4383,9 +4383,26 @@ Expr *sqlExprAddCollateString(Parse *, Expr *, const char *); Expr *sqlExprSkipCollate(Expr *); int sqlCheckIdentifierName(Parse *, char *); void sqlVdbeSetChanges(sql *, int); -int sqlAddInt64(i64 *, i64); -int sqlSubInt64(i64 *, i64); -int sqlMulInt64(i64 *, i64); + +enum arithmetic_result { + /* The result fits the signed 64-bit integer */ + ATHR_SIGNED, + /* The result is positive and fits the + * unsigned 64-bit integer + */ + ATHR_UNSIGNED, + /* The operation causes an overflow */ + ATHR_OVERFLOW, + /* The operation causes division by zero */ + ATHR_DIVBYZERO +}; + +enum arithmetic_result sqlAddInt64(i64 *, bool, i64, bool); +enum arithmetic_result sqlSubInt64(i64 *, bool, i64, bool); +enum arithmetic_result sqlMulInt64(i64 *, bool, 
i64, bool); +enum arithmetic_result sqlDivInt64(i64 *, bool, i64, bool); +enum arithmetic_result sqlRemInt64(i64 *, bool, i64, bool); + int sqlAbsInt32(int); #ifdef SQL_ENABLE_8_3_NAMES void sqlFileSuffix3(const char *, char *); diff --git a/src/box/sql/util.c b/src/box/sql/util.c index be77f72f8..3786c5083 100644 --- a/src/box/sql/util.c +++ b/src/box/sql/util.c @@ -1249,74 +1249,202 @@ sqlSafetyCheckSickOrOk(sql * db) } /* - * Attempt to add, substract, or multiply the 64-bit signed value iB against - * the other 64-bit signed integer at *pA and store the result in *pA. - * Return 0 on success. Or if the operation would have resulted in an - * overflow, leave *pA unchanged and return 1. + * Get the absolute value (modulus) of a 64-bit number */ -int -sqlAddInt64(i64 * pA, i64 iB) +static u64 mod64(i64 v, bool is_signed) +{ + bool is_neg = v < 0 && is_signed; + if (is_neg) + return (v == INT64_MIN) ? (u64)v : (u64)(-v); + else + return (u64)v; +} + +/* + * Attempt to add, subtract, or multiply the 64-bit value iB against + * the other 64-bit integer at *pA and store the result in *pA. + * Return ATHR_SIGNED or ATHR_UNSIGNED on success. + * Or if the operation would have resulted in an + * overflow, leave *pA unchanged and return ATHR_OVERFLOW. 
+ */ +enum arithmetic_result +sqlAddInt64(i64 * pA, bool is_signedA, i64 iB, bool is_signedB) { i64 iA = *pA; - testcase(iA == 0); - testcase(iA == 1); - testcase(iB == -1); - testcase(iB == 0); - if (iB >= 0) { - testcase(iA > 0 && LARGEST_INT64 - iA == iB); - testcase(iA > 0 && LARGEST_INT64 - iA == iB - 1); - if (iA > 0 && LARGEST_INT64 - iA < iB) - return 1; + + bool is_negA = iA < 0 && is_signedA; + bool is_negB = iB < 0 && is_signedB; + + /* Make sure we've got only one combination of + * positive and negative operands + */ + if (is_negA > is_negB) { + SWAP(is_negA, is_negB); + SWAP(iA, iB); + } + + if (is_negA != is_negB) { + + assert(iA >=0 && iB < 0); + u64 uB = mod64(iB, true); + + if ((u64)iA >= uB) { + u64 sum = (u64)iA - uB; + *pA = (i64)sum; + return (sum <= INT64_MAX) ? ATHR_SIGNED + : ATHR_UNSIGNED; + } else { + u64 sum = uB - (u64)iA; + if (sum == INT64_MIN_MOD) { + *pA = INT64_MIN; + } else { + assert(sum < INT64_MAX); + *pA = -(i64)sum; + } + return ATHR_SIGNED; + } + } + + if (is_negA) { + assert(is_signedA && is_signedB); + if (-(iA + LARGEST_INT64) > iB + 1) + return ATHR_OVERFLOW; + *pA = iA + iB; + return ATHR_SIGNED; } else { - testcase(iA < 0 && -(iA + LARGEST_INT64) == iB + 1); - testcase(iA < 0 && -(iA + LARGEST_INT64) == iB + 2); - if (iA < 0 && -(iA + LARGEST_INT64) > iB + 1) - return 1; + if (UINT64_MAX - (u64)iA < (u64)iB) + return ATHR_OVERFLOW; + + u64 sum = (u64)iA + (u64)iB; + *pA = (i64)sum; + return (sum <= INT64_MAX) ? 
ATHR_SIGNED + : ATHR_UNSIGNED; } - *pA += iB; - return 0; } -int -sqlSubInt64(i64 * pA, i64 iB) +enum arithmetic_result +sqlSubInt64(i64 * pA, bool is_signedA, i64 iB, bool is_signedB) { - testcase(iB == SMALLEST_INT64 + 1); - if (iB == SMALLEST_INT64) { - testcase((*pA) == (-1)); - testcase((*pA) == 0); - if ((*pA) >= 0) - return 1; - *pA -= iB; - return 0; + i64 iA = *pA; + + bool is_negA = iA < 0 && is_signedA; + bool is_negB = iB < 0 && is_signedB; + + if (is_negA) { + if (!is_signedB){ + assert((u64)iB > INT64_MAX); + return ATHR_OVERFLOW; + } + + if (iB == INT64_MIN) + return ATHR_OVERFLOW; + else + return sqlAddInt64(pA, true, -iB, true); + } + + if (is_negB) { + /* iA - (-iB) => iA + iB */ + u64 uB = mod64(iB, true); + if (iB == INT64_MIN) + is_signedB = false; + + return sqlAddInt64(pA, is_signedA, uB, is_signedB); } else { - return sqlAddInt64(pA, -iB); + /* Both iA & iB are positive */ + if ((u64)iA < (u64)iB) + return ATHR_OVERFLOW; + u64 val = (u64)iA - (u64)iB; + *pA = (i64)val; + return (val > INT64_MAX) ? ATHR_UNSIGNED + : ATHR_SIGNED; } } -int -sqlMulInt64(i64 * pA, i64 iB) +static enum arithmetic_result +apply_sign(i64* pOut, u64 value, bool is_neg) { - i64 iA = *pA; - if (iB > 0) { - if (iA > LARGEST_INT64 / iB) - return 1; - if (iA < SMALLEST_INT64 / iB) - return 1; - } else if (iB < 0) { - if (iA > 0) { - if (iB < SMALLEST_INT64 / iA) - return 1; - } else if (iA < 0) { - if (iB == SMALLEST_INT64) - return 1; - if (iA == SMALLEST_INT64) - return 1; - if (-iA > LARGEST_INT64 / -iB) - return 1; - } + if (is_neg) { + if (value > INT64_MIN_MOD) + return ATHR_OVERFLOW; + else if (value == INT64_MIN_MOD) + *pOut = (i64)value; + else + *pOut = -(i64)value; + + return ATHR_SIGNED; } - *pA = iA * iB; - return 0; + + *pOut = (i64) value; + return (value > INT64_MAX) ? 
ATHR_UNSIGNED + : ATHR_SIGNED; +} + +enum arithmetic_result +sqlMulInt64(i64 * pA, bool is_signedA, i64 iB, bool is_signedB) +{ + if (*pA == 0 || iB == 0) { + *pA = 0; + return ATHR_SIGNED; + } + + bool is_negA = *pA < 0 && is_signedA; + bool is_negB = iB < 0 && is_signedB; + + bool is_neg = is_negA != is_negB; + + u64 uA = mod64(*pA, is_signedA); + u64 uB = mod64(iB, is_signedB); + + if (is_neg) { + if (INT64_MIN_MOD / uA < uB) + return ATHR_OVERFLOW; + } else { + if (INT64_MAX / uA < uB) + return ATHR_OVERFLOW; + } + + u64 mul = uA * uB; + return apply_sign(pA, mul, is_neg); +} + +enum arithmetic_result +sqlDivInt64(i64 * pA, bool is_signedA, i64 iB, bool is_signedB) { + if (*pA == 0) + return ATHR_SIGNED; + if (iB == 0) + return ATHR_DIVBYZERO; + + bool is_negA = *pA < 0 && is_signedA; + bool is_negB = iB < 0 && is_signedB; + + bool is_neg = is_negA != is_negB; + + u64 uA = mod64(*pA, is_signedA); + u64 uB = mod64(iB, is_signedB); + + u64 div = uA / uB; + return apply_sign(pA, div, is_neg); +} + +enum arithmetic_result +sqlRemInt64(i64 * pA, bool is_signedA, i64 iB, bool is_signedB) { + + if (iB == 0) + return ATHR_DIVBYZERO; + /* + * The sign of the remainder is defined in such + * a way that if the quotient a/b is representable + * in the result type, then (a/b)*b + a%b == a. + * + * The 2nd operand doesn't affect the sign of result. 
+ */ + + bool is_neg = *pA < 0 && is_signedA; + u64 uA = mod64(*pA, is_signedA); + u64 uB = mod64(iB, is_signedB); + + u64 rem = uA % uB; + return apply_sign(pA, rem, is_neg); } /* diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c index ea9d9d98f..d4bd845fb 100644 --- a/src/box/sql/vdbe.c +++ b/src/box/sql/vdbe.c @@ -1672,28 +1672,29 @@ case OP_Remainder: { /* same as TK_REM, in1, in2, out3 */ if ((type1 & type2 & MEM_Int)!=0) { iA = pIn1->u.i; iB = pIn2->u.i; + bool is_signedA = (type1 & MEM_Unsigned) == 0; + bool is_signedB = (type2 & MEM_Unsigned) == 0; bIntint = 1; + enum arithmetic_result arr; switch( pOp->opcode) { - case OP_Add: if (sqlAddInt64(&iB,iA)) goto integer_overflow; break; - case OP_Subtract: if (sqlSubInt64(&iB,iA)) goto integer_overflow; break; - case OP_Multiply: if (sqlMulInt64(&iB,iA)) goto integer_overflow; break; - case OP_Divide: { - if (iA == 0) - goto division_by_zero; - if (iA==-1 && iB==SMALLEST_INT64) goto integer_overflow; - iB /= iA; - break; + case OP_Add: arr = sqlAddInt64(&iB, is_signedA, iA, is_signedB); break; + case OP_Subtract: arr = sqlSubInt64(&iB, is_signedA, iA, is_signedB); break; + case OP_Multiply: arr = sqlMulInt64(&iB, is_signedA, iA, is_signedB); break; + case OP_Divide: arr = sqlDivInt64(&iB, is_signedA, iA, is_signedB); break; + default: arr = sqlRemInt64(&iB, is_signedA, iA, is_signedB); break; } - default: { - if (iA == 0) - goto division_by_zero; - if (iA==-1) iA = 1; - iB %= iA; + + switch(arr){ + case ATHR_SIGNED: + MemSetTypeFlag(pOut, MEM_Int); break; - } + case ATHR_UNSIGNED: + MemSetTypeFlag(pOut, MEM_Int|MEM_Unsigned); + break; + case ATHR_OVERFLOW: goto integer_overflow; + case ATHR_DIVBYZERO: goto division_by_zero; } pOut->u.i = iB; - MemSetTypeFlag(pOut, MEM_Int); } else { bIntint = 0; if (sqlVdbeRealValue(pIn1, &rA) != 0) { @@ -5177,7 +5178,8 @@ case OP_OffsetLimit: { /* in1, out2, in3 */ assert(pIn1->flags & MEM_Int); assert(pIn3->flags & MEM_Int); x = pIn1->u.i; - if (x<=0 || sqlAddInt64(&x, 
pIn3->u.i>0?pIn3->u.i:0)) { + if (x<=0 || + sqlAddInt64(&x, true, pIn3->u.i>0?pIn3->u.i:0, true) != ATHR_SIGNED) { /* If the LIMIT is less than or equal to zero, loop forever. This * is documented. But also, if the LIMIT+OFFSET exceeds 2^63 then * also loop forever. This is undocumented. In fact, one could argue diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h index 0375845d9..42f22df52 100644 --- a/src/box/sql/vdbeInt.h +++ b/src/box/sql/vdbeInt.h @@ -276,7 +276,7 @@ enum { * Clear any existing type flags from a Mem and replace them with f */ #define MemSetTypeFlag(p, f) \ - ((p)->flags = ((p)->flags&~(MEM_TypeMask|MEM_Zero))|f) + ((p)->flags = ((p)->flags&~(MEM_TypeMask|MEM_Zero|MEM_Unsigned))|f) /* * Return true if a memory cell is not marked as invalid. This macro -- 2.17.1