[tarantool-patches] Re: [PATCH 6/6] sql: allow to specify UNSIGNED column type

n.pettik korablev at tarantool.org
Fri Jul 5 19:36:19 MSK 2019



> On 2 Jul 2019, at 00:53, Vladislav Shpilevoy <v.shpilevoy at tarantool.org> wrote:
> 
> There still are places, which access Mem.u.i/u without
> checking flags properly. Please, for each place either
> explain why it is correct, or fix it and add a test.
> 
> --------------------------------------------------------------
> vdbe.c:2659:
> 	if ((pDest->flags & (MEM_Int | MEM_UInt)) != 0) {
> 		if (field_type == FIELD_TYPE_NUMBER)
> 			sqlVdbeMemSetDouble(pDest, pDest->u.i);
> 	}
> 
> You can't access pDest->u.i - it can be a big unsigned.

Test case:

create table "t"(id int primary key, "a" number);
box.space.t:insert({2, 18446744073709551615ULL})
select * from “t”;
---
- metadata:
  - name: ID
    type: integer
  - name: a
    type: number
  rows:
  - [2, -1]
…

Diff:

diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 97cc1fa8a..cabc852fa 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -2616,8 +2616,12 @@ case OP_Column: {
                sqlVdbeMemShallowCopy(pDest, default_val_mem, MEM_Static);
        }
        if ((pDest->flags & (MEM_Int | MEM_UInt)) != 0) {
-               if (field_type == FIELD_TYPE_NUMBER)
-                       sqlVdbeMemSetDouble(pDest, pDest->u.i);
+               if (field_type == FIELD_TYPE_NUMBER) {
+                       if ((pDest->flags & MEM_Int) != 0)
+                               sqlVdbeMemSetDouble(pDest, pDest->u.i);
+                       else
+                               sqlVdbeMemSetDouble(pDest, pDest->u.u);
+               }
        }
 op_column_out:
        REGISTER_TRACE(p, pOp->p3, pDest);

diff --git a/test/sql/integer-overflow.result b/test/sql/integer-overflow.result
index 7ca2f336d..528e7a6d6 100644
--- a/test/sql/integer-overflow.result
+++ b/test/sql/integer-overflow.result
@@ -140,3 +140,32 @@ box.execute('SELECT * FROM t;')
 box.space.T:drop()
 ---
 ...
+-- Make sure that integers stored in NUMBER field are converted
+-- to floating point properly.
+--
+box.execute("CREATE TABLE t(id INT PRIMARY KEY, a FLOAT);")
+---
+- row_count: 1
+...
+box.space.T:insert({1, 18446744073709551615ULL})
+---
+- [1, 18446744073709551615]
+...
+box.space.T:insert({2, -1})
+---
+- [2, -1]
+...
+box.execute("SELECT * FROM t;")
+---
+- metadata:
+  - name: ID
+    type: integer
+  - name: A
+    type: number
+  rows:
+  - [1, 1.844674407371e+19]
+  - [2, -1]
+...
+box.space.T:drop()
+---
+...

diff --git a/test/sql/integer-overflow.test.lua b/test/sql/integer-overflow.test.lua
index 246465da0..d5635d5af 100644
--- a/test/sql/integer-overflow.test.lua
+++ b/test/sql/integer-overflow.test.lua
@@ -42,3 +42,12 @@ box.space.T:insert({9223372036854775809})
 box.space.T:insert({18446744073709551615ULL})
 box.execute('SELECT * FROM t;')
 box.space.T:drop()
+
+-- Make sure that integers stored in NUMBER field are converted
+-- to floating point properly.
+--
+box.execute("CREATE TABLE t(id INT PRIMARY KEY, a FLOAT);")
+box.space.T:insert({1, 18446744073709551615ULL})
+box.space.T:insert({2, -1})
+box.execute("SELECT * FROM t;")
+box.space.T:drop()

> --------------------------------------------------------------
> vdbe.c:2955:
> 	pIn1 = &aMem[pOp->p1];
> 	uint32_t space_id = pIn1->u.i;
> 
> Here you touch u.i, but it is always
> unsigned. You should use u.u, I think.

Fair (but space id always fits into 32 bits and always non-negative,
so technically it is correct usage):

diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index cabc852fa..0626983f0 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -2915,7 +2915,8 @@ case OP_Savepoint: {
 case OP_CheckViewReferences: {
        assert(pOp->p1 > 0);
        pIn1 = &aMem[pOp->p1];
-       uint32_t space_id = pIn1->u.i;
+       uint64_t space_id = pIn1->u.u;
+       assert(space_id <= INT32_MAX);
        struct space *space = space_by_id(space_id);
        assert(space != NULL);
        if (space->def->view_ref_count > 0) {

> --------------------------------------------------------------
> vdbeaux.c:2428:
> 	if (flags & MEM_Int) {
> 
> 		...
> 
> 		i64 i = pMem->u.i;
> 		u64 u;
> 		if (i < 0) {
> 			u = ~i;
> 		} else {
> 			u = i;
> 		}
> 
> Why do you check 'i < 0', if you already
> see its flag is 'Int'. It should be < 0 by
> definition.

I’m not sure this code is still in usage. It looks like
dead code. Will remove it in a follow-up patch.

> --------------------------------------------------------------
> vdbeaux.c:2589:
> 		u64 v;
> 		u32 i;
> 		if (serial_type == 7) {
> 			assert(sizeof(v) == sizeof(pMem->u.r));
> 			memcpy(&v, &pMem->u.r, sizeof(v));
> 			swapMixedEndianFloat(v);
> 		} else {
> 			v = pMem->u.i;
> 		}
> 
> Why are you sure it is safe to store u.i into
> uint64_t variable?

Same.

> --------------------------------------------------------------
> vdbeaux.c:2644:
> 	u64 x = FOUR_BYTE_UINT(buf);
> 	u32 y = FOUR_BYTE_UINT(buf + 4);
> 	x = (x << 32) + y;
> 	if (serial_type == 6) {
> 		/* EVIDENCE-OF: R-29851-52272 Value is a big-endian 64-bit
> 		 * twos-complement integer.
> 		 */
> 		pMem->u.i = *(i64 *) & x;
> 		pMem->flags = MEM_Int;
> 
> Vice versa - why is it safe to store uint64_t into u.i,
> and set its flag to 'Int' (== value is negative).
> 
> Actually, all this 'serial' shit looks broken. Please,
> verify that code.

Same.

> --------------------------------------------------------------
> vdbeaux.c:2998:
> 		if ((f1 & MEM_UInt) != 0) {
> 			if ((f2 & MEM_Real) != 0) {
> 				return sqlIntFloatCompare(pMem1->u.i,
> 
> pMem1 is unsigned, according to the first check,
> but you use u.i. Why?

Thx, I’ve fixed series of similar places and extended sql/types.test.lua:

diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index 325c54c18..b6b5cd0bf 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -2887,43 +2887,50 @@ sqlBlobCompare(const Mem * pB1, const Mem * pB2)
        return n1 - n2;
 }
 
-/*
- * Do a comparison between a 64-bit signed integer and a 64-bit floating-point
- * number.  Return negative, zero, or positive if the first (i64) is less than,
- * equal to, or greater than the second (double).
+/**
+ * Do a comparison between a 64-bit unsigned/signed integer and a
+ * 64-bit floating-point number.  Return negative, zero, or
+ * positive if the first (integer) is less than, equal to, or
+ * greater than the second (double).
  */
 static int
-sqlIntFloatCompare(i64 i, double r)
+compare_uint_float(uint64_t u, double r)
 {
-       if (sizeof(LONGDOUBLE_TYPE) > 8) {
-               LONGDOUBLE_TYPE x = (LONGDOUBLE_TYPE) i;
-               if (x < r)
-                       return -1;
-               if (x > r)
-                       return +1;
-               return 0;
-       } else {
-               i64 y;
-               double s;
-               if (r < -9223372036854775808.0)
-                       return +1;
-               if (r > 9223372036854775807.0)
-                       return -1;
-               y = (i64) r;
-               if (i < y)
-                       return -1;
-               if (i > y) {
-                       if (y == SMALLEST_INT64 && r > 0.0)
-                               return -1;
-                       return +1;
-               }
-               s = (double)i;
-               if (s < r)
-                       return -1;
-               if (s > r)
-                       return +1;
-               return 0;
-       }
+       if (r > (double) UINT64_MAX)
+               return -1;
+       if (r < 0.0)
+               return +1;
+       uint64_t y = (uint64_t) r;
+       if (u < y)
+               return -1;
+       if (u > y)
+               return +1;
+       double s = (double) u;
+       if (s < r)
+               return -1;
+       if (s > r)
+               return +1;
+       return 0;
+}
+
+static int
+compare_int_float(int64_t i, double r)
+{
+       if (r < (double) INT64_MIN)
+               return +1;
+       if (r >= 0.0)
+               return -1;
+       int64_t y = (int64_t) r;
+       if (i < y)
+               return -1;
+       if (i > y)
+               return +1;
+       double s = (double) i;
+       if (s < r)
+               return -1;
+       if (s > r)
+               return +1;
+       return 0;
 }
 
 /*
@@ -2989,15 +2996,15 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
                }
                if ((f1 & MEM_Int) != 0) {
                        if ((f2 & MEM_Real) != 0) {
-                               return sqlIntFloatCompare(pMem1->u.i,
-                                                             pMem2->u.r);
+                               return compare_int_float(pMem1->u.i,
+                                                        pMem2->u.r);
                        } else {
                                return -1;
                        }
                }
                if ((f1 & MEM_UInt) != 0) {
                        if ((f2 & MEM_Real) != 0) {
-                               return sqlIntFloatCompare(pMem1->u.i,
+                               return compare_uint_float(pMem1->u.u,
                                                          pMem2->u.r);
                        } else if ((f2 & MEM_Int) != 0) {
                                return +1;
@@ -3007,10 +3014,10 @@ sqlMemCompare(const Mem * pMem1, const Mem * pMem2, const struct coll * pColl)
                }
                if ((f1 & MEM_Real) != 0) {
                        if ((f2 & MEM_Int) != 0) {
-                               return -sqlIntFloatCompare(pMem2->u.i,
-                                                              pMem1->u.r);
+                               return -compare_int_float(pMem2->u.i,
+                                                         pMem1->u.r);
                        } else if ((f2 & MEM_UInt) != 0) {
-                               return -sqlIntFloatCompare(pMem2->u.u,
+                               return -compare_uint_float(pMem2->u.u,
                                                           pMem1->u.r);
                        } else {
                                return -1;
@@ -3171,26 +3178,33 @@ sqlVdbeCompareMsgpack(const char **key1,
                        break;
                }
        case MP_UINT:{
-                       uint64_t v = mp_decode_uint(&aKey1);
-                       if (v > INT64_MAX) {
-                               mem1.u.r = (double)v;
-                               goto do_float;
+                       mem1.u.u = mp_decode_uint(&aKey1);
+                       if ((pKey2->flags & MEM_Int) != 0) {
+                               rc = +1;
+                       } else if ((pKey2->flags & MEM_UInt) != 0) {
+                               if (mem1.u.u < pKey2->u.u)
+                                       rc = -1;
+                               else if (mem1.u.u > pKey2->u.u)
+                                       rc = +1;
+                       } else if ((pKey2->flags & MEM_Real) != 0) {
+                               rc = compare_uint_float(mem1.u.u, pKey2->u.r);
+                       } else {
+                               rc = (pKey2->flags & MEM_Null) ? +1 : -1;
                        }
-                       mem1.u.u = v;
-                       goto do_int;
+                       break;
                }
        case MP_INT:{
                        mem1.u.i = mp_decode_int(&aKey1);
- do_int:
-                       if ((pKey2->flags & (MEM_Int | MEM_UInt)) != 0) {
+                       if ((pKey2->flags & MEM_UInt) != 0) {
+                               rc = -1;
+                       } else if ((pKey2->flags & MEM_Int) != 0) {
                                if (mem1.u.i < pKey2->u.i) {
                                        rc = -1;
                                } else if (mem1.u.i > pKey2->u.i) {
                                        rc = +1;
                                }
                        } else if (pKey2->flags & MEM_Real) {
-                               rc = sqlIntFloatCompare(mem1.u.i,
-                                                           pKey2->u.r);
+                               rc = compare_int_float(mem1.u.i, pKey2->u.r);
                        } else {
                                rc = (pKey2->flags & MEM_Null) ? +1 : -1;
                        }
@@ -3203,9 +3217,10 @@ sqlVdbeCompareMsgpack(const char **key1,
        case MP_DOUBLE:{
                        mem1.u.r = mp_decode_double(&aKey1);
  do_float:
-                       if ((pKey2->flags & (MEM_Int | MEM_UInt)) != 0) {
-                               rc = -sqlIntFloatCompare(pKey2->u.i,
-                                                            mem1.u.r);
+                       if ((pKey2->flags & MEM_Int) != 0) {
+                               rc = -compare_int_float(pKey2->u.i, mem1.u.r);
+                       } else if (pKey2->flags & MEM_UInt) {
+                               rc = -compare_uint_float(pKey2->u.u, mem1.u.r);
                        } else if (pKey2->flags & MEM_Real) {
                                if (mem1.u.r < pKey2->u.r) {
                                        rc = -1;

> --------------------------------------------------------------
> vdbeaux.c:3184:
> do_int:
> 			if ((pKey2->flags & (MEM_Int | MEM_UInt)) != 0) {
> 				if (mem1.u.i < pKey2->u.i) {
> 					rc = -1;
> 
> pKey2 is an integer, but you don't know the sign. And use u.i,
> assuming it is negative, or at least small enough. Why?
> 
> Line 3207 is the same.

Fixed, see diff above.

> --------------------------------------------------------------
> vdbemem.c:1431:
> 			} else if (pVal->u.i == SMALLEST_INT64) {
> 				pVal->u.r = -(double)SMALLEST_INT64;
> 				MemSetTypeFlag(pVal, MEM_Real);
> 			} else {
> 				pVal->u.i = -pVal->u.i;
> 			}
> 
> You compare u.i and SMALLEST_INT64, but you can't
> be sure, that u.i is not a big unsigned, can you?

Fixed:

diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index f8673912e..64acb5d41 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1428,11 +1428,15 @@ valueFromExpr(sql * db, /* The database connection */
                                return rc;
                        if (pVal->flags & MEM_Real) {
                                pVal->u.r = -pVal->u.r;
-                       } else if (pVal->u.i == SMALLEST_INT64) {
-                               pVal->u.r = -(double)SMALLEST_INT64;
-                               MemSetTypeFlag(pVal, MEM_Real);
-                       } else {
-                               pVal->u.i = -pVal->u.i;
+                       } else if ((pVal->flags & MEM_Int) != 0) {
+                               mem_set_u64(pVal, (uint64_t)(-pVal->u.i));
+                       } else if ((pVal->flags & MEM_UInt) != 0) {
+                               if (pVal->u.u > (uint64_t) INT64_MAX + 1) {
+                                       pVal->u.r = -(double) pVal->u.u;
+                                       MemSetTypeFlag(pVal, MEM_Real);
+                               } else {
+                                       mem_set_i64(pVal, (int64_t)(-pVal->u.u));
+                               }
                        }
                        sql_value_apply_type(pVal, type);
                }



More information about the Tarantool-patches mailing list