[PATCH 04/13] sql: support big integers within sql binding

Stanislav Zudin szudin at tarantool.org
Fri Mar 15 18:45:33 MSK 2019


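Enables SQL binding of unsigned 64-bit integers: MP_UINT values above INT64_MAX are now bound as SQL_UNSIGNED instead of being rejected with ER_SQL_BIND_VALUE.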
---
 src/box/execute.c     | 21 ++++++++++----
 src/box/lua/lua_sql.c |  3 ++
 src/box/lua/sql.c     | 10 ++++---
 src/box/sql/func.c    |  7 +++++
 src/box/sql/sqlInt.h  |  4 +++
 src/box/sql/vdbe.c    |  6 ++--
 src/box/sql/vdbeInt.h |  2 +-
 src/box/sql/vdbeapi.c | 66 ++++++++++++++++++++++++++++++++++++++++---
 src/box/sql/vdbemem.c | 12 +++++---
 9 files changed, 109 insertions(+), 22 deletions(-)

diff --git a/src/box/execute.c b/src/box/execute.c
index 7c77df2e5..31b89a75e 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -130,13 +130,8 @@ sql_bind_decode(struct sql_bind *bind, int i, const char **packet)
 	switch (mp_typeof(**packet)) {
 	case MP_UINT: {
 		uint64_t n = mp_decode_uint(packet);
-		if (n > INT64_MAX) {
-			diag_set(ClientError, ER_SQL_BIND_VALUE,
-				 sql_bind_name(bind), "INTEGER");
-			return -1;
-		}
 		bind->i64 = (int64_t) n;
-		bind->type = SQL_INTEGER;
+		bind->type = (n > INT64_MAX) ? SQL_UNSIGNED : SQL_INTEGER;
 		bind->bytes = sizeof(bind->i64);
 		break;
 	}
@@ -246,6 +241,7 @@ sql_column_to_messagepack(struct sql_stmt *stmt, int i,
 	int type = sql_column_type(stmt, i);
 	switch (type) {
 	case SQL_INTEGER: {
+		assert(!sql_column_is_unsigned(stmt, i));
 		int64_t n = sql_column_int64(stmt, i);
 		if (n >= 0)
 			size = mp_sizeof_uint(n);
@@ -260,6 +256,16 @@ sql_column_to_messagepack(struct sql_stmt *stmt, int i,
 			mp_encode_int(pos, n);
 		break;
 	}
+	case SQL_UNSIGNED: {
+		assert(sql_column_is_unsigned(stmt, i));
+		int64_t n = sql_column_int64(stmt, i);
+		size = mp_sizeof_uint(n);
+		char *pos = (char *) region_alloc(region, size);
+		if (pos == NULL)
+			goto oom;
+		mp_encode_uint(pos, n);
+		break;
+	}
 	case SQL_FLOAT: {
 		double d = sql_column_double(stmt, i);
 		size = mp_sizeof_double(d);
@@ -389,6 +395,9 @@ sql_bind_column(struct sql_stmt *stmt, const struct sql_bind *p,
 	case SQL_INTEGER:
 		rc = sql_bind_int64(stmt, pos, p->i64);
 		break;
+	case SQL_UNSIGNED:
+		rc = sql_bind_uint64(stmt, pos, p->i64);
+		break;
 	case SQL_FLOAT:
 		rc = sql_bind_double(stmt, pos, p->d);
 		break;
diff --git a/src/box/lua/lua_sql.c b/src/box/lua/lua_sql.c
index f5a7b7819..57a3161c7 100644
--- a/src/box/lua/lua_sql.c
+++ b/src/box/lua/lua_sql.c
@@ -60,6 +60,9 @@ lua_sql_call(sql_context *pCtx, int nVal, sql_value **apVal) {
 		case SQL_INTEGER:
 			luaL_pushint64(L, sql_value_int64(param));
 			break;
+		case SQL_UNSIGNED:
+			luaL_pushuint64(L, sql_value_int64(param));
+			break;
 		case SQL_FLOAT:
 			lua_pushnumber(L, sql_value_double(param));
 			break;
diff --git a/src/box/lua/sql.c b/src/box/lua/sql.c
index cb2927144..9a35c03aa 100644
--- a/src/box/lua/sql.c
+++ b/src/box/lua/sql.c
@@ -32,10 +32,12 @@ lua_push_row(struct lua_State *L, struct sql_stmt *stmt)
 		int type = sql_column_type(stmt, i);
 		switch (type) {
 		case SQL_INTEGER:
-			if (sql_column_is_unsigned(stmt, i))
-				luaL_pushuint64(L, sql_column_int64(stmt, i));
-			else
-				luaL_pushint64(L, sql_column_int64(stmt, i));
+			assert(!sql_column_is_unsigned(stmt, i));
+			luaL_pushint64(L, sql_column_int64(stmt, i));
+			break;
+		case SQL_UNSIGNED:
+			assert(sql_column_is_unsigned(stmt, i));
+			luaL_pushuint64(L, sql_column_int64(stmt, i));
 			break;
 		case SQL_FLOAT:
 			lua_pushnumber(L, sql_column_double(stmt, i));
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index 21a69aa51..cf65bf2a2 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -141,6 +141,7 @@ lengthFunc(sql_context * context, int argc, sql_value ** argv)
 	switch (sql_value_type(argv[0])) {
 	case SQL_BLOB:
 	case SQL_INTEGER:
+	case SQL_UNSIGNED:
 	case SQL_FLOAT:{
 			sql_result_int(context,
 					   sql_value_bytes(argv[0]));
@@ -191,6 +192,11 @@ absFunc(sql_context * context, int argc, sql_value ** argv)
 			sql_result_int64(context, iVal);
 			break;
 		}
+	case SQL_UNSIGNED: {
+			i64 iVal = sql_value_int64(argv[0]);
+			sql_result_int64(context, iVal);
+			break;
+		}
 	case SQL_NULL:{
 			/* IMP: R-37434-19929 Abs(X) returns NULL if X is NULL. */
 			sql_result_null(context);
@@ -957,6 +963,7 @@ quoteFunc(sql_context * context, int argc, sql_value ** argv)
 					    SQL_TRANSIENT);
 			break;
 		}
+	case SQL_UNSIGNED:
 	case SQL_INTEGER:{
 			sql_result_value(context, argv[0]);
 			break;
diff --git a/src/box/sql/sqlInt.h b/src/box/sql/sqlInt.h
index 92e2f282f..56aa7c681 100644
--- a/src/box/sql/sqlInt.h
+++ b/src/box/sql/sqlInt.h
@@ -644,6 +644,7 @@ enum sql_type {
 	SQL_TEXT = 3,
 	SQL_BLOB = 4,
 	SQL_NULL = 5,
+	SQL_UNSIGNED = 6
 };
 
 /**
@@ -906,6 +907,9 @@ sql_bind_int(sql_stmt *, int, int);
 int
 sql_bind_int64(sql_stmt *, int, sql_int64);
 
+int
+sql_bind_uint64(sql_stmt *, int, sql_uint64);
+
 int
 sql_bind_null(sql_stmt *, int);
 
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index ea398e7d6..8a7f7a12f 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1439,7 +1439,7 @@ case OP_IntCopy: {            /* out2 */
 	pIn1 = &aMem[pOp->p1];
 	assert((pIn1->flags & MEM_Int)!=0);
 	pOut = &aMem[pOp->p2];
-	sqlVdbeMemSetInt64(pOut, pIn1->u.i);
+	sqlVdbeMemSetInt64(pOut, pIn1->u.i, (pIn1->flags & MEM_Unsigned)!=0);
 	break;
 }
 
@@ -1770,7 +1770,7 @@ integer_overflow:
 case OP_CollSeq: {
 	assert(pOp->p4type==P4_COLLSEQ || pOp->p4.pColl == NULL);
 	if (pOp->p1) {
-		sqlVdbeMemSetInt64(&aMem[pOp->p1], 0);
+		sqlVdbeMemSetInt64(&aMem[pOp->p1], 0, false);
 	}
 	break;
 }
@@ -5317,7 +5317,7 @@ case OP_AggStep: {
 	if (pCtx->skipFlag) {
 		assert(pOp[-1].opcode==OP_CollSeq);
 		i = pOp[-1].p1;
-		if (i) sqlVdbeMemSetInt64(&aMem[i], 1);
+		if (i) sqlVdbeMemSetInt64(&aMem[i], 1, false);
 	}
 	break;
 }
diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h
index 66b21299a..0375845d9 100644
--- a/src/box/sql/vdbeInt.h
+++ b/src/box/sql/vdbeInt.h
@@ -477,7 +477,7 @@ void sqlVdbeMemShallowCopy(Mem *, const Mem *, int);
 void sqlVdbeMemMove(Mem *, Mem *);
 int sqlVdbeMemNulTerminate(Mem *);
 int sqlVdbeMemSetStr(Mem *, const char *, int, u8, void (*)(void *));
-void sqlVdbeMemSetInt64(Mem *, i64);
+void sqlVdbeMemSetInt64(Mem *, i64, bool);
 #ifdef SQL_OMIT_FLOATING_POINT
 #define sqlVdbeMemSetDouble sqlVdbeMemSetInt64
 #else
diff --git a/src/box/sql/vdbeapi.c b/src/box/sql/vdbeapi.c
index 02a1b2e0f..2c486552e 100644
--- a/src/box/sql/vdbeapi.c
+++ b/src/box/sql/vdbeapi.c
@@ -279,8 +279,49 @@ sql_value_type(sql_value * pVal)
 		SQL_NULL,	/* 0x1d */
 		SQL_INTEGER,	/* 0x1e */
 		SQL_NULL,	/* 0x1f */
+
+		SQL_BLOB,	/* 0x20 */
+		SQL_NULL,	/* 0x21 */
+		SQL_TEXT,	/* 0x22 */
+		SQL_NULL,	/* 0x23 */
+		SQL_UNSIGNED,	/* 0x24 */
+		SQL_NULL,	/* 0x25 */
+		SQL_UNSIGNED,	/* 0x26 */
+		SQL_NULL,	/* 0x27 */
+		SQL_FLOAT,	/* 0x28 */
+		SQL_NULL,	/* 0x29 */
+		SQL_FLOAT,	/* 0x2a */
+		SQL_NULL,	/* 0x2b */
+		SQL_INTEGER,	/* 0x2c */
+		SQL_NULL,	/* 0x2d */
+		SQL_INTEGER,	/* 0x2e */
+		SQL_NULL,	/* 0x2f */
+		SQL_BLOB,	/* 0x30 */
+		SQL_NULL,	/* 0x31 */
+		SQL_TEXT,	/* 0x32 */
+		SQL_NULL,	/* 0x33 */
+		SQL_INTEGER,	/* 0x34 */
+		SQL_NULL,	/* 0x35 */
+		SQL_INTEGER,	/* 0x36 */
+		SQL_NULL,	/* 0x37 */
+		SQL_FLOAT,	/* 0x38 */
+		SQL_NULL,	/* 0x39 */
+		SQL_FLOAT,	/* 0x3a */
+		SQL_NULL,	/* 0x3b */
+		SQL_INTEGER,	/* 0x3c */
+		SQL_NULL,	/* 0x3d */
+		SQL_INTEGER,	/* 0x3e */
+		SQL_NULL,	/* 0x3f */
 	};
-	return aType[pVal->flags & MEM_PURE_TYPE_MASK];
+
+	assert(MEM_Unsigned == 0x20000);
+	/* Fold only the MEM_Unsigned bit (0x20000 >> 12 == 0x20) into the
+	 * pure type bits; other high flag bits must not reach the index.
+	 */
+	u32 offset = ((pVal->flags & MEM_Unsigned) >> 12) |
+		     (pVal->flags & MEM_PURE_TYPE_MASK);
+	assert(offset < 0x40);
+	return aType[offset];
 }
 
 /* Make a copy of an sql_value object
@@ -401,13 +442,13 @@ sql_result_error(sql_context * pCtx, const char *z, int n)
 void
 sql_result_int(sql_context * pCtx, int iVal)
 {
-	sqlVdbeMemSetInt64(pCtx->pOut, (i64) iVal);
+	sqlVdbeMemSetInt64(pCtx->pOut, (i64) iVal, false);
 }
 
 void
 sql_result_int64(sql_context * pCtx, i64 iVal)
 {
-	sqlVdbeMemSetInt64(pCtx->pOut, iVal);
+	sqlVdbeMemSetInt64(pCtx->pOut, iVal, false);
 }
 
 void
@@ -1370,7 +1411,20 @@ sql_bind_int64(sql_stmt * pStmt, int i, sql_int64 iValue)
 	rc = vdbeUnbind(p, i);
 	if (rc == SQL_OK) {
 		rc = sql_bind_type(p, i, "INTEGER");
-		sqlVdbeMemSetInt64(&p->aVar[i - 1], iValue);
+		sqlVdbeMemSetInt64(&p->aVar[i - 1], iValue, false);
+	}
+	return rc;
+}
+
+int
+sql_bind_uint64(sql_stmt * pStmt, int i, sql_uint64 iValue)
+{
+	int rc;
+	Vdbe *p = (Vdbe *) pStmt;
+	rc = vdbeUnbind(p, i);
+	if (rc == SQL_OK) {
+		rc = sql_bind_type(p, i, "INTEGER");
+		sqlVdbeMemSetInt64(&p->aVar[i - 1], (u64)iValue, true);
 	}
 	return rc;
 }
@@ -1418,6 +1472,10 @@ sql_bind_value(sql_stmt * pStmt, int i, const sql_value * pValue)
 			rc = sql_bind_int64(pStmt, i, pValue->u.i);
 			break;
 		}
+	case SQL_UNSIGNED: {
+			rc = sql_bind_uint64(pStmt, i, pValue->u.i);
+			break;
+		}
 	case SQL_FLOAT:{
 			rc = sql_bind_double(pStmt, i, pValue->u.r);
 			break;
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 0b5ba965e..5e35a1720 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -704,11 +704,13 @@ sqlVdbeMemSetZeroBlob(Mem * pMem, int n)
  * a 64-bit integer.
  */
 static SQL_NOINLINE void
-vdbeReleaseAndSetInt64(Mem * pMem, i64 val)
+vdbeReleaseAndSetInt64(Mem * pMem, i64 val, bool is_unsigned)
 {
 	sqlVdbeMemSetNull(pMem);
 	pMem->u.i = val;
 	pMem->flags = MEM_Int;
+	if (is_unsigned)
+		pMem->flags |= MEM_Unsigned;
 }
 
 /*
@@ -716,13 +718,15 @@ vdbeReleaseAndSetInt64(Mem * pMem, i64 val)
  * manifest type INTEGER.
  */
 void
-sqlVdbeMemSetInt64(Mem * pMem, i64 val)
+sqlVdbeMemSetInt64(Mem * pMem, i64 val, bool is_unsigned)
 {
 	if (VdbeMemDynamic(pMem)) {
-		vdbeReleaseAndSetInt64(pMem, val);
+		vdbeReleaseAndSetInt64(pMem, val, is_unsigned);
 	} else {
 		pMem->u.i = val;
 		pMem->flags = MEM_Int;
+		if (is_unsigned)
+			pMem->flags |= MEM_Unsigned;
 	}
 }
 
@@ -1298,7 +1302,7 @@ valueFromExpr(sql * db,	/* The database connection */
 			goto no_mem;
 		if (ExprHasProperty(pExpr, EP_IntValue)) {
 			sqlVdbeMemSetInt64(pVal,
-					       (i64) pExpr->u.iValue * negInt);
+					       (i64) pExpr->u.iValue * negInt, false);
 		} else {
 			zVal =
 			    sqlMPrintf(db, "%s%s", zNeg, pExpr->u.zToken);
-- 
2.17.1




More information about the Tarantool-patches mailing list