[tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c

imeevma at tarantool.org imeevma at tarantool.org
Sat Nov 17 17:03:56 MSK 2018


Replace obuf with mpstream in execute.c. This makes it possible to
design vstream, an interface that will be used as a universal way
to dump the results of SQL queries.

Needed for #3505
---
 src/box/execute.c | 92 +++++++++++++++++++++++--------------------------------
 src/box/execute.h |  6 ++--
 src/box/iproto.cc | 16 +++++++++-
 3 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/src/box/execute.c b/src/box/execute.c
index 285ae2e..5c2ec19 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
 #include "sql/sqliteLimit.h"
 #include "errcode.h"
 #include "small/region.h"
-#include "small/obuf.h"
 #include "diag.h"
 #include "sql.h"
 #include "xrow.h"
@@ -454,23 +453,21 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
+	char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
 	if (pos == NULL) {
-		diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "obuf_alloc",
+		diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "mpstream_reserve",
 			 "pos");
 		return -1;
 	}
 	pos = mp_store_u8(pos, IPROTO_METADATA);
 	pos = mp_store_u8(pos, 0xdd);
 	pos = mp_store_u32(pos, column_count);
+	mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
 	for (int i = 0; i < column_count; ++i) {
-		size_t size = mp_sizeof_map(2) +
-			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
-			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
 		/*
@@ -479,18 +476,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
 		 * column_name simply returns them.
 		 */
 		assert(name != NULL);
-		size += mp_sizeof_str(strlen(name));
-		size += mp_sizeof_str(strlen(type));
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			return -1;
-		}
-		pos = mp_encode_map(pos, 2);
-		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
-		pos = mp_encode_str(pos, name, strlen(name));
-		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
-		pos = mp_encode_str(pos, type, strlen(type));
+		mpstream_encode_map(stream, 2);
+		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+		mpstream_encode_str(stream, name);
+		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+		mpstream_encode_str(stream, type);
 	}
 	return 0;
 }
@@ -549,81 +539,77 @@ sql_prepare_and_execute(const struct sql_request *request,
 }
 
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
 	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
 	int rc = 0, column_count = sqlite3_column_count(stmt);
 	if (column_count > 0) {
-		if (sql_get_description(stmt, out, column_count) != 0) {
+		if (sql_get_description(stmt, stream, column_count) != 0) {
 err:
 			rc = -1;
 			goto finish;
 		}
 		*keys = 2;
-		char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
+		char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
 		if (pos == NULL) {
 			diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
-				 "obuf_alloc", "pos");
+				 "mpstream_reserve", "pos");
 			goto err;
 		}
 		pos = mp_store_u8(pos, IPROTO_DATA);
 		pos = mp_store_u8(pos, 0xdd);
 		pos = mp_store_u32(pos, port_tuple->size);
+		mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
+
+		mpstream_flush(stream);
 		/*
 		 * Just like SELECT, SQL uses output format compatible
 		 * with Tarantool 1.6
 		 */
-		if (port_dump_msgpack_16(&response->port, out) < 0) {
+		if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
+		mpstream_reset(stream);
 	} else {
 		*keys = 1;
 		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
-				 "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_store_u8(pos, IPROTO_SQL_INFO);
-		pos = mp_store_u8(pos, 0xdf);
-		pos = mp_store_u32(pos, map_size);
 		uint64_t id_count = 0;
-		int changes = db->nChange;
-		int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
-			   mp_sizeof_uint(changes);
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
-			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				size += id_entry->id >= 0 ?
-					mp_sizeof_uint(id_entry->id) :
-					mp_sizeof_int(id_entry->id);
+			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
-			}
-			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
-				mp_sizeof_array(id_count);
 		}
-		char *buf = obuf_alloc(out, size);
-		if (buf == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
+		char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
+		if (pos == NULL) {
+			diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
+				 "mpstream_reserve", "pos");
 			goto err;
 		}
-		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
-		buf = mp_encode_uint(buf, changes);
+		pos = mp_store_u8(pos, IPROTO_SQL_INFO);
+		pos = mp_store_u8(pos, 0xdf);
+		pos = mp_store_u32(pos, map_size);
+		mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
+
+		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+		mpstream_encode_uint(stream, db->nChange);
 		if (!stailq_empty(autoinc_id_list)) {
-			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
-			buf = mp_encode_array(buf, id_count);
+			mpstream_encode_uint(stream,
+					     SQL_INFO_AUTOINCREMENT_IDS);
+			mpstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				buf = id_entry->id >= 0 ?
-				      mp_encode_uint(buf, id_entry->id) :
-				      mp_encode_int(buf, id_entry->id);
+				int64_t value = id_entry->id;
+				if (id_entry->id >= 0)
+					mpstream_encode_uint(stream, value);
+				else
+					mpstream_encode_int(stream, value);
 			}
 		}
 	}
diff --git a/src/box/execute.h b/src/box/execute.h
index 2a242e8..7eb2121 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -34,6 +34,7 @@
 #include <stdint.h>
 #include <stdbool.h>
 #include "port.h"
+#include "mpstream.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -105,13 +106,14 @@ struct sql_response {
  * +----------------------------------------------+
  * @param response EXECUTE response.
  * @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream Stream to which the result is written.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
  */
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream);
 
 /**
  * Parse MessagePack array of SQL parameters and store a result
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 5fb2aff..83b268b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1634,12 +1634,20 @@ error:
 	tx_reply_error(msg);
 }
 
+/** Callback to forward an error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1659,10 +1667,16 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	if (sql_response_dump(&response, &keys, out) != 0) {
+	struct mpstream stream;
+	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		      set_encode_error, &is_error);
+	if (is_error)
+		goto error;
+	if (sql_response_dump(&response, &keys, &stream) != 0) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
+	mpstream_flush(&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
-- 
2.7.4





More information about the Tarantool-patches mailing list