[PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c

imeevma at tarantool.org imeevma at tarantool.org
Sun Dec 2 14:03:21 MSK 2018


This is the most dubious patch of the series because it implicitly
assumes that the mpstream wraps an obuf. The discussion and the patch
are below.

Note that this patch is unchanged in this version: no solution has
been approved for it yet.


On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>
>>
>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma at tarantool.org wrote:
>>>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>>  }
>>>>>>  
>>>>>>  int
>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>> +		  struct mpstream *stream)
>>>>>>  {
>>>>>>  	sqlite3 *db = sql_get();
>>>>>>  	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>> -	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>>  	int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>>  	if (column_count > 0) {
>>>>>> -		if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>> +		if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>>  err:
>>>>>>  			rc = -1;
>>>>>>  			goto finish;
>>>>>>  		}
>>>>>>  		*keys = 2;
>>>>>> -		int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>> -			   mp_sizeof_array(port_tuple->size);
>>>>>> -		char *pos = (char *) obuf_alloc(out, size);
>>>>>> -		if (pos == NULL) {
>>>>>> -			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>> -			goto err;
>>>>>> -		}
>>>>>> -		pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>> -		pos = mp_encode_array(pos, port_tuple->size);
>>>>>> -		/*
>>>>>> -		 * Just like SELECT, SQL uses output format compatible
>>>>>> -		 * with Tarantool 1.6
>>>>>> -		 */
>>>>>> -		if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>> +		mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>> +		mpstream_flush(stream);
>>>>>> +		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>
>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>
>>>>> And when you introduce vstream later, you simply move this code to
>>>>> another file. This is confusing. Maybe we should pass alloc/reserve
>>>>> used in mpstream to port_dump instead of obuf?
>>>>
>>>> Good idea, though I am not sure it is worth slowing down port_dump_msgpack
>>>> by adding a new level of indirection, since port_dump_msgpack is a hot path
>>>> and is used for box.select.
>>>>
>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever need
>>>> to dump a port to something other than obuf, we will just add a new method
>>>> to port_vtab.
>>>>
>>>> Also, it would make the port_dump_obuf name consistent with port_dump_lua:
>>>> in both cases we do not just dump in a specific format, but to a concrete
>>>> destination, obuf or the Lua stack. port_dump_msgpack is restricted to the
>>>> obuf destination anyway.
>>>
>>> There's port_dump_plain, which dumps port contents in a specific format.
>>> So port_dump_obuf would look ambiguous.
>>>
>>>>
>>>> If you worry about how to make sql_response_dump() write to something other
>>>> than obuf, there is another option. Rename port_dump_msgpack to
>>>> port_dump_obuf anyway and introduce a new method: port_dump_mpstream. It
>>>> will take an mpstream and use its reserve/alloc/error functions. This lets
>>>> us avoid slowing down box.select while using the full power of virtual
>>>> functions in execute.c, which is definitely not hot.
>>>
>>> That would interconnect port and mpstream, make them dependent on each
>>> other. I don't think that would be good.
>>>
>>>>
>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>> box.select and iproto_call will use port_dump_obuf.
>>>>
>>>> I prefer the second option: introduce port_dump_mpstream. Is that OK with you?
>>>
>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>> because it's very limited in its applicability. People already prefer to
>>> use box.call over box.insert/select/etc over iproto, and with the
>>> appearance of box.execute they are likely to stop using plain box.select
>>> at all.
>>>
>>> That said, personally I would try to pass reserve/alloc methods to port,
>>> see how it goes.
>>>
>>
>> I do not see a reason to slow down box.select if we can avoid it.
>> Yes, people use IPROTO_CALL, but in stored functions they use box
>> functions, including select.
>
> box.select called from Lua code doesn't use port_dump_msgpack.
>
>>
>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>> port_dump_obuf and add port_dump_msgpack which does not depend on
>> mpstream and takes alloc/reserve/ctx directly.
>
> Better call the optimized version (the one without callbacks)
> port_dump_msgpack_obuf to avoid confusion IMO.
>
> Anyway, I'd try to run cbench to see if it really performs better
> than the one using callbacks.
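
To make the callback-based option from the thread concrete, here is a
minimal sketch of a dump function that takes reserve/alloc/ctx directly
instead of struct obuf. This is not Tarantool code: toy_buf,
toy_port_dump, dump_reserve_f and dump_alloc_f are hypothetical names
made up for illustration, the buffer is a plain realloc-based stand-in
for obuf, and the "port" is just an array of small integers encoded as a
MsgPack fixarray of positive fixints.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical callback types, loosely modelled on a reserve/alloc pair.
 * reserve() returns at least *size free bytes and stores the real amount
 * in *size; alloc() commits size bytes and returns a pointer to them.
 */
typedef void *(*dump_reserve_f)(void *ctx, size_t *size);
typedef void *(*dump_alloc_f)(void *ctx, size_t size);

/* Toy growable buffer standing in for obuf (an assumption, not obuf). */
struct toy_buf {
	char *data;
	size_t used;
	size_t capacity;
};

static void *
toy_buf_reserve(void *ctx, size_t *size)
{
	struct toy_buf *buf = ctx;
	if (buf->capacity - buf->used < *size) {
		size_t new_capacity = buf->capacity == 0 ? 16 : buf->capacity;
		while (new_capacity - buf->used < *size)
			new_capacity *= 2;
		char *data = realloc(buf->data, new_capacity);
		if (data == NULL)
			return NULL;
		buf->data = data;
		buf->capacity = new_capacity;
	}
	*size = buf->capacity - buf->used;
	return buf->data + buf->used;
}

static void *
toy_buf_alloc(void *ctx, size_t size)
{
	struct toy_buf *buf = ctx;
	size_t available = size;
	char *pos = toy_buf_reserve(ctx, &available);
	if (pos == NULL)
		return NULL;
	buf->used += size;
	return pos;
}

/*
 * Dump up to 15 values, each <= 0x7f, as a MsgPack fixarray of positive
 * fixints. The function knows nothing about the destination: it only
 * calls the callbacks with the opaque ctx.
 */
static int
toy_port_dump(const uint8_t *values, uint32_t count,
	      dump_reserve_f reserve, dump_alloc_f alloc, void *ctx)
{
	if (count > 15)
		return -1;
	/* Validate first so that nothing is written on bad input. */
	for (uint32_t i = 0; i < count; i++) {
		if (values[i] > 0x7f)
			return -1;
	}
	size_t size = 1 + count;
	char *pos = reserve(ctx, &size);
	if (pos == NULL)
		return -1;
	*pos++ = (char)(0x90 | count);
	for (uint32_t i = 0; i < count; i++)
		*pos++ = (char)values[i];
	/* Commit exactly the bytes written above. */
	if (alloc(ctx, 1 + count) == NULL)
		return -1;
	return 0;
}
```

A caller would pass toy_buf_reserve, toy_buf_alloc and a struct toy_buf
as ctx. The cost discussed in the thread is visible here: every write
goes through two indirect calls, which a version bound to a concrete
buffer type would avoid. Whether that matters in practice is exactly
what a benchmark would have to show.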

Patch:

commit a857e1129f9e38bde8c5c6037149887f695ec6dc
Author: Mergen Imeev <imeevma at gmail.com>
Date:   Fri Nov 9 22:31:07 2018 +0300

    iproto: replace obuf by mpstream in execute.c
    
    It is useful to replace obuf by mpstream. It allows us to design
    vstream. The vstream interface will be used as a universal interface
    to dump the results of SQL queries.
    
    Needed for #3505

diff --git a/src/box/execute.c b/src/box/execute.c
index 3a6cadf..0d266dd 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
 #include "sql/sqliteLimit.h"
 #include "errcode.h"
 #include "small/region.h"
-#include "small/obuf.h"
 #include "diag.h"
 #include "sql.h"
 #include "xrow.h"
@@ -43,6 +42,7 @@
 #include "port.h"
 #include "tuple.h"
 #include "sql/vdbe.h"
+#include "mpstream.h"
 
 const char *sql_type_strs[] = {
 	NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	int size = mp_sizeof_uint(IPROTO_METADATA) +
-		   mp_sizeof_array(column_count);
-	char *pos = (char *) obuf_alloc(out, size);
-	if (pos == NULL) {
-		diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-		return -1;
-	}
-	pos = mp_encode_uint(pos, IPROTO_METADATA);
-	pos = mp_encode_array(pos, column_count);
+	mpstream_encode_uint(stream, IPROTO_METADATA);
+	mpstream_encode_array(stream, column_count);
 	for (int i = 0; i < column_count; ++i) {
-		size_t size = mp_sizeof_map(2) +
-			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
-			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
 		/*
@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
 		 * column_name simply returns them.
 		 */
 		assert(name != NULL);
-		size += mp_sizeof_str(strlen(name));
-		size += mp_sizeof_str(strlen(type));
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			return -1;
-		}
-		pos = mp_encode_map(pos, 2);
-		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
-		pos = mp_encode_str(pos, name, strlen(name));
-		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
-		pos = mp_encode_str(pos, type, strlen(type));
+		mpstream_encode_map(stream, 2);
+		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+		mpstream_encode_str(stream, name);
+		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+		mpstream_encode_str(stream, type);
 	}
 	return 0;
 }
@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
 }
 
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
-	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
 	int rc = 0, column_count = sqlite3_column_count(stmt);
 	if (column_count > 0) {
-		if (sql_get_description(stmt, out, column_count) != 0) {
+		if (sql_get_description(stmt, stream, column_count) != 0) {
 err:
 			rc = -1;
 			goto finish;
 		}
 		*keys = 2;
-		int size = mp_sizeof_uint(IPROTO_DATA) +
-			   mp_sizeof_array(port_tuple->size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_DATA);
-		pos = mp_encode_array(pos, port_tuple->size);
-		/*
-		 * Just like SELECT, SQL uses output format compatible
-		 * with Tarantool 1.6
-		 */
-		if (port_dump_msgpack_16(&response->port, out) < 0) {
+		mpstream_encode_uint(stream, IPROTO_DATA);
+		mpstream_flush(stream);
+		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
+		mpstream_reset(stream);
 	} else {
 		*keys = 1;
-		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
-			   mp_sizeof_map(map_size);
-		char *pos = (char *) obuf_alloc(out, size);
-		if (pos == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-			goto err;
-		}
-		pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
-		pos = mp_encode_map(pos, map_size);
+		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+		mpstream_encode_map(stream, map_size);
 		uint64_t id_count = 0;
-		int changes = db->nChange;
-		size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
-		       mp_sizeof_uint(changes);
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
-			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				size += id_entry->id >= 0 ?
-					mp_sizeof_uint(id_entry->id) :
-					mp_sizeof_int(id_entry->id);
+			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
-			}
-			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
-				mp_sizeof_array(id_count);
-		}
-		char *buf = obuf_alloc(out, size);
-		if (buf == NULL) {
-			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
-			goto err;
 		}
-		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
-		buf = mp_encode_uint(buf, changes);
+
+		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+		mpstream_encode_uint(stream, db->nChange);
 		if (!stailq_empty(autoinc_id_list)) {
-			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
-			buf = mp_encode_array(buf, id_count);
+			mpstream_encode_uint(stream,
+					     SQL_INFO_AUTOINCREMENT_IDS);
+			mpstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
-				buf = id_entry->id >= 0 ?
-				      mp_encode_uint(buf, id_entry->id) :
-				      mp_encode_int(buf, id_entry->id);
+				int64_t value = id_entry->id;
+				if (id_entry->id >= 0)
+					mpstream_encode_uint(stream, value);
+				else
+					mpstream_encode_int(stream, value);
 			}
 		}
 	}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..65ac81c 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,10 +48,10 @@ enum sql_info_key {
 
 extern const char *sql_info_key_strs[];
 
-struct obuf;
 struct region;
 struct sql_bind;
 struct xrow_header;
+struct mpstream;
 
 /** EXECUTE request. */
 struct sql_request {
@@ -105,13 +105,14 @@ struct sql_response {
  * +----------------------------------------------+
  * @param response EXECUTE response.
  * @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream Stream to which the result is written.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
  */
 int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+		  struct mpstream *stream);
 
 /**
  * Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
 #include "rmean.h"
 #include "execute.h"
 #include "errinj.h"
+#include "mpstream.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
 	tx_reply_error(msg);
 }
 
+/** Callback to forward an error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+	*(bool *)error_ctx = true;
+}
+
 static void
 tx_process_sql(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	if (sql_response_dump(&response, &keys, out) != 0) {
+	struct mpstream stream;
+	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		      set_encode_error, &is_error);
+	if (is_error)
+		goto error;
+	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
+	mpstream_flush(&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
-- 
2.7.4



