From: imeevma@tarantool.org To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org Subject: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c Date: Tue, 27 Nov 2018 22:25:43 +0300 [thread overview] Message-ID: <33c6e6cbd7d667980212902f6825d3d7e941ec77.1543344471.git.imeevma@gmail.com> (raw) In-Reply-To: <cover.1543344471.git.imeevma@gmail.com> Replace obuf with mpstream in execute.c. This makes it possible to design vstream, an interface that will be used as a universal way to dump the results of SQL queries. Needed for #3505 --- src/box/execute.c | 105 ++++++++++++++++-------------------------------------- src/box/execute.h | 7 ++-- src/box/iproto.cc | 17 ++++++++- 3 files changed, 50 insertions(+), 79 deletions(-) diff --git a/src/box/execute.c b/src/box/execute.c index d73681d..9ba9e66 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -35,7 +35,6 @@ #include "sql/sqliteLimit.h" #include "errcode.h" #include "small/region.h" -#include "small/obuf.h" #include "diag.h" #include "sql.h" #include "xrow.h" @@ -43,6 +42,7 @@ #include "port.h" #include "tuple.h" #include "sql/vdbe.h" +#include "mpstream.h" const char *sql_type_strs[] = { NULL, @@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt) * @retval -1 Client or memory error. 
*/ static inline int -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream, int column_count) { assert(column_count > 0); - int size = mp_sizeof_uint(IPROTO_METADATA) + - mp_sizeof_array(column_count); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - return -1; - } - pos = mp_encode_uint(pos, IPROTO_METADATA); - pos = mp_encode_array(pos, column_count); + mpstream_encode_uint(stream, IPROTO_METADATA); + mpstream_encode_array(stream, column_count); for (int i = 0; i < column_count; ++i) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_FIELD_NAME) + - mp_sizeof_uint(IPROTO_FIELD_TYPE); const char *name = sqlite3_column_name(stmt, i); const char *type = sqlite3_column_datatype(stmt, i); /* @@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, assert(name != NULL); if (type == NULL) type = "UNKNOWN"; - size += mp_sizeof_str(strlen(name)); - size += mp_sizeof_str(strlen(type)); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - return -1; - } - pos = mp_encode_map(pos, 2); - pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); - pos = mp_encode_str(pos, name, strlen(name)); - pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); - pos = mp_encode_str(pos, type, strlen(type)); + mpstream_encode_map(stream, 2); + mpstream_encode_uint(stream, IPROTO_FIELD_NAME); + mpstream_encode_str(stream, name); + mpstream_encode_uint(stream, IPROTO_FIELD_TYPE); + mpstream_encode_str(stream, type); } return 0; } @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request, } int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream) { sqlite3 *db = sql_get(); struct sqlite3_stmt *stmt = (struct sqlite3_stmt 
*) response->prep_stmt; - struct port_tuple *port_tuple = (struct port_tuple *) &response->port; int rc = 0, column_count = sqlite3_column_count(stmt); if (column_count > 0) { - if (sql_get_description(stmt, out, column_count) != 0) { + if (sql_get_description(stmt, stream, column_count) != 0) { err: rc = -1; goto finish; } *keys = 2; - int size = mp_sizeof_uint(IPROTO_DATA) + - mp_sizeof_array(port_tuple->size); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - goto err; - } - pos = mp_encode_uint(pos, IPROTO_DATA); - pos = mp_encode_array(pos, port_tuple->size); - /* - * Just like SELECT, SQL uses output format compatible - * with Tarantool 1.6 - */ - if (port_dump_msgpack_16(&response->port, out) < 0) { + mpstream_encode_uint(stream, IPROTO_DATA); + mpstream_flush(stream); + if (port_dump_msgpack(&response->port, stream->ctx) < 0) { /* Failed port dump destroyes the port. */ goto err; } + mpstream_reset(stream); } else { *keys = 1; - assert(port_tuple->size == 0); struct stailq *autoinc_id_list = vdbe_autoinc_id_list((struct Vdbe *)stmt); uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2; - int size = mp_sizeof_uint(IPROTO_SQL_INFO) + - mp_sizeof_map(map_size); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - goto err; - } - pos = mp_encode_uint(pos, IPROTO_SQL_INFO); - pos = mp_encode_map(pos, map_size); + mpstream_encode_uint(stream, IPROTO_SQL_INFO); + mpstream_encode_map(stream, map_size); uint64_t id_count = 0; - int changes = db->nChange; - size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + - mp_sizeof_uint(changes); if (!stailq_empty(autoinc_id_list)) { struct autoinc_id_entry *id_entry; - stailq_foreach_entry(id_entry, autoinc_id_list, link) { - size += id_entry->id >= 0 ? 
- mp_sizeof_uint(id_entry->id) : - mp_sizeof_int(id_entry->id); + stailq_foreach_entry(id_entry, autoinc_id_list, link) id_count++; - } - size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) + - mp_sizeof_array(id_count); - } - char *buf = obuf_alloc(out, size); - if (buf == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "buf"); - goto err; } - buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); - buf = mp_encode_uint(buf, changes); + + mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT); + mpstream_encode_uint(stream, db->nChange); if (!stailq_empty(autoinc_id_list)) { - buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS); - buf = mp_encode_array(buf, id_count); + mpstream_encode_uint(stream, + SQL_INFO_AUTOINCREMENT_IDS); + mpstream_encode_array(stream, id_count); struct autoinc_id_entry *id_entry; stailq_foreach_entry(id_entry, autoinc_id_list, link) { - buf = id_entry->id >= 0 ? - mp_encode_uint(buf, id_entry->id) : - mp_encode_int(buf, id_entry->id); + int64_t value = id_entry->id; + if (id_entry->id >= 0) + mpstream_encode_uint(stream, value); + else + mpstream_encode_int(stream, value); } } } diff --git a/src/box/execute.h b/src/box/execute.h index 5f3d5eb..65ac81c 100644 --- a/src/box/execute.h +++ b/src/box/execute.h @@ -48,10 +48,10 @@ enum sql_info_key { extern const char *sql_info_key_strs[]; -struct obuf; struct region; struct sql_bind; struct xrow_header; +struct mpstream; /** EXECUTE request. */ struct sql_request { @@ -105,13 +105,14 @@ struct sql_response { * +----------------------------------------------+ * @param response EXECUTE response. * @param[out] keys number of keys in dumped map. - * @param out Output buffer. + * @param stream stream to where result is written. * * @retval 0 Success. * @retval -1 Memory error. */ int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out); +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream); /** * Parse the EXECUTE request. 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 7c11d05..b110900 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -61,6 +61,7 @@ #include "rmean.h" #include "execute.h" #include "errinj.h" +#include "mpstream.h" enum { IPROTO_SALT_SIZE = 32, @@ -1573,12 +1574,20 @@ error: tx_reply_error(msg); } +/** Callback to forward an error from mpstream methods. */ +static void +set_encode_error(void *error_ctx) +{ + *(bool *)error_ctx = true; +} + static void tx_process_sql(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); struct obuf *out; struct sql_response response; + bool is_error = false; tx_fiber_init(msg->connection->session, msg->header.sync); @@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m) /* Prepare memory for the iproto header. */ if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) goto error; - if (sql_response_dump(&response, &keys, out) != 0) { + struct mpstream stream; + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, + set_encode_error, &is_error); + if (is_error) + goto error; + if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) { obuf_rollback_to_svp(out, &header_svp); goto error; } + mpstream_flush(&stream); iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys); iproto_wpos_create(&msg->wpos, out); return; -- 2.7.4
next prev parent reply other threads:[~2018-11-27 19:25 UTC|newest] Thread overview: 19+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-11-27 19:25 [tarantool-patches] [PATCH v3 0/7] Remove box.sql.execute imeevma 2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 1/7] box: store sql text and length in sql_request imeevma 2018-11-29 10:45 ` Vladimir Davydov 2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 3/7] iproto: remove iproto functions from execute.c imeevma 2018-11-29 10:51 ` Vladimir Davydov 2018-11-27 19:25 ` imeevma [this message] 2018-11-28 13:10 ` [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c Vladislav Shpilevoy 2018-11-29 10:53 ` Vladimir Davydov 2018-11-29 14:04 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-30 10:19 ` Vladimir Davydov 2018-11-30 10:45 ` Vladislav Shpilevoy 2018-11-30 10:55 ` Vladimir Davydov 2018-11-27 19:25 ` [tarantool-patches] [PATCH v3 7/7] sql: check new box.sql.execute() imeevma 2018-11-28 13:33 ` [tarantool-patches] [PATCH v3 2/7] box: add method dump_lua to port imeevma 2018-11-29 10:48 ` Vladimir Davydov 2018-11-28 13:45 ` [tarantool-patches] [PATCH v3 5/7] sql: create interface vstream imeevma 2018-11-28 18:25 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-28 13:50 ` [tarantool-patches] [PATCH v3 6/7] lua: create vstream implementation for Lua imeevma 2018-11-28 18:25 ` [tarantool-patches] " Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=33c6e6cbd7d667980212902f6825d3d7e941ec77.1543344471.git.imeevma@gmail.com \ --to=imeevma@tarantool.org \ --cc=tarantool-patches@freelists.org \ --cc=v.shpilevoy@tarantool.org \ --subject='Re: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox