From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org, imeevma@tarantool.org,
Vladimir Davydov <vdavydov.dev@gmail.com>
Subject: Re: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c
Date: Wed, 28 Nov 2018 16:10:55 +0300 [thread overview]
Message-ID: <f11a261e-4364-bd27-5ba0-5f409b6abb41@tarantool.org> (raw)
In-Reply-To: <33c6e6cbd7d667980212902f6825d3d7e941ec77.1543344471.git.imeevma@gmail.com>
Hi! Thanks a lot for a great work on fixes!
This patch and all the previous ones are ok to me.
Vova, please, review the first 4 commits.
On 27/11/2018 22:25, imeevma@tarantool.org wrote:
> It is useful to replace obuf by mpstream. It allows us to design
> vstream. Interface vstream will be used as universal interface to
> dump result of SQL queries.
>
> Needed for #3505
> ---
> src/box/execute.c | 105 ++++++++++++++++--------------------------------------
> src/box/execute.h | 7 ++--
> src/box/iproto.cc | 17 ++++++++-
> 3 files changed, 50 insertions(+), 79 deletions(-)
>
> diff --git a/src/box/execute.c b/src/box/execute.c
> index d73681d..9ba9e66 100644
> --- a/src/box/execute.c
> +++ b/src/box/execute.c
> @@ -35,7 +35,6 @@
> #include "sql/sqliteLimit.h"
> #include "errcode.h"
> #include "small/region.h"
> -#include "small/obuf.h"
> #include "diag.h"
> #include "sql.h"
> #include "xrow.h"
> @@ -43,6 +42,7 @@
> #include "port.h"
> #include "tuple.h"
> #include "sql/vdbe.h"
> +#include "mpstream.h"
>
> const char *sql_type_strs[] = {
> NULL,
> @@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
> * @retval -1 Client or memory error.
> */
> static inline int
> -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
> +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
> int column_count)
> {
> assert(column_count > 0);
> - int size = mp_sizeof_uint(IPROTO_METADATA) +
> - mp_sizeof_array(column_count);
> - char *pos = (char *) obuf_alloc(out, size);
> - if (pos == NULL) {
> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> - return -1;
> - }
> - pos = mp_encode_uint(pos, IPROTO_METADATA);
> - pos = mp_encode_array(pos, column_count);
> + mpstream_encode_uint(stream, IPROTO_METADATA);
> + mpstream_encode_array(stream, column_count);
> for (int i = 0; i < column_count; ++i) {
> - size_t size = mp_sizeof_map(2) +
> - mp_sizeof_uint(IPROTO_FIELD_NAME) +
> - mp_sizeof_uint(IPROTO_FIELD_TYPE);
> const char *name = sqlite3_column_name(stmt, i);
> const char *type = sqlite3_column_datatype(stmt, i);
> /*
> @@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
> assert(name != NULL);
> if (type == NULL)
> type = "UNKNOWN";
> - size += mp_sizeof_str(strlen(name));
> - size += mp_sizeof_str(strlen(type));
> - char *pos = (char *) obuf_alloc(out, size);
> - if (pos == NULL) {
> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> - return -1;
> - }
> - pos = mp_encode_map(pos, 2);
> - pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
> - pos = mp_encode_str(pos, name, strlen(name));
> - pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
> - pos = mp_encode_str(pos, type, strlen(type));
> + mpstream_encode_map(stream, 2);
> + mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
> + mpstream_encode_str(stream, name);
> + mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
> + mpstream_encode_str(stream, type);
> }
> return 0;
> }
> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> }
>
> int
> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
> +sql_response_dump(struct sql_response *response, int *keys,
> + struct mpstream *stream)
> {
> sqlite3 *db = sql_get();
> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
> int rc = 0, column_count = sqlite3_column_count(stmt);
> if (column_count > 0) {
> - if (sql_get_description(stmt, out, column_count) != 0) {
> + if (sql_get_description(stmt, stream, column_count) != 0) {
> err:
> rc = -1;
> goto finish;
> }
> *keys = 2;
> - int size = mp_sizeof_uint(IPROTO_DATA) +
> - mp_sizeof_array(port_tuple->size);
> - char *pos = (char *) obuf_alloc(out, size);
> - if (pos == NULL) {
> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> - goto err;
> - }
> - pos = mp_encode_uint(pos, IPROTO_DATA);
> - pos = mp_encode_array(pos, port_tuple->size);
> - /*
> - * Just like SELECT, SQL uses output format compatible
> - * with Tarantool 1.6
> - */
> - if (port_dump_msgpack_16(&response->port, out) < 0) {
> + mpstream_encode_uint(stream, IPROTO_DATA);
> + mpstream_flush(stream);
> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
> /* Failed port dump destroyes the port. */
> goto err;
> }
> + mpstream_reset(stream);
> } else {
> *keys = 1;
> - assert(port_tuple->size == 0);
> struct stailq *autoinc_id_list =
> vdbe_autoinc_id_list((struct Vdbe *)stmt);
> uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
> - int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
> - mp_sizeof_map(map_size);
> - char *pos = (char *) obuf_alloc(out, size);
> - if (pos == NULL) {
> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> - goto err;
> - }
> - pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
> - pos = mp_encode_map(pos, map_size);
> + mpstream_encode_uint(stream, IPROTO_SQL_INFO);
> + mpstream_encode_map(stream, map_size);
> uint64_t id_count = 0;
> - int changes = db->nChange;
> - size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
> - mp_sizeof_uint(changes);
> if (!stailq_empty(autoinc_id_list)) {
> struct autoinc_id_entry *id_entry;
> - stailq_foreach_entry(id_entry, autoinc_id_list, link) {
> - size += id_entry->id >= 0 ?
> - mp_sizeof_uint(id_entry->id) :
> - mp_sizeof_int(id_entry->id);
> + stailq_foreach_entry(id_entry, autoinc_id_list, link)
> id_count++;
> - }
> - size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
> - mp_sizeof_array(id_count);
> - }
> - char *buf = obuf_alloc(out, size);
> - if (buf == NULL) {
> - diag_set(OutOfMemory, size, "obuf_alloc", "buf");
> - goto err;
> }
> - buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
> - buf = mp_encode_uint(buf, changes);
> +
> + mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
> + mpstream_encode_uint(stream, db->nChange);
> if (!stailq_empty(autoinc_id_list)) {
> - buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
> - buf = mp_encode_array(buf, id_count);
> + mpstream_encode_uint(stream,
> + SQL_INFO_AUTOINCREMENT_IDS);
> + mpstream_encode_array(stream, id_count);
> struct autoinc_id_entry *id_entry;
> stailq_foreach_entry(id_entry, autoinc_id_list, link) {
> - buf = id_entry->id >= 0 ?
> - mp_encode_uint(buf, id_entry->id) :
> - mp_encode_int(buf, id_entry->id);
> + int64_t value = id_entry->id;
> + if (id_entry->id >= 0)
> + mpstream_encode_uint(stream, value);
> + else
> + mpstream_encode_int(stream, value);
> }
> }
> }
> diff --git a/src/box/execute.h b/src/box/execute.h
> index 5f3d5eb..65ac81c 100644
> --- a/src/box/execute.h
> +++ b/src/box/execute.h
> @@ -48,10 +48,10 @@ enum sql_info_key {
>
> extern const char *sql_info_key_strs[];
>
> -struct obuf;
> struct region;
> struct sql_bind;
> struct xrow_header;
> +struct mpstream;
>
> /** EXECUTE request. */
> struct sql_request {
> @@ -105,13 +105,14 @@ struct sql_response {
> * +----------------------------------------------+
> * @param response EXECUTE response.
> * @param[out] keys number of keys in dumped map.
> - * @param out Output buffer.
> + * @param stream stream to where result is written.
> *
> * @retval 0 Success.
> * @retval -1 Memory error.
> */
> int
> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
> +sql_response_dump(struct sql_response *response, int *keys,
> + struct mpstream *stream);
>
> /**
> * Parse the EXECUTE request.
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 7c11d05..b110900 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -61,6 +61,7 @@
> #include "rmean.h"
> #include "execute.h"
> #include "errinj.h"
> +#include "mpstream.h"
>
> enum {
> IPROTO_SALT_SIZE = 32,
> @@ -1573,12 +1574,20 @@ error:
> tx_reply_error(msg);
> }
>
> +/** Callback to forward and error from mpstream methods. */
> +static void
> +set_encode_error(void *error_ctx)
> +{
> + *(bool *)error_ctx = true;
> +}
> +
> static void
> tx_process_sql(struct cmsg *m)
> {
> struct iproto_msg *msg = tx_accept_msg(m);
> struct obuf *out;
> struct sql_response response;
> + bool is_error = false;
>
> tx_fiber_init(msg->connection->session, msg->header.sync);
>
> @@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
> /* Prepare memory for the iproto header. */
> if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
> goto error;
> - if (sql_response_dump(&response, &keys, out) != 0) {
> + struct mpstream stream;
> + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
> + set_encode_error, &is_error);
> + if (is_error)
> + goto error;
> + if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
> obuf_rollback_to_svp(out, &header_svp);
> goto error;
> }
> + mpstream_flush(&stream);
> iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
> iproto_wpos_create(&msg->wpos, out);
> return;
>
next prev parent reply other threads:[~2018-11-28 13:10 UTC|newest]
Thread overview: 19+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-11-27 19:25 [tarantool-patches] [PATCH v3 0/7] Remove box.sql.execute imeevma
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 1/7] box: store sql text and length in sql_request imeevma
2018-11-29 10:45 ` Vladimir Davydov
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 3/7] iproto: remove iproto functions from execute.c imeevma
2018-11-29 10:51 ` Vladimir Davydov
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c imeevma
2018-11-28 13:10 ` Vladislav Shpilevoy [this message]
2018-11-29 10:53 ` Vladimir Davydov
2018-11-29 14:04 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-30 10:19 ` Vladimir Davydov
2018-11-30 10:45 ` Vladislav Shpilevoy
2018-11-30 10:55 ` Vladimir Davydov
2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 7/7] sql: check new box.sql.execute() imeevma
2018-11-28 13:33 ` [tarantool-patches] [PATCH v3 2/7] box: add method dump_lua to port imeevma
2018-11-29 10:48 ` Vladimir Davydov
2018-11-28 13:45 ` [tarantool-patches] [PATCH v3 5/7] sql: create interface vstream imeevma
2018-11-28 18:25 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-28 13:50 ` [tarantool-patches] [PATCH v3 6/7] lua: create vstream implementation for Lua imeevma
2018-11-28 18:25 ` [tarantool-patches] " Vladislav Shpilevoy
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=f11a261e-4364-bd27-5ba0-5f409b6abb41@tarantool.org \
--to=v.shpilevoy@tarantool.org \
--cc=imeevma@tarantool.org \
--cc=tarantool-patches@freelists.org \
--cc=vdavydov.dev@gmail.com \
--subject='Re: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox