Tarantool development patches archive
From: imeevma@tarantool.org
To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org,
	vdavydov.dev@gmail.com, kostja@tarantool.org
Subject: [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
Date: Sun,  2 Dec 2018 14:03:21 +0300
Message-ID: <a857e1129f9e38bde8c5c6037149887f695ec6dc.1543747066.git.imeevma@gmail.com>
In-Reply-To: <cover.1543604148.git.imeevma@gmail.com>

This is the most dubious patch in the series: it implicitly assumes
that the mpstream is a stream over an obuf. Discussion and patch below.

Note that nothing has changed in this version of the patch. So far
there is no approved solution for the problem.


On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>
>>
>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>>  }
>>>>>>  
>>>>>>  int
>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>> +		  struct mpstream *stream)
>>>>>>  {
>>>>>>  	sqlite3 *db = sql_get();
>>>>>>  	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>> -	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>>  	int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>>  	if (column_count > 0) {
>>>>>> -		if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>> +		if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>>  err:
>>>>>>  			rc = -1;
>>>>>>  			goto finish;
>>>>>>  		}
>>>>>>  		*keys = 2;
>>>>>> -		int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>> -			   mp_sizeof_array(port_tuple->size);
>>>>>> -		char *pos = (char *) obuf_alloc(out, size);
>>>>>> -		if (pos == NULL) {
>>>>>> -			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>> -			goto err;
>>>>>> -		}
>>>>>> -		pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>> -		pos = mp_encode_array(pos, port_tuple->size);
>>>>>> -		/*
>>>>>> -		 * Just like SELECT, SQL uses output format compatible
>>>>>> -		 * with Tarantool 1.6
>>>>>> -		 */
>>>>>> -		if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>> +		mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>> +		mpstream_flush(stream);
>>>>>> +		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>
>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>
>>>>> And when you introduce vstream later, you simply move this code to
>>>>> another file. This is confusing. Maybe we should pass the alloc/reserve
>>>>> callbacks used in mpstream to port_dump instead of obuf?
>>>>
>>>> Good idea, though I am not sure it is worth slowing down port_dump_msgpack
>>>> by adding a new level of indirection, since port_dump_msgpack is a hot path
>>>> and is used for box.select.
>>>>
>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever need
>>>> to dump a port to something other than an obuf, we will just add a new
>>>> method to port_vtab.
>>>>
>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>> in both cases we not just dump in a specific format, but to a concrete
>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>> by obuf destination.
>>>
>>> There's port_dump_plain, which dumps port contents in a specific format.
>>> So port_dump_obuf would look ambiguous.
>>>
>>>>
>>>> If you worry about how to call sql_response_dump() with something other
>>>> than an obuf, then there is another option. Rename port_dump_msgpack to
>>>> port_dump_obuf anyway and introduce a new method: port_dump_mpstream. It
>>>> will take an mpstream and use its reserve/alloc/error functions. That lets
>>>> us avoid slowing down box.select while using the full power of virtual
>>>> functions in execute.c, which is definitely not hot.
>>>
>>> That would interconnect port and mpstream, make them dependent on each
>>> other. I don't think that would be good.
>>>
>>>>
>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>> box.select and iproto_call will use port_dump_obuf.
>>>>
>>>> I prefer the second option: introduce port_dump_mpstream. Is that OK with you?
>>>
>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>> because it's very limited in its applicability. People already prefer to
>>> use box.call over box.insert/select/etc over iproto, and with the
>>> appearance of box.execute they are likely to stop using plain box.select
>>> at all.
>>>
>>> That said, personally I would try to pass reserve/alloc methods to port,
>>> see how it goes.
>>>
>>
>> I do not see a reason to slow down box.select if we can avoid it.
>> Yes, people use IPROTO_CALL, but in stored functions they use box
>> functions, including select.
>
> box.select called from Lua code doesn't use port_dump_msgpack.
>
>>
>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>> port_dump_obuf and add port_dump_msgpack which does not depend on
>> mpstream and takes alloc/reserve/ctx directly.
>
> Better call the optimized version (the one without callbacks)
> port_dump_msgpack_obuf to avoid confusion IMO.
>
> Anyway, I'd try to run cbench to see if it really performs better
> than the one using callbacks.
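
To make the two options from the discussion concrete, here is a minimal
sketch. It is not Tarantool code: toy_buf, toy_alloc_f and the toy_dump_*
functions are hypothetical names invented for illustration, and the
fixed-size buffer only stands in for obuf. One dump function is tied to
the concrete buffer type, the other takes an alloc callback plus an
opaque context, which costs one indirect call per allocation.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical fixed-size output buffer standing in for obuf. */
struct toy_buf {
	char data[64];
	size_t used;
};

/* Allocate size bytes in the buffer, or return NULL if it does not fit. */
static void *
toy_buf_alloc(struct toy_buf *buf, size_t size)
{
	if (size > sizeof(buf->data) - buf->used)
		return NULL;
	void *res = buf->data + buf->used;
	buf->used += size;
	return res;
}

/* Callback type: allocate size bytes in an opaque destination. */
typedef void *(*toy_alloc_f)(void *ctx, size_t size);

/* Adapter that exposes the toy buffer through the callback type. */
static void *
toy_buf_alloc_cb(void *ctx, size_t size)
{
	return toy_buf_alloc((struct toy_buf *)ctx, size);
}

/* Variant 1: bound to the concrete buffer type, no indirection. */
static int
toy_dump_buf(const char *payload, size_t len, struct toy_buf *out)
{
	void *pos = toy_buf_alloc(out, len);
	if (pos == NULL)
		return -1;
	memcpy(pos, payload, len);
	return 0;
}

/* Variant 2: destination-agnostic, one indirect call per allocation. */
static int
toy_dump_cb(const char *payload, size_t len, toy_alloc_f alloc, void *ctx)
{
	void *pos = alloc(ctx, len);
	if (pos == NULL)
		return -1;
	memcpy(pos, payload, len);
	return 0;
}
```

Both variants produce the same bytes. Whether the extra indirect call
matters on a hot path is the open question that a benchmark such as
cbench would have to answer.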

Patch:

commit a857e1129f9e38bde8c5c6037149887f695ec6dc
Author: Mergen Imeev <imeevma@gmail.com>
Date:   Fri Nov 9 22:31:07 2018 +0300

    iproto: replace obuf by mpstream in execute.c
    
    Replace obuf with mpstream in execute.c. This makes it possible to
    design vstream, an interface that will be used as a universal way
    to dump the results of SQL queries.
    
    Needed for #3505

diff --git a/src/box/execute.c b/src/box/execute.c
index 3a6cadf..0d266dd 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
 #include "sql/sqliteLimit.h"
 #include "errcode.h"
 #include "small/region.h"
-#include "small/obuf.h"
 #include "diag.h"
 #include "sql.h"
 #include "xrow.h"
@@ -43,6 +42,7 @@
 #include "port.h"
 #include "tuple.h"
 #include "sql/vdbe.h"
+#include "mpstream.h"
 
 const char *sql_type_strs[] = {
 	NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	int size = mp_sizeof_uint(IPROTO_METADATA) +
-		   mp_sizeof_array(column_count);
-	char *pos = (char *) obuf_alloc(out, size);
-	if (pos == NULL) {
-		diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-		return -1;
-	}
-	pos = mp_encode_uint(pos, IPROTO_METADATA);
-	pos = mp_encode_array(pos, column_count);
+	mpstream_encode_uint(stream, IPROTO_METADATA);
+	mpstream_encode_array(stream, column_count);
 	for (int i = 0; i < column_count; ++i) {
-		size_t size = mp_sizeof_map(2) +
-			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
-			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
 		/*
@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
 		 * column_name simply returns them.
 		 */
 		assert(name != NULL);
-		size += mp_sizeof_str(strlen(name));
-		size += mp_sizeof_str(strlen(type));
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			return -1;
-		}
-		pos = mp_encode_map(pos, 2);
-		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
-		pos = mp_encode_str(pos, name, strlen(name));
-		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
-		pos = mp_encode_str(pos, type, strlen(type));
+		mpstream_encode_map(stream, 2);
+		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+		mpstream_encode_str(stream, name);
+		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+		mpstream_encode_str(stream, type);
 	}
 	return 0;
 }
@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
 }
 
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
-	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
 	int rc = 0, column_count = sqlite3_column_count(stmt);
 	if (column_count > 0) {
-		if (sql_get_description(stmt, out, column_count) != 0) {
+		if (sql_get_description(stmt, stream, column_count) != 0) {
 err:
 			rc = -1;
 			goto finish;
 		}
 		*keys = 2;
-		int size = mp_sizeof_uint(IPROTO_DATA) +
-			   mp_sizeof_array(port_tuple->size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_DATA);
-		pos = mp_encode_array(pos, port_tuple->size);
-		/*
-		 * Just like SELECT, SQL uses output format compatible
-		 * with Tarantool 1.6
-		 */
-		if (port_dump_msgpack_16(&response->port, out) < 0) {
+		mpstream_encode_uint(stream, IPROTO_DATA);
+		mpstream_flush(stream);
+		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
+		mpstream_reset(stream);
 	} else {
 		*keys = 1;
-		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
-			   mp_sizeof_map(map_size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
-		pos = mp_encode_map(pos, map_size);
+		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+		mpstream_encode_map(stream, map_size);
 		uint64_t id_count = 0;
-		int changes = db->nChange;
-		size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
-		       mp_sizeof_uint(changes);
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
-			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				size += id_entry->id >= 0 ?
-					mp_sizeof_uint(id_entry->id) :
-					mp_sizeof_int(id_entry->id);
+			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
-			}
-			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
-				mp_sizeof_array(id_count);
-		}
-		char *buf = obuf_alloc(out, size);
-		if (buf == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
-			goto err;
 		}
-		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
-		buf = mp_encode_uint(buf, changes);
+
+		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+		mpstream_encode_uint(stream, db->nChange);
 		if (!stailq_empty(autoinc_id_list)) {
-			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
-			buf = mp_encode_array(buf, id_count);
+			mpstream_encode_uint(stream,
+					     SQL_INFO_AUTOINCREMENT_IDS);
+			mpstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				buf = id_entry->id >= 0 ?
-				      mp_encode_uint(buf, id_entry->id) :
-				      mp_encode_int(buf, id_entry->id);
+				int64_t value = id_entry->id;
+				if (id_entry->id >= 0)
+					mpstream_encode_uint(stream, value);
+				else
+					mpstream_encode_int(stream, value);
 			}
 		}
 	}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..65ac81c 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,10 +48,10 @@ enum sql_info_key {
 
 extern const char *sql_info_key_strs[];
 
-struct obuf;
 struct region;
 struct sql_bind;
 struct xrow_header;
+struct mpstream;
 
 /** EXECUTE request. */
 struct sql_request {
@@ -105,13 +105,14 @@ struct sql_response {
  * +----------------------------------------------+
  * @param response EXECUTE response.
  * @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream Stream to which the result is written.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
  */
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream);
 
 /**
  * Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
 #include "rmean.h"
 #include "execute.h"
 #include "errinj.h"
+#include "mpstream.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
 	tx_reply_error(msg);
 }
 
+/** Callback to forward an error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	if (sql_response_dump(&response, &keys, out) != 0) {
+	struct mpstream stream;
+	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		      set_encode_error, &is_error);
+	if (is_error)
+		goto error;
+	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
+	mpstream_flush(&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
-- 
2.7.4
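
The error handling added to tx_process_sql() relies on a callback that
sets a flag which the caller checks afterwards. The sketch below shows
that pattern in isolation. It is not the mpstream API: sketch_stream,
sketch_sink and the sketch_* functions are hypothetical names, and the
8-byte sink is only an assumption that makes the failure easy to
trigger.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef void *(*sketch_alloc_f)(void *ctx, size_t size);
typedef void (*sketch_error_f)(void *error_ctx);

/* Hypothetical stream: destination and error handler are both opaque. */
struct sketch_stream {
	void *ctx;
	sketch_alloc_f alloc;
	sketch_error_f error;
	void *error_ctx;
};

/* Hypothetical tiny destination, so that running out of space is easy. */
struct sketch_sink {
	char data[8];
	size_t used;
};

static void *
sketch_sink_alloc(void *ctx, size_t size)
{
	struct sketch_sink *sink = (struct sketch_sink *)ctx;
	if (size > sizeof(sink->data) - sink->used)
		return NULL;
	void *res = sink->data + sink->used;
	sink->used += size;
	return res;
}

/* Error callback: raise the flag the caller passed as error_ctx. */
static void
sketch_set_error(void *error_ctx)
{
	*(bool *)error_ctx = true;
}

/* Write returns nothing; failures are reported through the callback. */
static void
sketch_stream_write(struct sketch_stream *stream, const char *src, size_t len)
{
	void *pos = stream->alloc(stream->ctx, len);
	if (pos == NULL) {
		stream->error(stream->error_ctx);
		return;
	}
	memcpy(pos, src, len);
}
```

Because the write function returns void, the caller has to test the
flag after a sequence of writes, which is why the patch checks is_error
together with the return value of sql_response_dump().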
