[tarantool-patches] [PATCH v2 4/5] sql: encode tuples with mpstream on Vdbe run

Kirill Shcherbatov kshcherbatov at tarantool.org
Sat Dec 29 13:49:01 MSK 2018


Introduced the new sql_vdbe_mem_encode_tuple and
mpstream_encode_vdbe_mem routines, which encode Vdbe memory to
msgpack on a region without a preceding size estimation call.
Got rid of the sqlite3VdbeMsgpackRecordLen and
sqlite3VdbeMsgpackRecordPut functions, which became useless. This
approach also resolves the invalid size estimation problem (#3035),
because size estimation is not required anymore.

Needed for #3850
Closes #3035
---
 src/box/sql.c         | 27 ++++++++--------
 src/box/sql/vdbe.c    | 31 +++++++++++--------
 src/box/sql/vdbeInt.h | 26 ++++++++++++++--
 src/box/sql/vdbeaux.c | 71 -------------------------------------------
 src/box/sql/vdbemem.c | 70 ++++++++++++++++++++++++++++++++++++++++++
 src/mpstream.c        | 30 ++++++++++++++++++
 src/mpstream.h        | 11 +++++++
 7 files changed, 167 insertions(+), 99 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index 8c7607d84..081a038f1 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -34,6 +34,7 @@
 #include "sql/sqliteInt.h"
 #include "sql/tarantoolInt.h"
 #include "sql/vdbeInt.h"
+#include "mpstream.h"
 
 #include "index.h"
 #include <info.h>
@@ -268,15 +269,20 @@ int tarantoolSqlite3Previous(BtCursor *pCur, int *pRes)
 int tarantoolSqlite3MovetoUnpacked(BtCursor *pCur, UnpackedRecord *pIdxKey,
 				   int *pRes)
 {
-	int rc, res_success;
-	size_t ks;
-
-	ks = sqlite3VdbeMsgpackRecordLen(pIdxKey->aMem, pIdxKey->nField);
-	if (key_alloc(pCur, ks) != 0)
+	struct region *region = &fiber()->gc;
+	size_t used = region_used(region);
+	uint32_t tuple_size;
+	const char *tuple =
+		sql_vdbe_mem_encode_tuple(pIdxKey->aMem, pIdxKey->nField,
+					  &tuple_size, region);
+	if (tuple == NULL)
 		return SQL_TARANTOOL_ERROR;
-	sqlite3VdbeMsgpackRecordPut((u8 *)pCur->key, pIdxKey->aMem,
-				    pIdxKey->nField);
+	if (key_alloc(pCur, tuple_size) != 0)
+		return SQL_TARANTOOL_ERROR;
+	memcpy(pCur->key, tuple, tuple_size);
+	region_truncate(region, used);
 
+	int rc, res_success;
 	switch (pIdxKey->opcode) {
 	default:
 	  /*  "Unexpected opcode" */
@@ -697,13 +703,6 @@ rename_fail:
 	return SQL_TARANTOOL_ERROR;
 }
 
-/** Callback to forward and error from mpstream methods. */
-static void
-set_encode_error(void *error_ctx)
-{
-	*(bool *)error_ctx = true;
-}
-
 int
 sql_rename_table(uint32_t space_id, const char *new_name)
 {
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index e6b413c70..b8faa8f45 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -49,6 +49,7 @@
 #include "tarantoolInt.h"
 
 #include "msgpuck/msgpuck.h"
+#include "mpstream.h"
 
 #include "box/schema.h"
 #include "box/space.h"
@@ -2848,7 +2849,6 @@ case OP_Affinity: {
  */
 case OP_MakeRecord: {
 	Mem *pRec;             /* The new record */
-	i64 nByte;             /* Data space required for this record */
 	Mem *pData0;           /* First field to be combined into the record */
 	Mem MAYBE_UNUSED *pLast;  /* Last field of the record */
 	int nField;            /* Number of fields in the record */
@@ -2894,14 +2894,17 @@ case OP_MakeRecord: {
 		}while( zAffinity[0]);
 	}
 
-	/* Loop through the elements that will make up the record to figure
-	 * out how much space is required for the new record.
-	 */
-	nByte = sqlite3VdbeMsgpackRecordLen(pData0, nField);
-
-	if (nByte>db->aLimit[SQLITE_LIMIT_LENGTH]) {
-		goto too_big;
+	struct region *region = &fiber()->gc;
+	size_t used = region_used(region);
+	uint32_t tuple_size;
+	char *tuple =
+		sql_vdbe_mem_encode_tuple(pData0, nField, &tuple_size, region);
+	if (tuple == NULL) {
+		rc = SQL_TARANTOOL_ERROR;
+		goto abort_due_to_error;
 	}
+	if ((int64_t)tuple_size > db->aLimit[SQLITE_LIMIT_LENGTH])
+		goto too_big;
 
 	/* In case of ephemeral space, it is possible to save some memory
 	 * allocating one by ordinary malloc: instead of cutting pieces
@@ -2915,21 +2918,25 @@ case OP_MakeRecord: {
 	 * routine.
 	 */
 	if (bIsEphemeral) {
-		rc = sqlite3VdbeMemClearAndResize(pOut, nByte);
+		rc = sqlite3VdbeMemClearAndResize(pOut, tuple_size);
 		pOut->flags = MEM_Blob;
+		pOut->n = tuple_size;
+		memcpy(pOut->z, tuple, tuple_size);
+		region_truncate(region, used);
 	} else {
 		/* Allocate memory on the region for the tuple
 		 * to be passed to Tarantool. Before that, make
 		 * sure previously allocated memory has gone.
 		 */
 		sqlite3VdbeMemRelease(pOut);
-		rc = sql_vdbe_mem_alloc_region(pOut, nByte);
+		pOut->flags = MEM_Blob | MEM_Ephem;
+		pOut->n = tuple_size;
+		pOut->z = tuple;
 	}
 	if (rc)
 		goto no_mem;
-	/* Write the record */
+	assert(sqlite3VdbeCheckMemInvariants(pOut));
 	assert(pOp->p3>0 && pOp->p3<=(p->nMem+1 - p->nCursor));
-	pOut->n = sqlite3VdbeMsgpackRecordPut((u8 *)pOut->z, pData0, nField);
 	REGISTER_TRACE(pOp->p3, pOut);
 	UPDATE_MAX_BLOBSIZE(pOut);
 	break;
diff --git a/src/box/sql/vdbeInt.h b/src/box/sql/vdbeInt.h
index 50bc35b2b..fcb47455b 100644
--- a/src/box/sql/vdbeInt.h
+++ b/src/box/sql/vdbeInt.h
@@ -526,8 +526,6 @@ int sqlite3VdbeMemExpandBlob(Mem *);
 #define ExpandBlob(P) SQLITE_OK
 #endif
 
-i64 sqlite3VdbeMsgpackRecordLen(Mem * pMem, u32 n);
-u32 sqlite3VdbeMsgpackRecordPut(u8 * pBuf, Mem * pMem, u32 n);
 /**
  * Perform comparison of two keys: one is packed and one is not.
  *
@@ -552,4 +550,28 @@ int sqlite3VdbeRecordCompareMsgpack(const void *key1,
 				    struct UnpackedRecord *key2);
 u32 sqlite3VdbeMsgpackGet(const unsigned char *buf, Mem * pMem);
 
+struct mpstream;
+struct region;
+
+/** Callback to forward an error from mpstream methods. */
+static inline void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
+/**
+ * Encode field_count Vdbe memory fields on region as a
+ * msgpack array.
+ * @param fields The first Vdbe memory field to encode.
+ * @param field_count Count of fields to encode.
+ * @param[out] tuple_size Size of encoded tuple.
+ * @param region Region to use.
+ * @retval NULL on error, diag message is set.
+ * @retval not NULL Pointer to the encoded tuple on success.
+ */
+char *
+sql_vdbe_mem_encode_tuple(struct Mem *fields, uint32_t field_count,
+			  uint32_t *tuple_size, struct region *region);
+
 #endif				/* !defined(SQLITE_VDBEINT_H) */
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index d477662a4..6ef8c308e 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -3532,77 +3532,6 @@ sqlite3VdbeSetVarmask(Vdbe * v, int iVar)
 	}
 }
 
-i64
-sqlite3VdbeMsgpackRecordLen(Mem * pRec, u32 n)
-{
-	i64 nByte = 5;		/* largest array header */
-	Mem *pEnd = pRec + n;
-	assert(n != 0);
-	do {
-		assert(memIsValid(pRec));
-		if (pRec->flags & (MEM_Null | MEM_Bool)) {
-			nByte += 1;
-		} else if (pRec->flags & (MEM_Int | MEM_Real)) {
-			nByte += 9;
-		} else {
-			nByte += 5 + (u32) pRec->n;
-			if (pRec->flags & MEM_Zero) {
-				nByte += pRec->u.nZero;
-			}
-		}
-	} while ((++pRec) != pEnd);
-	return nByte;
-}
-
-u32
-sqlite3VdbeMsgpackRecordPut(u8 * pBuf, Mem * pRec, u32 n)
-{
-	char *zNewRecord = mp_encode_array((char *)pBuf, n);
-	Mem *pEnd = pRec + n;
-	assert(n != 0);
-	do {
-		assert(memIsValid(pRec));
-		if (pRec->flags & MEM_Null) {
-			zNewRecord = mp_encode_nil(zNewRecord);
-		} else if (pRec->flags & MEM_Real) {
-			zNewRecord = mp_encode_double(zNewRecord, pRec->u.r);
-		} else if (pRec->flags & MEM_Int) {
-			if (pRec->u.i >= 0) {
-				zNewRecord =
-				    mp_encode_uint(zNewRecord, pRec->u.i);
-			} else {
-				zNewRecord =
-				    mp_encode_int(zNewRecord, pRec->u.i);
-			}
-		} else if (pRec->flags & MEM_Str) {
-			zNewRecord =
-			    mp_encode_str(zNewRecord, pRec->z, pRec->n);
-		} else if (pRec->flags & MEM_Bool) {
-			zNewRecord =
-			    mp_encode_bool(zNewRecord, pRec->u.b);
-		} else {
-			/* Emit BIN header iff the BLOB doesn't store MsgPack content */
-			if ((pRec->flags & MEM_Subtype) == 0
-			    || pRec->subtype != SQL_SUBTYPE_MSGPACK) {
-				zNewRecord =
-				    mp_encode_binl(zNewRecord,
-						   pRec->n +
-						   ((pRec->
-						     flags & MEM_Zero) ? pRec->
-						    u.nZero : 0)
-				    );
-			}
-			memcpy(zNewRecord, pRec->z, pRec->n);
-			zNewRecord += pRec->n;
-			if (pRec->flags & MEM_Zero) {
-				memset(zNewRecord, 0, pRec->u.nZero);
-				zNewRecord += pRec->u.nZero;
-			}
-		}
-	} while ((++pRec) != pEnd);
-	return (u32) (zNewRecord - (char *)pBuf);
-}
-
 int
 sqlite3VdbeCompareMsgpack(const char **key1,
 			  struct UnpackedRecord *unpacked, int key2_idx)
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 22beba8be..3f171766e 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -40,6 +40,8 @@
 #include "vdbeInt.h"
 #include "tarantoolInt.h"
 #include "box/schema.h"
+#include "box/tuple.h"
+#include "mpstream.h"
 
 #ifdef SQLITE_DEBUG
 /*
@@ -1714,3 +1716,71 @@ sqlite3ValueBytes(sqlite3_value * pVal)
 		return 0;
 	return valueBytes(pVal);
 }
+
+/**
+ * Encode a Vdbe memory variable to the stream.
+ * @param stream Initialized mpstream encoder object.
+ * @param var Vdbe memory variable to encode with stream.
+ */
+static void
+mpstream_encode_vdbe_mem(struct mpstream *stream, struct Mem *var)
+{
+	assert(memIsValid(var));
+	if (var->flags & MEM_Null) {
+		mpstream_encode_nil(stream);
+	} else if (var->flags & MEM_Real) {
+		mpstream_encode_double(stream, var->u.r);
+	} else if (var->flags & MEM_Int) {
+		if (var->u.i >= 0)
+			mpstream_encode_uint(stream, var->u.i);
+		else
+			mpstream_encode_int(stream, var->u.i);
+	} else if (var->flags & MEM_Str) {
+		mpstream_encode_strn(stream, var->z, var->n);
+	} else if (var->flags & MEM_Bool) {
+		mpstream_encode_bool(stream, var->u.b);
+	} else {
+		/*
+		 * Emit BIN header iff the BLOB doesn't store
+		 * MsgPack content.
+		 */
+		if ((var->flags & MEM_Subtype) == 0 ||
+		     var->subtype != SQL_SUBTYPE_MSGPACK) {
+			uint32_t binl = var->n +
+					((var->flags & MEM_Zero) ?
+					var->u.nZero : 0);
+			mpstream_encode_binl(stream, binl);
+		}
+		mpstream_memcpy(stream, var->z, var->n);
+		if (var->flags & MEM_Zero)
+			mpstream_memset(stream, 0, var->u.nZero);
+	}
+}
+
+char *
+sql_vdbe_mem_encode_tuple(struct Mem *fields, uint32_t field_count,
+			  uint32_t *tuple_size, struct region *region)
+{
+	size_t used = region_used(region);
+	bool is_error = false;
+	struct mpstream stream;
+	mpstream_init(&stream, region, region_reserve_cb, region_alloc_cb,
+		      set_encode_error, &is_error);
+	mpstream_encode_array(&stream, field_count);
+	for (struct Mem *field = fields; field < fields + field_count; field++)
+		mpstream_encode_vdbe_mem(&stream, field);
+	mpstream_flush(&stream);
+	if (is_error) {
+		diag_set(OutOfMemory, stream.pos - stream.buf,
+			 "mpstream_flush", "stream");
+		return NULL;
+	}
+	*tuple_size = region_used(region) - used;
+	char *tuple = region_join(region, *tuple_size);
+	if (tuple == NULL) {
+		diag_set(OutOfMemory, *tuple_size, "region_join", "tuple");
+		return NULL;
+	}
+	mp_tuple_assert(tuple, tuple + *tuple_size);
+	return tuple;
+}
diff --git a/src/mpstream.c b/src/mpstream.c
index e4f7950ba..8b7276ab1 100644
--- a/src/mpstream.c
+++ b/src/mpstream.c
@@ -175,3 +175,33 @@ mpstream_encode_bool(struct mpstream *stream, bool val)
     char *pos = mp_encode_bool(data, val);
     mpstream_advance(stream, pos - data);
 }
+
+void
+mpstream_encode_binl(struct mpstream *stream, uint32_t len)
+{
+	char *data = mpstream_reserve(stream, mp_sizeof_binl(len));
+	if (data == NULL)
+		return;
+	char *pos = mp_encode_binl(data, len);
+	mpstream_advance(stream, pos - data);
+}
+
+void
+mpstream_memcpy(struct mpstream *stream, const void *src, uint32_t n)
+{
+	char *data = mpstream_reserve(stream, n);
+	if (data == NULL)
+		return;
+	memcpy(data, src, n);
+	mpstream_advance(stream, n);
+}
+
+void
+mpstream_memset(struct mpstream *stream, int c, uint32_t n)
+{
+	char *data = mpstream_reserve(stream, n);
+	if (data == NULL)
+		return;
+	memset(data, c, n);
+	mpstream_advance(stream, n);
+}
diff --git a/src/mpstream.h b/src/mpstream.h
index e22d05241..8f86e0d09 100644
--- a/src/mpstream.h
+++ b/src/mpstream.h
@@ -133,6 +133,17 @@ mpstream_encode_nil(struct mpstream *stream);
 void
 mpstream_encode_bool(struct mpstream *stream, bool val);
 
+void
+mpstream_encode_binl(struct mpstream *stream, uint32_t len);
+
+/** Copies n bytes from memory area src to stream. */
+void
+mpstream_memcpy(struct mpstream *stream, const void *src, uint32_t n);
+
+/** Fills n stream bytes with the constant byte c. */
+void
+mpstream_memset(struct mpstream *stream, int c, uint32_t n);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
-- 
2.19.2





More information about the Tarantool-patches mailing list