Tarantool development patches archive
 help / color / mirror / Atom feed
From: imeevma@tarantool.org
To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org
Subject: [tarantool-patches] [PATCH v2 4/7] iproto: replace obuf by mpstream in execute.c
Date: Thu, 22 Nov 2018 22:10:59 +0300	[thread overview]
Message-ID: <73a6bdaa61ebd26734c704a2785e7669da507cd4.1542910674.git.imeevma@gmail.com> (raw)
In-Reply-To: <cover.1542910674.git.imeevma@gmail.com>

Replace obuf with mpstream in execute.c. This makes it possible to
design vstream, an interface that will be used as a universal way
to dump the results of SQL queries.

Needed for #3505
---
 src/box/execute.c | 100 +++++++++++++++++-------------------------------------
 src/box/execute.h |   6 ++--
 src/box/iproto.cc |  17 +++++++++-
 3 files changed, 51 insertions(+), 72 deletions(-)

diff --git a/src/box/execute.c b/src/box/execute.c
index d73681d..1f0e5ab 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
 #include "sql/sqliteLimit.h"
 #include "errcode.h"
 #include "small/region.h"
-#include "small/obuf.h"
 #include "diag.h"
 #include "sql.h"
 #include "xrow.h"
@@ -43,6 +42,7 @@
 #include "port.h"
 #include "tuple.h"
 #include "sql/vdbe.h"
+#include "mpstream.h"
 
 const char *sql_type_strs[] = {
 	NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	int size = mp_sizeof_uint(IPROTO_METADATA) +
-		   mp_sizeof_array(column_count);
-	char *pos = (char *) obuf_alloc(out, size);
-	if (pos == NULL) {
-		diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-		return -1;
-	}
-	pos = mp_encode_uint(pos, IPROTO_METADATA);
-	pos = mp_encode_array(pos, column_count);
+	mpstream_encode_uint(stream, IPROTO_METADATA);
+	mpstream_encode_array(stream, column_count);
 	for (int i = 0; i < column_count; ++i) {
-		size_t size = mp_sizeof_map(2) +
-			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
-			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
 		/*
@@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
 		assert(name != NULL);
 		if (type == NULL)
 			type = "UNKNOWN";
-		size += mp_sizeof_str(strlen(name));
-		size += mp_sizeof_str(strlen(type));
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			return -1;
-		}
-		pos = mp_encode_map(pos, 2);
-		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
-		pos = mp_encode_str(pos, name, strlen(name));
-		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
-		pos = mp_encode_str(pos, type, strlen(type));
+		mpstream_encode_map(stream, 2);
+		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+		mpstream_encode_str(stream, name);
+		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+		mpstream_encode_str(stream, type);
 	}
 	return 0;
 }
@@ -627,81 +610,60 @@ sql_prepare_and_execute(const struct sql_request *request,
 }
 
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
 	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
 	int rc = 0, column_count = sqlite3_column_count(stmt);
 	if (column_count > 0) {
-		if (sql_get_description(stmt, out, column_count) != 0) {
+		if (sql_get_description(stmt, stream, column_count) != 0) {
 err:
 			rc = -1;
 			goto finish;
 		}
 		*keys = 2;
-		int size = mp_sizeof_uint(IPROTO_DATA) +
-			   mp_sizeof_array(port_tuple->size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_DATA);
-		pos = mp_encode_array(pos, port_tuple->size);
+		mpstream_encode_uint(stream, IPROTO_DATA);
+		mpstream_encode_array(stream, port_tuple->size);
+		mpstream_flush(stream);
 		/*
 		 * Just like SELECT, SQL uses output format compatible
 		 * with Tarantool 1.6
 		 */
-		if (port_dump_msgpack_16(&response->port, out) < 0) {
+		if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
+		mpstream_reset(stream);
 	} else {
 		*keys = 1;
 		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
-			   mp_sizeof_map(map_size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
-		pos = mp_encode_map(pos, map_size);
+		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+		mpstream_encode_map(stream, map_size);
 		uint64_t id_count = 0;
-		int changes = db->nChange;
-		size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
-		       mp_sizeof_uint(changes);
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
-			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				size += id_entry->id >= 0 ?
-					mp_sizeof_uint(id_entry->id) :
-					mp_sizeof_int(id_entry->id);
+			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
-			}
-			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
-				mp_sizeof_array(id_count);
 		}
-		char *buf = obuf_alloc(out, size);
-		if (buf == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
-			goto err;
-		}
-		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
-		buf = mp_encode_uint(buf, changes);
+
+		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+		mpstream_encode_uint(stream, db->nChange);
 		if (!stailq_empty(autoinc_id_list)) {
-			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
-			buf = mp_encode_array(buf, id_count);
+			mpstream_encode_uint(stream,
+					     SQL_INFO_AUTOINCREMENT_IDS);
+			mpstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				buf = id_entry->id >= 0 ?
-				      mp_encode_uint(buf, id_entry->id) :
-				      mp_encode_int(buf, id_entry->id);
+				int64_t value = id_entry->id;
+				if (id_entry->id >= 0)
+					mpstream_encode_uint(stream, value);
+				else
+					mpstream_encode_int(stream, value);
 			}
 		}
 	}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..940f3a3 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -52,6 +52,7 @@ struct obuf;
 struct region;
 struct sql_bind;
 struct xrow_header;
+struct mpstream;
 
 /** EXECUTE request. */
 struct sql_request {
@@ -105,13 +106,14 @@ struct sql_response {
  * +----------------------------------------------+
  * @param response EXECUTE response.
  * @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream Stream to which the result is written.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
  */
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream);
 
 /**
  * Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
 #include "rmean.h"
 #include "execute.h"
 #include "errinj.h"
+#include "mpstream.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
 	tx_reply_error(msg);
 }
 
+/** Callback to forward an error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	if (sql_response_dump(&response, &keys, out) != 0) {
+	struct mpstream stream;
+	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		      set_encode_error, &is_error);
+	if (is_error)
+		goto error;
+	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
+	mpstream_flush(&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
-- 
2.7.4

  parent reply	other threads:[~2018-11-22 19:11 UTC|newest]

Thread overview: 14+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-11-22 19:10 [tarantool-patches] [PATCH v2 0/7] Remove box.sql.execute imeevma
2018-11-22 19:10 ` [tarantool-patches] [PATCH v2 1/7] box: store sql text and length in sql_request imeevma
2018-11-22 19:10 ` [tarantool-patches] [PATCH v2 2/7] box: add method dump_lua to port imeevma
2018-11-22 21:49   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-27 19:25     ` Imeev Mergen
2018-11-22 19:10 ` [tarantool-patches] [PATCH v2 3/7] iproto: remove iproto functions from execute.c imeevma
2018-11-22 19:10 ` imeevma [this message]
2018-11-22 19:11 ` [tarantool-patches] [PATCH v2 5/7] sql: create interface vstream imeevma
2018-11-22 21:49   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-27 19:25     ` Imeev Mergen
2018-11-22 19:11 ` [tarantool-patches] [PATCH v2 6/7] lua: create vstream implementation for Lua imeevma
2018-11-22 21:49   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-27 19:25     ` Imeev Mergen
2018-11-22 19:11 ` [tarantool-patches] [PATCH v2 7/7] sql: check new box.sql.execute() imeevma

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=73a6bdaa61ebd26734c704a2785e7669da507cd4.1542910674.git.imeevma@gmail.com \
    --to=imeevma@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [tarantool-patches] [PATCH v2 4/7] iproto: replace obuf by mpstream in execute.c' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox