[tarantool-patches] Re: [PATCH 6/6] sql: allow to specify UNSIGNED column type
n.pettik
korablev at tarantool.org
Fri Jul 5 19:36:19 MSK 2019
> On 2 Jul 2019, at 00:53, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
>
> There still are places, which access Mem.u.i/u without
> checking flags properly. Please, for each place either
> explain why it is correct, or fix it and add a test.
>
> --------------------------------------------------------------
> vdbe.c:2659:
> if ((pDest->flags & (MEM_Int | MEM_UInt)) != 0) {
> if (field_type == FIELD_TYPE_NUMBER)
> sqlVdbeMemSetDouble(pDest, pDest->u.i);
> }
>
> You can't access pDest->u.i - it can be a big unsigned.
Test case:
create table "t"(id int primary key, "a" number);
box.space.t:insert({2, 18446744073709551615ULL})
select * from “t”;
---
- metadata:
- name: ID
type: integer
- name: a
type: number
rows:
- [2, -1]
…
Diff:
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 97cc1fa8a..cabc852fa 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -2616,8 +2616,12 @@ case OP_Column: {
sqlVdbeMemShallowCopy(pDest, default_val_mem, MEM_Static);
}
if ((pDest->flags & (MEM_Int | MEM_UInt)) != 0) {
- if (field_type == FIELD_TYPE_NUMBER)
- sqlVdbeMemSetDouble(pDest, pDest->u.i);
+ if (field_type == FIELD_TYPE_NUMBER) {
+ if ((pDest->flags & MEM_Int) != 0)
+ sqlVdbeMemSetDouble(pDest, pDest->u.i);
+ else
+ sqlVdbeMemSetDouble(pDest, pDest->u.u);
+ }
}
op_column_out:
REGISTER_TRACE(p, pOp->p3, pDest);
diff --git a/test/sql/integer-overflow.result b/test/sql/integer-overflow.result
index 7ca2f336d..528e7a6d6 100644
--- a/test/sql/integer-overflow.result
+++ b/test/sql/integer-overflow.result
@@ -140,3 +140,32 @@ box.execute('SELECT * FROM t;')
box.space.T:drop()
---
...
+-- Make sure that integers stored in NUMBER field are converted
+-- to floating point properly.
+--
+box.execute("CREATE TABLE t(id INT PRIMARY KEY, a FLOAT);")
+---
+- row_count: 1
+...
+box.space.T:insert({1, 18446744073709551615ULL})
+---
+- [1, 18446744073709551615]
+...
+box.space.T:insert({2, -1})
+---
+- [2, -1]
+...
+box.execute("SELECT * FROM t;")
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ rows:
+ - [1, 1.844674407371e+19]
+ - [2, -1]
+...
+box.space.T:drop()
+---
+...
diff --git a/test/sql/integer-overflow.test.lua b/test/sql/integer-overflow.test.lua
index 246465da0..d5635d5af 100644
--- a/test/sql/integer-overflow.test.lua
+++ b/test/sql/integer-overflow.test.lua
@@ -42,3 +42,12 @@ box.space.T:insert({9223372036854775809})
box.space.T:insert({18446744073709551615ULL})
box.execute('SELECT * FROM t;')
box.space.T:drop()
+
+-- Make sure that integers stored in NUMBER field are converted
+-- to floating point properly.
+--
+box.execute("CREATE TABLE t(id INT PRIMARY KEY, a FLOAT);")
+box.space.T:insert({1, 18446744073709551615ULL})
+box.space.T:insert({2, -1})
+box.execute("SELECT * FROM t;")
+box.space.T:drop()
> --------------------------------------------------------------
> vdbe.c:2955:
> pIn1 = &aMem[pOp->p1];
> uint32_t space_id = pIn1->u.i;
>
> Here you touch u.i, but it is always
> unsigned. You should use u.u, I think.
Fair (but space id always fits into 32 bits and always non-negative,
so technically it is correct usage):
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index cabc852fa..0626983f0 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -2915,7 +2915,8 @@ case OP_Savepoint: {
case OP_CheckViewReferences: {
assert(pOp->p1 > 0);
pIn1 = &aMem[pOp->p1];
- uint32_t space_id = pIn1->u.i;
+ uint64_t space_id = pIn1->u.u;
+ assert(space_id <= INT32_MAX);
struct space *space = space_by_id(space_id);
assert(space != NULL);
if (space->def->view_ref_count > 0) {
> --------------------------------------------------------------
> vdbeaux.c:2428:
> if (flags & MEM_Int) {
>
> ...
>
> i64 i = pMem->u.i;
> u64 u;
> if (i < 0) {
> u = ~i;
> } else {
> u = i;
> }
>
> Why do you check 'i < 0', if you already
> see its flag is 'Int'. It should be < 0 by
> definition.
I’m not sure this code is still in usage. It looks like
dead code. Will remove it in a follow-up patch.
> --------------------------------------------------------------
> vdbeaux.c:2589:
> u64 v;
> u32 i;
> if (serial_type == 7) {
> assert(sizeof(v) == sizeof(pMem->u.r));
> memcpy(&v, &pMem->u.r, sizeof(v));
> swapMixedEndianFloat(v);
> } else {
> v = pMem->u.i;
> }
>
> Why are you sure it is safe to store u.i into
> uint64_t variable?
Same.
> --------------------------------------------------------------
> vdbeaux.c:2644:
> u64 x = FOUR_BYTE_UINT(buf);
> u32 y = FOUR_BYTE_UINT(buf + 4);
> x = (x << 32) + y;
> if (serial_type == 6) {
> /* EVIDENCE-OF: R-29851-52272 Value is a big-endian 64-bit
> * twos-complement integer.
> */
> pMem->u.i = *(i64 *) & x;
> pMem->flags = MEM_Int;
>
> Vice versa - why is it safe to store uint64_t into u.i,
> and set its flag to 'Int' (== value is negative).
>
> Actually, all this 'serial' shit looks broken. Please,
> verify that code.
Same.
> --------------------------------------------------------------
> vdbeaux.c:2998:
> if ((f1 & MEM_UInt) != 0) {
> if ((f2 & MEM_Real) != 0) {
> return sqlIntFloatCompare(pMem1->u.i,
>
> pMem1 is unsigned, according to the first check,
> but you use u.i. Why?
Thx, I’ve fixed series of similar places and extended sql/types.test.lua:
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index 325c54c18..b6b5cd0bf 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -2887,43 +2887,50 @@ sqlBlobCompare(const Mem * pB1, const Mem * pB2)
return n1 - n2;
}
-/*
- * Do a comparison between a 64-bit signed integer and a 64-bit floating-point
- * number. Return negative, zero, or positive if the first (i64) is less than,
- * equal to, or greater than the second (double).
+/**
+ * Do a comparison between a 64-bit unsigned/signed integer and a
+ * 64-bit floating-point number. Return negative, zero, or
+ * positive if the first (integer) is less than, equal to, or
+ * greater than the second (double).
*/
static int
-sqlIntFloatCompare(i64 i, double r)
+compare_uint_float(uint64_t u, double r)
{
- if (sizeof(LONGDOUBLE_TYPE) > 8) {
- LONGDOUBLE_TYPE x = (LONGDOUBLE_TYPE) i;
- if (x < r)
- return -1;
- if (x > r)
- return +1;
- return 0;
- } else {
- i64 y;
- double s;
- if (r < -9223372036854775808.0)
- return +1;
- if (r > 9223372036854775807.0)
- return -1;
- y = (i64) r;
- if (i < y)
- return -1;
- if (i > y) {
- if (y == SMALLEST_INT64 && r > 0.0)
- return -1;
- return +1;
- }
- s = (double)i;
- if (s < r)
- return -1;
- if (s > r)
- return +1;
- return 0;
- }
+ if (r > (double) UINT64_MAX)
+ return -1;
+ if (r < 0.0)
+ return +1;
+ uint64_t y = (uint64_t) r;
+ if (u < y)
+ return -1;
+ if (u > y)
+ return +1;
+ double s = (double) u;
+ if (s < r)
+ return -1;
+ if (s > r)
+ return +1;
+ return 0;
+}
+
+static int
+compare_int_float(int64_t i, double r)
+{
+ if (r < (double) INT64_MIN)
+ return +1;
+ if (r >= 0.0)
+ return -1;
+ int64_t y = (int64_t) r;
+ if (i < y)
+ return -1;
+ if (i > y)
+ return +1;
+ double s = (double) i;
+ if (s < r)
+ return -1;
+ if (s > r)
+ return +1;
+ return 0;
}
/*
@@ -2989,15 +2996,15 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
}
if ((f1 & MEM_Int) != 0) {
if ((f2 & MEM_Real) != 0) {
- return sqlIntFloatCompare(pMem1->u.i,
- pMem2->u.r);
+ return compare_int_float(pMem1->u.i,
+ pMem2->u.r);
} else {
return -1;
}
}
if ((f1 & MEM_UInt) != 0) {
if ((f2 & MEM_Real) != 0) {
- return sqlIntFloatCompare(pMem1->u.i,
+ return compare_uint_float(pMem1->u.u,
pMem2->u.r);
} else if ((f2 & MEM_Int) != 0) {
return +1;
@@ -3007,10 +3014,10 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
}
if ((f1 & MEM_Real) != 0) {
if ((f2 & MEM_Int) != 0) {
- return -sqlIntFloatCompare(pMem2->u.i,
- pMem1->u.r);
+ return -compare_int_float(pMem2->u.i,
+ pMem1->u.r);
} else if ((f2 & MEM_UInt) != 0) {
- return -sqlIntFloatCompare(pMem2->u.u,
+ return -compare_uint_float(pMem2->u.u,
pMem1->u.r);
} else {
return -1;
@@ -3171,26 +3178,33 @@ sqlVdbeCompareMsgpack(const char **key1,
break;
}
case MP_UINT:{
- uint64_t v = mp_decode_uint(&aKey1);
- if (v > INT64_MAX) {
- mem1.u.r = (double)v;
- goto do_float;
+ mem1.u.u = mp_decode_uint(&aKey1);
+ if ((pKey2->flags & MEM_Int) != 0) {
+ rc = +1;
+ } else if ((pKey2->flags & MEM_UInt) != 0) {
+ if (mem1.u.u < pKey2->u.u)
+ rc = -1;
+ else if (mem1.u.u > pKey2->u.u)
+ rc = +1;
+ } else if ((pKey2->flags & MEM_Real) != 0) {
+ rc = compare_uint_float(mem1.u.u, pKey2->u.r);
+ } else {
+ rc = (pKey2->flags & MEM_Null) ? +1 : -1;
}
- mem1.u.u = v;
- goto do_int;
+ break;
}
case MP_INT:{
mem1.u.i = mp_decode_int(&aKey1);
- do_int:
- if ((pKey2->flags & (MEM_Int | MEM_UInt)) != 0) {
+ if ((pKey2->flags & MEM_UInt) != 0) {
+ rc = -1;
+ } else if ((pKey2->flags & MEM_Int) != 0) {
if (mem1.u.i < pKey2->u.i) {
rc = -1;
} else if (mem1.u.i > pKey2->u.i) {
rc = +1;
}
} else if (pKey2->flags & MEM_Real) {
- rc = sqlIntFloatCompare(mem1.u.i,
- pKey2->u.r);
+ rc = compare_int_float(mem1.u.i, pKey2->u.r);
} else {
rc = (pKey2->flags & MEM_Null) ? +1 : -1;
}
@@ -3203,9 +3217,10 @@ sqlVdbeCompareMsgpack(const char **key1,
case MP_DOUBLE:{
mem1.u.r = mp_decode_double(&aKey1);
do_float:
- if ((pKey2->flags & (MEM_Int | MEM_UInt)) != 0) {
- rc = -sqlIntFloatCompare(pKey2->u.i,
- mem1.u.r);
+ if ((pKey2->flags & MEM_Int) != 0) {
+ rc = -compare_int_float(pKey2->u.i, mem1.u.r);
+ } else if (pKey2->flags & MEM_UInt) {
+ rc = -compare_uint_float(pKey2->u.u, mem1.u.r);
} else if (pKey2->flags & MEM_Real) {
if (mem1.u.r < pKey2->u.r) {
rc = -1;
> --------------------------------------------------------------
> vdbeaux.c:3184:
> do_int:
> if ((pKey2->flags & (MEM_Int | MEM_UInt)) != 0) {
> if (mem1.u.i < pKey2->u.i) {
> rc = -1;
>
> pKey2 is an integer, but you don't know the sign. And use u.i,
> assuming it is negative, or at least small enough. Why?
>
> Line 3207 is the same.
Fixed, see diff above.
> --------------------------------------------------------------
> vdbemem.c:1431:
> } else if (pVal->u.i == SMALLEST_INT64) {
> pVal->u.r = -(double)SMALLEST_INT64;
> MemSetTypeFlag(pVal, MEM_Real);
> } else {
> pVal->u.i = -pVal->u.i;
> }
>
> You compare u.i and SMALLEST_INT64, but you can't
> be sure, that u.i is not a big unsigned, can you?
Fixed:
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index f8673912e..64acb5d41 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1428,11 +1428,15 @@ valueFromExpr(sql * db, /* The database connection */
return rc;
if (pVal->flags & MEM_Real) {
pVal->u.r = -pVal->u.r;
- } else if (pVal->u.i == SMALLEST_INT64) {
- pVal->u.r = -(double)SMALLEST_INT64;
- MemSetTypeFlag(pVal, MEM_Real);
- } else {
- pVal->u.i = -pVal->u.i;
+ } else if ((pVal->flags & MEM_Int) != 0) {
+ mem_set_u64(pVal, (uint64_t)(-pVal->u.i));
+ } else if ((pVal->flags & MEM_UInt) != 0) {
+ if (pVal->u.u > (uint64_t) INT64_MAX + 1) {
+ pVal->u.r = -(double) pVal->u.u;
+ MemSetTypeFlag(pVal, MEM_Real);
+ } else {
+ mem_set_i64(pVal, (int64_t)(-pVal->u.u));
+ }
}
sql_value_apply_type(pVal, type);
}
More information about the Tarantool-patches
mailing list