Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org, imeevma@tarantool.org,
	Vladimir Davydov <vdavydov.dev@gmail.com>
Subject: Re: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c
Date: Wed, 28 Nov 2018 16:10:55 +0300	[thread overview]
Message-ID: <f11a261e-4364-bd27-5ba0-5f409b6abb41@tarantool.org> (raw)
In-Reply-To: <33c6e6cbd7d667980212902f6825d3d7e941ec77.1543344471.git.imeevma@gmail.com>

Hi! Thanks a lot for the great work on the fixes!

This patch and all the previous ones look OK to me.
Vova, please review the first 4 commits.

On 27/11/2018 22:25, imeevma@tarantool.org wrote:
> It is useful to replace obuf with mpstream. This allows us to design
> vstream. The vstream interface will be used as a universal interface
> to dump the results of SQL queries.
> 
> Needed for #3505
> ---
>   src/box/execute.c | 105 ++++++++++++++++--------------------------------------
>   src/box/execute.h |   7 ++--
>   src/box/iproto.cc |  17 ++++++++-
>   3 files changed, 50 insertions(+), 79 deletions(-)
> 
> diff --git a/src/box/execute.c b/src/box/execute.c
> index d73681d..9ba9e66 100644
> --- a/src/box/execute.c
> +++ b/src/box/execute.c
> @@ -35,7 +35,6 @@
>   #include "sql/sqliteLimit.h"
>   #include "errcode.h"
>   #include "small/region.h"
> -#include "small/obuf.h"
>   #include "diag.h"
>   #include "sql.h"
>   #include "xrow.h"
> @@ -43,6 +42,7 @@
>   #include "port.h"
>   #include "tuple.h"
>   #include "sql/vdbe.h"
> +#include "mpstream.h"
>   
>   const char *sql_type_strs[] = {
>   	NULL,
> @@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
>    * @retval -1 Client or memory error.
>    */
>   static inline int
> -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
> +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
>   		    int column_count)
>   {
>   	assert(column_count > 0);
> -	int size = mp_sizeof_uint(IPROTO_METADATA) +
> -		   mp_sizeof_array(column_count);
> -	char *pos = (char *) obuf_alloc(out, size);
> -	if (pos == NULL) {
> -		diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> -		return -1;
> -	}
> -	pos = mp_encode_uint(pos, IPROTO_METADATA);
> -	pos = mp_encode_array(pos, column_count);
> +	mpstream_encode_uint(stream, IPROTO_METADATA);
> +	mpstream_encode_array(stream, column_count);
>   	for (int i = 0; i < column_count; ++i) {
> -		size_t size = mp_sizeof_map(2) +
> -			      mp_sizeof_uint(IPROTO_FIELD_NAME) +
> -			      mp_sizeof_uint(IPROTO_FIELD_TYPE);
>   		const char *name = sqlite3_column_name(stmt, i);
>   		const char *type = sqlite3_column_datatype(stmt, i);
>   		/*
> @@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
>   		assert(name != NULL);
>   		if (type == NULL)
>   			type = "UNKNOWN";
> -		size += mp_sizeof_str(strlen(name));
> -		size += mp_sizeof_str(strlen(type));
> -		char *pos = (char *) obuf_alloc(out, size);
> -		if (pos == NULL) {
> -			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> -			return -1;
> -		}
> -		pos = mp_encode_map(pos, 2);
> -		pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
> -		pos = mp_encode_str(pos, name, strlen(name));
> -		pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
> -		pos = mp_encode_str(pos, type, strlen(type));
> +		mpstream_encode_map(stream, 2);
> +		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
> +		mpstream_encode_str(stream, name);
> +		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
> +		mpstream_encode_str(stream, type);
>   	}
>   	return 0;
>   }
> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>   }
>   
>   int
> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
> +sql_response_dump(struct sql_response *response, int *keys,
> +		  struct mpstream *stream)
>   {
>   	sqlite3 *db = sql_get();
>   	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
> -	struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>   	int rc = 0, column_count = sqlite3_column_count(stmt);
>   	if (column_count > 0) {
> -		if (sql_get_description(stmt, out, column_count) != 0) {
> +		if (sql_get_description(stmt, stream, column_count) != 0) {
>   err:
>   			rc = -1;
>   			goto finish;
>   		}
>   		*keys = 2;
> -		int size = mp_sizeof_uint(IPROTO_DATA) +
> -			   mp_sizeof_array(port_tuple->size);
> -		char *pos = (char *) obuf_alloc(out, size);
> -		if (pos == NULL) {
> -			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> -			goto err;
> -		}
> -		pos = mp_encode_uint(pos, IPROTO_DATA);
> -		pos = mp_encode_array(pos, port_tuple->size);
> -		/*
> -		 * Just like SELECT, SQL uses output format compatible
> -		 * with Tarantool 1.6
> -		 */
> -		if (port_dump_msgpack_16(&response->port, out) < 0) {
> +		mpstream_encode_uint(stream, IPROTO_DATA);
> +		mpstream_flush(stream);
> +		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>   			/* Failed port dump destroyes the port. */
>   			goto err;
>   		}
> +		mpstream_reset(stream);
>   	} else {
>   		*keys = 1;
> -		assert(port_tuple->size == 0);
>   		struct stailq *autoinc_id_list =
>   			vdbe_autoinc_id_list((struct Vdbe *)stmt);
>   		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
> -		int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
> -			   mp_sizeof_map(map_size);
> -		char *pos = (char *) obuf_alloc(out, size);
> -		if (pos == NULL) {
> -			diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> -			goto err;
> -		}
> -		pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
> -		pos = mp_encode_map(pos, map_size);
> +		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
> +		mpstream_encode_map(stream, map_size);
>   		uint64_t id_count = 0;
> -		int changes = db->nChange;
> -		size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
> -		       mp_sizeof_uint(changes);
>   		if (!stailq_empty(autoinc_id_list)) {
>   			struct autoinc_id_entry *id_entry;
> -			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
> -				size += id_entry->id >= 0 ?
> -					mp_sizeof_uint(id_entry->id) :
> -					mp_sizeof_int(id_entry->id);
> +			stailq_foreach_entry(id_entry, autoinc_id_list, link)
>   				id_count++;
> -			}
> -			size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
> -				mp_sizeof_array(id_count);
> -		}
> -		char *buf = obuf_alloc(out, size);
> -		if (buf == NULL) {
> -			diag_set(OutOfMemory, size, "obuf_alloc", "buf");
> -			goto err;
>   		}
> -		buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
> -		buf = mp_encode_uint(buf, changes);
> +
> +		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
> +		mpstream_encode_uint(stream, db->nChange);
>   		if (!stailq_empty(autoinc_id_list)) {
> -			buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
> -			buf = mp_encode_array(buf, id_count);
> +			mpstream_encode_uint(stream,
> +					     SQL_INFO_AUTOINCREMENT_IDS);
> +			mpstream_encode_array(stream, id_count);
>   			struct autoinc_id_entry *id_entry;
>   			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
> -				buf = id_entry->id >= 0 ?
> -				      mp_encode_uint(buf, id_entry->id) :
> -				      mp_encode_int(buf, id_entry->id);
> +				int64_t value = id_entry->id;
> +				if (id_entry->id >= 0)
> +					mpstream_encode_uint(stream, value);
> +				else
> +					mpstream_encode_int(stream, value);
>   			}
>   		}
>   	}
> diff --git a/src/box/execute.h b/src/box/execute.h
> index 5f3d5eb..65ac81c 100644
> --- a/src/box/execute.h
> +++ b/src/box/execute.h
> @@ -48,10 +48,10 @@ enum sql_info_key {
>   
>   extern const char *sql_info_key_strs[];
>   
> -struct obuf;
>   struct region;
>   struct sql_bind;
>   struct xrow_header;
> +struct mpstream;
>   
>   /** EXECUTE request. */
>   struct sql_request {
> @@ -105,13 +105,14 @@ struct sql_response {
>    * +----------------------------------------------+
>    * @param response EXECUTE response.
>    * @param[out] keys number of keys in dumped map.
> - * @param out Output buffer.
> + * @param stream Stream to which the result is written.
>    *
>    * @retval  0 Success.
>    * @retval -1 Memory error.
>    */
>   int
> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
> +sql_response_dump(struct sql_response *response, int *keys,
> +		  struct mpstream *stream);
>   
>   /**
>    * Parse the EXECUTE request.
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 7c11d05..b110900 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -61,6 +61,7 @@
>   #include "rmean.h"
>   #include "execute.h"
>   #include "errinj.h"
> +#include "mpstream.h"
>   
>   enum {
>   	IPROTO_SALT_SIZE = 32,
> @@ -1573,12 +1574,20 @@ error:
>   	tx_reply_error(msg);
>   }
>   
> +/** Callback to forward an error from mpstream methods. */
> +static void
> +set_encode_error(void *error_ctx)
> +{
> +	*(bool *)error_ctx = true;
> +}
> +
>   static void
>   tx_process_sql(struct cmsg *m)
>   {
>   	struct iproto_msg *msg = tx_accept_msg(m);
>   	struct obuf *out;
>   	struct sql_response response;
> +	bool is_error = false;
>   
>   	tx_fiber_init(msg->connection->session, msg->header.sync);
>   
> @@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
>   	/* Prepare memory for the iproto header. */
>   	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
>   		goto error;
> -	if (sql_response_dump(&response, &keys, out) != 0) {
> +	struct mpstream stream;
> +	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
> +		      set_encode_error, &is_error);
> +	if (is_error)
> +		goto error;
> +	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
>   		obuf_rollback_to_svp(out, &header_svp);
>   		goto error;
>   	}
> +	mpstream_flush(&stream);
>   	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
>   	iproto_wpos_create(&msg->wpos, out);
>   	return;
> 

  reply	other threads:[~2018-11-28 13:10 UTC|newest]

Thread overview: 19+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-11-27 19:25 [tarantool-patches] [PATCH v3 0/7] Remove box.sql.execute imeevma
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 1/7] box: store sql text and length in sql_request imeevma
2018-11-29 10:45   ` Vladimir Davydov
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 3/7] iproto: remove iproto functions from execute.c imeevma
2018-11-29 10:51   ` Vladimir Davydov
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c imeevma
2018-11-28 13:10   ` Vladislav Shpilevoy [this message]
2018-11-29 10:53   ` Vladimir Davydov
2018-11-29 14:04     ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-30 10:19       ` Vladimir Davydov
2018-11-30 10:45         ` Vladislav Shpilevoy
2018-11-30 10:55           ` Vladimir Davydov
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 7/7] sql: check new box.sql.execute() imeevma
2018-11-28 13:33 ` [tarantool-patches] [PATCH v3 2/7] box: add method dump_lua to port imeevma
2018-11-29 10:48   ` Vladimir Davydov
2018-11-28 13:45 ` [tarantool-patches] [PATCH v3 5/7] sql: create interface vstream imeevma
2018-11-28 18:25   ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-28 13:50 ` [tarantool-patches] [PATCH v3 6/7] lua: create vstream implementation for Lua imeevma
2018-11-28 18:25   ` [tarantool-patches] " Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=f11a261e-4364-bd27-5ba0-5f409b6abb41@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=imeevma@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox