From: imeevma@tarantool.org To: tarantool-patches@freelists.org, v.shpilevoy@tarantool.org Subject: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c Date: Sat, 17 Nov 2018 17:03:56 +0300 [thread overview] Message-ID: <502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com> (raw) In-Reply-To: <cover.1542460773.git.imeevma@gmail.com> It is useful to replace obuf with mpstream, because this allows us to design vstream. The vstream interface will be used as a universal interface for dumping the results of SQL queries. Needed for #3505 --- src/box/execute.c | 92 +++++++++++++++++++++++-------------------------------- src/box/execute.h | 6 ++-- src/box/iproto.cc | 16 +++++++++- 3 files changed, 58 insertions(+), 56 deletions(-) diff --git a/src/box/execute.c b/src/box/execute.c index 285ae2e..5c2ec19 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -35,7 +35,6 @@ #include "sql/sqliteLimit.h" #include "errcode.h" #include "small/region.h" -#include "small/obuf.h" #include "diag.h" #include "sql.h" #include "xrow.h" @@ -454,23 +453,21 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt) * @retval -1 Client or memory error. 
*/ static inline int -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream, int column_count) { assert(column_count > 0); - char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN); + char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN); if (pos == NULL) { - diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "obuf_alloc", + diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "mpstream_reserve", "pos"); return -1; } pos = mp_store_u8(pos, IPROTO_METADATA); pos = mp_store_u8(pos, 0xdd); pos = mp_store_u32(pos, column_count); + mpstream_advance(stream, IPROTO_KEY_HEADER_LEN); for (int i = 0; i < column_count; ++i) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_FIELD_NAME) + - mp_sizeof_uint(IPROTO_FIELD_TYPE); const char *name = sqlite3_column_name(stmt, i); const char *type = sqlite3_column_datatype(stmt, i); /* @@ -479,18 +476,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, * column_name simply returns them. 
*/ assert(name != NULL); - size += mp_sizeof_str(strlen(name)); - size += mp_sizeof_str(strlen(type)); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - return -1; - } - pos = mp_encode_map(pos, 2); - pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); - pos = mp_encode_str(pos, name, strlen(name)); - pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); - pos = mp_encode_str(pos, type, strlen(type)); + mpstream_encode_map(stream, 2); + mpstream_encode_uint(stream, IPROTO_FIELD_NAME); + mpstream_encode_str(stream, name); + mpstream_encode_uint(stream, IPROTO_FIELD_TYPE); + mpstream_encode_str(stream, type); } return 0; } @@ -549,81 +539,77 @@ sql_prepare_and_execute(const struct sql_request *request, } int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream) { sqlite3 *db = sql_get(); struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; struct port_tuple *port_tuple = (struct port_tuple *) &response->port; int rc = 0, column_count = sqlite3_column_count(stmt); if (column_count > 0) { - if (sql_get_description(stmt, out, column_count) != 0) { + if (sql_get_description(stmt, stream, column_count) != 0) { err: rc = -1; goto finish; } *keys = 2; - char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN); + char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN); if (pos == NULL) { diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, - "obuf_alloc", "pos"); + "mpstream_reserve", "pos"); goto err; } pos = mp_store_u8(pos, IPROTO_DATA); pos = mp_store_u8(pos, 0xdd); pos = mp_store_u32(pos, port_tuple->size); + mpstream_advance(stream, IPROTO_KEY_HEADER_LEN); + + mpstream_flush(stream); /* * Just like SELECT, SQL uses output format compatible * with Tarantool 1.6 */ - if (port_dump_msgpack_16(&response->port, out) < 0) { + if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) { /* 
Failed port dump destroyes the port. */ goto err; } + mpstream_reset(stream); } else { *keys = 1; assert(port_tuple->size == 0); struct stailq *autoinc_id_list = vdbe_autoinc_id_list((struct Vdbe *)stmt); uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2; - char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN); - if (pos == NULL) { - diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, - "obuf_alloc", "pos"); - goto err; - } - pos = mp_store_u8(pos, IPROTO_SQL_INFO); - pos = mp_store_u8(pos, 0xdf); - pos = mp_store_u32(pos, map_size); uint64_t id_count = 0; - int changes = db->nChange; - int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + - mp_sizeof_uint(changes); if (!stailq_empty(autoinc_id_list)) { struct autoinc_id_entry *id_entry; - stailq_foreach_entry(id_entry, autoinc_id_list, link) { - size += id_entry->id >= 0 ? - mp_sizeof_uint(id_entry->id) : - mp_sizeof_int(id_entry->id); + stailq_foreach_entry(id_entry, autoinc_id_list, link) id_count++; - } - size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) + - mp_sizeof_array(id_count); } - char *buf = obuf_alloc(out, size); - if (buf == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "buf"); + char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN); + if (pos == NULL) { + diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, + "mpstream_reserve", "pos"); goto err; } - buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); - buf = mp_encode_uint(buf, changes); + pos = mp_store_u8(pos, IPROTO_SQL_INFO); + pos = mp_store_u8(pos, 0xdf); + pos = mp_store_u32(pos, map_size); + mpstream_advance(stream, IPROTO_KEY_HEADER_LEN); + + mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT); + mpstream_encode_uint(stream, db->nChange); if (!stailq_empty(autoinc_id_list)) { - buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS); - buf = mp_encode_array(buf, id_count); + mpstream_encode_uint(stream, + SQL_INFO_AUTOINCREMENT_IDS); + mpstream_encode_array(stream, id_count); struct autoinc_id_entry *id_entry; stailq_foreach_entry(id_entry, 
autoinc_id_list, link) { - buf = id_entry->id >= 0 ? - mp_encode_uint(buf, id_entry->id) : - mp_encode_int(buf, id_entry->id); + int64_t value = id_entry->id; + if (id_entry->id >= 0) + mpstream_encode_uint(stream, value); + else + mpstream_encode_int(stream, value); } } } diff --git a/src/box/execute.h b/src/box/execute.h index 2a242e8..7eb2121 100644 --- a/src/box/execute.h +++ b/src/box/execute.h @@ -34,6 +34,7 @@ #include <stdint.h> #include <stdbool.h> #include "port.h" +#include "mpstream.h" #if defined(__cplusplus) extern "C" { @@ -105,13 +106,14 @@ struct sql_response { * +----------------------------------------------+ * @param response EXECUTE response. * @param[out] keys number of keys in dumped map. - * @param out Output buffer. + * @param stream stream to where result is written. * * @retval 0 Success. * @retval -1 Memory error. */ int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out); +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream); /** * Parse MessagePack array of SQL parameters and store a result diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 5fb2aff..83b268b 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1634,12 +1634,20 @@ error: tx_reply_error(msg); } +/** Callback to forward an error from mpstream methods. */ +static void +set_encode_error(void *error_ctx) +{ + *(bool *)error_ctx = true; +} + static void tx_process_sql(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); struct obuf *out; struct sql_response response; + bool is_error = false; tx_fiber_init(msg->connection->session, msg->header.sync); @@ -1659,10 +1667,16 @@ tx_process_sql(struct cmsg *m) /* Prepare memory for the iproto header. 
*/ if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) goto error; - if (sql_response_dump(&response, &keys, out) != 0) { + struct mpstream stream; + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, + set_encode_error, &is_error); + if (is_error) + goto error; + if (sql_response_dump(&response, &keys, &stream) != 0) { obuf_rollback_to_svp(out, &header_svp); goto error; } + mpstream_flush(&stream); iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys); iproto_wpos_create(&msg->wpos, out); return; -- 2.7.4
next prev parent reply other threads:[~2018-11-17 14:03 UTC|newest] Thread overview: 18+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-11-17 14:03 [tarantool-patches] [PATCH v1 00/10] sql: remove box.sql.execute imeevma 2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 01/10] box: store sql text and length in sql_request imeevma 2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 02/10] iproto: remove iproto functions from execute.c imeevma 2018-11-19 17:58 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-17 14:03 ` imeevma [this message] 2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 04/10] sql: create interface vstream imeevma 2018-11-19 17:58 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 05/10] sql: EXPLAIN through net.box leads to SEGFAULT imeevma 2018-11-19 13:47 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 06/10] sql: SELECT from system spaces returns unpacked msgpack imeevma 2018-11-19 13:48 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 07/10] sql: too many autogenerated ids leads to SEGFAULT imeevma 2018-11-19 13:47 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 08/10] box: add method dump_lua to port imeevma 2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 09/10] lua: create vstream implementation for Lua imeevma 2018-11-19 17:58 ` [tarantool-patches] " Vladislav Shpilevoy 2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 10/10] sql: check new box.sql.execute() imeevma 2018-11-19 12:54 ` [tarantool-patches] Re: [PATCH v1 00/10] sql: remove box.sql.execute Vladislav Shpilevoy
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com \ --to=imeevma@tarantool.org \ --cc=tarantool-patches@freelists.org \ --cc=v.shpilevoy@tarantool.org \ --subject='Re: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox; see the mirroring instructions for how to clone and mirror all data and code used for this inbox.