Tarantool development patches archive
 help / color / mirror / Atom feed
From: imeevma@tarantool.org
To: tarantool-patches@freelists.org, v.shpilevoy@tarantool.org
Subject: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c
Date: Sat, 17 Nov 2018 17:03:56 +0300	[thread overview]
Message-ID: <502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com> (raw)
In-Reply-To: <cover.1542460773.git.imeevma@gmail.com>

It is useful to replace obuf by mpstream. It allows us to design
vstream. Interface vstream will be used as universal interface to
dump result of SQL queries.

Needed for #3505
---
 src/box/execute.c | 92 +++++++++++++++++++++++--------------------------------
 src/box/execute.h |  6 ++--
 src/box/iproto.cc | 16 +++++++++-
 3 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/src/box/execute.c b/src/box/execute.c
index 285ae2e..5c2ec19 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
 #include "sql/sqliteLimit.h"
 #include "errcode.h"
 #include "small/region.h"
-#include "small/obuf.h"
 #include "diag.h"
 #include "sql.h"
 #include "xrow.h"
@@ -454,23 +453,21 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
+	char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
 	if (pos == NULL) {
-		diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "obuf_alloc",
+		diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "mpstream_reserve",
 			 "pos");
 		return -1;
 	}
 	pos = mp_store_u8(pos, IPROTO_METADATA);
 	pos = mp_store_u8(pos, 0xdd);
 	pos = mp_store_u32(pos, column_count);
+	mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
 	for (int i = 0; i < column_count; ++i) {
-		size_t size = mp_sizeof_map(2) +
-			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
-			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
 		/*
@@ -479,18 +476,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
 		 * column_name simply returns them.
 		 */
 		assert(name != NULL);
-		size += mp_sizeof_str(strlen(name));
-		size += mp_sizeof_str(strlen(type));
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			return -1;
-		}
-		pos = mp_encode_map(pos, 2);
-		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
-		pos = mp_encode_str(pos, name, strlen(name));
-		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
-		pos = mp_encode_str(pos, type, strlen(type));
+		mpstream_encode_map(stream, 2);
+		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+		mpstream_encode_str(stream, name);
+		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+		mpstream_encode_str(stream, type);
 	}
 	return 0;
 }
@@ -549,81 +539,77 @@ sql_prepare_and_execute(const struct sql_request *request,
 }
 
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
 	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
 	int rc = 0, column_count = sqlite3_column_count(stmt);
 	if (column_count > 0) {
-		if (sql_get_description(stmt, out, column_count) != 0) {
+		if (sql_get_description(stmt, stream, column_count) != 0) {
 err:
 			rc = -1;
 			goto finish;
 		}
 		*keys = 2;
-		char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
+		char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
 		if (pos == NULL) {
 			diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
-				 "obuf_alloc", "pos");
+				 "mpstream_reserve", "pos");
 			goto err;
 		}
 		pos = mp_store_u8(pos, IPROTO_DATA);
 		pos = mp_store_u8(pos, 0xdd);
 		pos = mp_store_u32(pos, port_tuple->size);
+		mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
+
+		mpstream_flush(stream);
 		/*
 		 * Just like SELECT, SQL uses output format compatible
 		 * with Tarantool 1.6
 		 */
-		if (port_dump_msgpack_16(&response->port, out) < 0) {
+		if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
+		mpstream_reset(stream);
 	} else {
 		*keys = 1;
 		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
-				 "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_store_u8(pos, IPROTO_SQL_INFO);
-		pos = mp_store_u8(pos, 0xdf);
-		pos = mp_store_u32(pos, map_size);
 		uint64_t id_count = 0;
-		int changes = db->nChange;
-		int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
-			   mp_sizeof_uint(changes);
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
-			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				size += id_entry->id >= 0 ?
-					mp_sizeof_uint(id_entry->id) :
-					mp_sizeof_int(id_entry->id);
+			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
-			}
-			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
-				mp_sizeof_array(id_count);
 		}
-		char *buf = obuf_alloc(out, size);
-		if (buf == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
+		char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
+		if (pos == NULL) {
+			diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
+				 "mpstream_reserve", "pos");
 			goto err;
 		}
-		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
-		buf = mp_encode_uint(buf, changes);
+		pos = mp_store_u8(pos, IPROTO_SQL_INFO);
+		pos = mp_store_u8(pos, 0xdf);
+		pos = mp_store_u32(pos, map_size);
+		mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
+
+		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+		mpstream_encode_uint(stream, db->nChange);
 		if (!stailq_empty(autoinc_id_list)) {
-			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
-			buf = mp_encode_array(buf, id_count);
+			mpstream_encode_uint(stream,
+					     SQL_INFO_AUTOINCREMENT_IDS);
+			mpstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				buf = id_entry->id >= 0 ?
-				      mp_encode_uint(buf, id_entry->id) :
-				      mp_encode_int(buf, id_entry->id);
+				int64_t value = id_entry->id;
+				if (id_entry->id >= 0)
+					mpstream_encode_uint(stream, value);
+				else
+					mpstream_encode_int(stream, value);
 			}
 		}
 	}
diff --git a/src/box/execute.h b/src/box/execute.h
index 2a242e8..7eb2121 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -34,6 +34,7 @@
 #include <stdint.h>
 #include <stdbool.h>
 #include "port.h"
+#include "mpstream.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -105,13 +106,14 @@ struct sql_response {
  * +----------------------------------------------+
  * @param response EXECUTE response.
  * @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream stream to where result is written.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
  */
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream);
 
 /**
  * Parse MessagePack array of SQL parameters and store a result
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 5fb2aff..83b268b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1634,12 +1634,20 @@ error:
 	tx_reply_error(msg);
 }
 
+/** Callback to forward and error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1659,10 +1667,16 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	if (sql_response_dump(&response, &keys, out) != 0) {
+	struct mpstream stream;
+	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		      set_encode_error, &is_error);
+	if (is_error)
+		goto error;
+	if (sql_response_dump(&response, &keys, &stream) != 0) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
+	mpstream_flush(&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
-- 
2.7.4

  parent reply	other threads:[~2018-11-17 14:03 UTC|newest]

Thread overview: 18+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-11-17 14:03 [tarantool-patches] [PATCH v1 00/10] sql: remove box.sql.execute imeevma
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 01/10] box: store sql text and length in sql_request imeevma
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 02/10] iproto: remove iproto functions from execute.c imeevma
2018-11-19 17:58   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:03 ` imeevma [this message]
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 04/10] sql: create interface vstream imeevma
2018-11-19 17:58   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 05/10] sql: EXPLAIN through net.box leads to SEGFAULT imeevma
2018-11-19 13:47   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 06/10] sql: SELECT from system spaces returns unpacked msgpack imeevma
2018-11-19 13:48   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 07/10] sql: too many autogenerated ids leads to SEGFAULT imeevma
2018-11-19 13:47   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 08/10] box: add method dump_lua to port imeevma
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 09/10] lua: create vstream implementation for Lua imeevma
2018-11-19 17:58   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 10/10] sql: check new box.sql.execute() imeevma
2018-11-19 12:54 ` [tarantool-patches] Re: [PATCH v1 00/10] sql: remove box.sql.execute Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com \
    --to=imeevma@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox