[tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c

imeevma at tarantool.org imeevma at tarantool.org
Tue Nov 27 22:25:43 MSK 2018


It is useful to replace obuf by mpstream. It allows us to design
vstream. Interface vstream will be used as universal interface to
dump result of SQL queries.

Needed for #3505
---
 src/box/execute.c | 105 ++++++++++++++++--------------------------------------
 src/box/execute.h |   7 ++--
 src/box/iproto.cc |  17 ++++++++-
 3 files changed, 50 insertions(+), 79 deletions(-)

diff --git a/src/box/execute.c b/src/box/execute.c
index d73681d..9ba9e66 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
 #include "sql/sqliteLimit.h"
 #include "errcode.h"
 #include "small/region.h"
-#include "small/obuf.h"
 #include "diag.h"
 #include "sql.h"
 #include "xrow.h"
@@ -43,6 +42,7 @@
 #include "port.h"
 #include "tuple.h"
 #include "sql/vdbe.h"
+#include "mpstream.h"
 
 const char *sql_type_strs[] = {
 	NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	int size = mp_sizeof_uint(IPROTO_METADATA) +
-		   mp_sizeof_array(column_count);
-	char *pos = (char *) obuf_alloc(out, size);
-	if (pos == NULL) {
-		diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-		return -1;
-	}
-	pos = mp_encode_uint(pos, IPROTO_METADATA);
-	pos = mp_encode_array(pos, column_count);
+	mpstream_encode_uint(stream, IPROTO_METADATA);
+	mpstream_encode_array(stream, column_count);
 	for (int i = 0; i < column_count; ++i) {
-		size_t size = mp_sizeof_map(2) +
-			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
-			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
 		/*
@@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
 		assert(name != NULL);
 		if (type == NULL)
 			type = "UNKNOWN";
-		size += mp_sizeof_str(strlen(name));
-		size += mp_sizeof_str(strlen(type));
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			return -1;
-		}
-		pos = mp_encode_map(pos, 2);
-		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
-		pos = mp_encode_str(pos, name, strlen(name));
-		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
-		pos = mp_encode_str(pos, type, strlen(type));
+		mpstream_encode_map(stream, 2);
+		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+		mpstream_encode_str(stream, name);
+		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+		mpstream_encode_str(stream, type);
 	}
 	return 0;
 }
@@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
 }
 
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
-	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
 	int rc = 0, column_count = sqlite3_column_count(stmt);
 	if (column_count > 0) {
-		if (sql_get_description(stmt, out, column_count) != 0) {
+		if (sql_get_description(stmt, stream, column_count) != 0) {
 err:
 			rc = -1;
 			goto finish;
 		}
 		*keys = 2;
-		int size = mp_sizeof_uint(IPROTO_DATA) +
-			   mp_sizeof_array(port_tuple->size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_DATA);
-		pos = mp_encode_array(pos, port_tuple->size);
-		/*
-		 * Just like SELECT, SQL uses output format compatible
-		 * with Tarantool 1.6
-		 */
-		if (port_dump_msgpack_16(&response->port, out) < 0) {
+		mpstream_encode_uint(stream, IPROTO_DATA);
+		mpstream_flush(stream);
+		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
+		mpstream_reset(stream);
 	} else {
 		*keys = 1;
-		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
-			   mp_sizeof_map(map_size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
-		pos = mp_encode_map(pos, map_size);
+		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+		mpstream_encode_map(stream, map_size);
 		uint64_t id_count = 0;
-		int changes = db->nChange;
-		size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
-		       mp_sizeof_uint(changes);
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
-			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				size += id_entry->id >= 0 ?
-					mp_sizeof_uint(id_entry->id) :
-					mp_sizeof_int(id_entry->id);
+			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
-			}
-			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
-				mp_sizeof_array(id_count);
-		}
-		char *buf = obuf_alloc(out, size);
-		if (buf == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
-			goto err;
 		}
-		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
-		buf = mp_encode_uint(buf, changes);
+
+		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+		mpstream_encode_uint(stream, db->nChange);
 		if (!stailq_empty(autoinc_id_list)) {
-			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
-			buf = mp_encode_array(buf, id_count);
+			mpstream_encode_uint(stream,
+					     SQL_INFO_AUTOINCREMENT_IDS);
+			mpstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				buf = id_entry->id >= 0 ?
-				      mp_encode_uint(buf, id_entry->id) :
-				      mp_encode_int(buf, id_entry->id);
+				int64_t value = id_entry->id;
+				if (id_entry->id >= 0)
+					mpstream_encode_uint(stream, value);
+				else
+					mpstream_encode_int(stream, value);
 			}
 		}
 	}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..65ac81c 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,10 +48,10 @@ enum sql_info_key {
 
 extern const char *sql_info_key_strs[];
 
-struct obuf;
 struct region;
 struct sql_bind;
 struct xrow_header;
+struct mpstream;
 
 /** EXECUTE request. */
 struct sql_request {
@@ -105,13 +105,14 @@ struct sql_response {
  * +----------------------------------------------+
  * @param response EXECUTE response.
  * @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream stream to where result is written.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
  */
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream);
 
 /**
  * Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
 #include "rmean.h"
 #include "execute.h"
 #include "errinj.h"
+#include "mpstream.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
 	tx_reply_error(msg);
 }
 
+/** Callback to forward and error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	if (sql_response_dump(&response, &keys, out) != 0) {
+	struct mpstream stream;
+	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		      set_encode_error, &is_error);
+	if (is_error)
+		goto error;
+	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
+	mpstream_flush(&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
-- 
2.7.4





More information about the Tarantool-patches mailing list