From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Subject: Re: [tarantool-patches] [PATCH v3 4/7] iproto: replace obuf by mpstream in execute.c References: <33c6e6cbd7d667980212902f6825d3d7e941ec77.1543344471.git.imeevma@gmail.com> From: Vladislav Shpilevoy Message-ID: Date: Wed, 28 Nov 2018 16:10:55 +0300 MIME-Version: 1.0 In-Reply-To: <33c6e6cbd7d667980212902f6825d3d7e941ec77.1543344471.git.imeevma@gmail.com> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit To: tarantool-patches@freelists.org, imeevma@tarantool.org, Vladimir Davydov List-ID: Hi! Thanks a lot for a great work on fixes! This patch and all the previous ones are ok to me. Vova, please, review the first 4 commits. On 27/11/2018 22:25, imeevma@tarantool.org wrote: > It is useful to replace obuf by mpstream. It allows us to design > vstream. Interface vstream will be used as universal interface to > dump result of SQL queries. > > Needed for #3505 > --- > src/box/execute.c | 105 ++++++++++++++++-------------------------------------- > src/box/execute.h | 7 ++-- > src/box/iproto.cc | 17 ++++++++- > 3 files changed, 50 insertions(+), 79 deletions(-) > > diff --git a/src/box/execute.c b/src/box/execute.c > index d73681d..9ba9e66 100644 > --- a/src/box/execute.c > +++ b/src/box/execute.c > @@ -35,7 +35,6 @@ > #include "sql/sqliteLimit.h" > #include "errcode.h" > #include "small/region.h" > -#include "small/obuf.h" > #include "diag.h" > #include "sql.h" > #include "xrow.h" > @@ -43,6 +42,7 @@ > #include "port.h" > #include "tuple.h" > #include "sql/vdbe.h" > +#include "mpstream.h" > > const char *sql_type_strs[] = { > NULL, > @@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt) > * @retval -1 Client or memory error. > */ > static inline int > -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, > +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream, > int column_count) > { > assert(column_count > 0); > - int size = mp_sizeof_uint(IPROTO_METADATA) + > - mp_sizeof_array(column_count); > - char *pos = (char *) obuf_alloc(out, size); > - if (pos == NULL) { > - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); > - return -1; > - } > - pos = mp_encode_uint(pos, IPROTO_METADATA); > - pos = mp_encode_array(pos, column_count); > + mpstream_encode_uint(stream, IPROTO_METADATA); > + mpstream_encode_array(stream, column_count); > for (int i = 0; i < column_count; ++i) { > - size_t size = mp_sizeof_map(2) + > - mp_sizeof_uint(IPROTO_FIELD_NAME) + > - mp_sizeof_uint(IPROTO_FIELD_TYPE); > const char *name = sqlite3_column_name(stmt, i); > const char *type = sqlite3_column_datatype(stmt, i); > /* > @@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, > assert(name != NULL); > if (type == NULL) > type = "UNKNOWN"; > - size += mp_sizeof_str(strlen(name)); > - size += mp_sizeof_str(strlen(type)); > - char *pos = (char *) obuf_alloc(out, size); > - if (pos == NULL) { > - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); > - return -1; > - } > - pos = mp_encode_map(pos, 2); > - pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); > - pos = mp_encode_str(pos, name, strlen(name)); > - pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); > - pos = mp_encode_str(pos, type, strlen(type)); > + mpstream_encode_map(stream, 2); > + mpstream_encode_uint(stream, IPROTO_FIELD_NAME); > + mpstream_encode_str(stream, name); > + mpstream_encode_uint(stream, IPROTO_FIELD_TYPE); > + mpstream_encode_str(stream, type); > } > return 0; > } > @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request, > } > > int > -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) > +sql_response_dump(struct sql_response *response, int *keys, > + struct mpstream *stream) > { > sqlite3 *db = sql_get(); > struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; > - struct port_tuple *port_tuple = (struct port_tuple *) &response->port; > int rc = 0, column_count = sqlite3_column_count(stmt); > if (column_count > 0) { > - if (sql_get_description(stmt, out, column_count) != 0) { > + if (sql_get_description(stmt, stream, column_count) != 0) { > err: > rc = -1; > goto finish; > } > *keys = 2; > - int size = mp_sizeof_uint(IPROTO_DATA) + > - mp_sizeof_array(port_tuple->size); > - char *pos = (char *) obuf_alloc(out, size); > - if (pos == NULL) { > - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); > - goto err; > - } > - pos = mp_encode_uint(pos, IPROTO_DATA); > - pos = mp_encode_array(pos, port_tuple->size); > - /* > - * Just like SELECT, SQL uses output format compatible > - * with Tarantool 1.6 > - */ > - if (port_dump_msgpack_16(&response->port, out) < 0) { > + mpstream_encode_uint(stream, IPROTO_DATA); > + mpstream_flush(stream); > + if (port_dump_msgpack(&response->port, stream->ctx) < 0) { > /* Failed port dump destroyes the port. */ > goto err; > } > + mpstream_reset(stream); > } else { > *keys = 1; > - assert(port_tuple->size == 0); > struct stailq *autoinc_id_list = > vdbe_autoinc_id_list((struct Vdbe *)stmt); > uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2; > - int size = mp_sizeof_uint(IPROTO_SQL_INFO) + > - mp_sizeof_map(map_size); > - char *pos = (char *) obuf_alloc(out, size); > - if (pos == NULL) { > - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); > - goto err; > - } > - pos = mp_encode_uint(pos, IPROTO_SQL_INFO); > - pos = mp_encode_map(pos, map_size); > + mpstream_encode_uint(stream, IPROTO_SQL_INFO); > + mpstream_encode_map(stream, map_size); > uint64_t id_count = 0; > - int changes = db->nChange; > - size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + > - mp_sizeof_uint(changes); > if (!stailq_empty(autoinc_id_list)) { > struct autoinc_id_entry *id_entry; > - stailq_foreach_entry(id_entry, autoinc_id_list, link) { > - size += id_entry->id >= 0 ? > - mp_sizeof_uint(id_entry->id) : > - mp_sizeof_int(id_entry->id); > + stailq_foreach_entry(id_entry, autoinc_id_list, link) > id_count++; > - } > - size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) + > - mp_sizeof_array(id_count); > - } > - char *buf = obuf_alloc(out, size); > - if (buf == NULL) { > - diag_set(OutOfMemory, size, "obuf_alloc", "buf"); > - goto err; > } > - buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); > - buf = mp_encode_uint(buf, changes); > + > + mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT); > + mpstream_encode_uint(stream, db->nChange); > if (!stailq_empty(autoinc_id_list)) { > - buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS); > - buf = mp_encode_array(buf, id_count); > + mpstream_encode_uint(stream, > + SQL_INFO_AUTOINCREMENT_IDS); > + mpstream_encode_array(stream, id_count); > struct autoinc_id_entry *id_entry; > stailq_foreach_entry(id_entry, autoinc_id_list, link) { > - buf = id_entry->id >= 0 ? > - mp_encode_uint(buf, id_entry->id) : > - mp_encode_int(buf, id_entry->id); > + int64_t value = id_entry->id; > + if (id_entry->id >= 0) > + mpstream_encode_uint(stream, value); > + else > + mpstream_encode_int(stream, value); > } > } > } > diff --git a/src/box/execute.h b/src/box/execute.h > index 5f3d5eb..65ac81c 100644 > --- a/src/box/execute.h > +++ b/src/box/execute.h > @@ -48,10 +48,10 @@ enum sql_info_key { > > extern const char *sql_info_key_strs[]; > > -struct obuf; > struct region; > struct sql_bind; > struct xrow_header; > +struct mpstream; > > /** EXECUTE request. */ > struct sql_request { > @@ -105,13 +105,14 @@ struct sql_response { > * +----------------------------------------------+ > * @param response EXECUTE response. > * @param[out] keys number of keys in dumped map. > - * @param out Output buffer. > + * @param stream stream to where result is written. > * > * @retval 0 Success. > * @retval -1 Memory error. > */ > int > -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out); > +sql_response_dump(struct sql_response *response, int *keys, > + struct mpstream *stream); > > /** > * Parse the EXECUTE request. > diff --git a/src/box/iproto.cc b/src/box/iproto.cc > index 7c11d05..b110900 100644 > --- a/src/box/iproto.cc > +++ b/src/box/iproto.cc > @@ -61,6 +61,7 @@ > #include "rmean.h" > #include "execute.h" > #include "errinj.h" > +#include "mpstream.h" > > enum { > IPROTO_SALT_SIZE = 32, > @@ -1573,12 +1574,20 @@ error: > tx_reply_error(msg); > } > > +/** Callback to forward and error from mpstream methods. */ > +static void > +set_encode_error(void *error_ctx) > +{ > + *(bool *)error_ctx = true; > +} > + > static void > tx_process_sql(struct cmsg *m) > { > struct iproto_msg *msg = tx_accept_msg(m); > struct obuf *out; > struct sql_response response; > + bool is_error = false; > > tx_fiber_init(msg->connection->session, msg->header.sync); > > @@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m) > /* Prepare memory for the iproto header. */ > if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) > goto error; > - if (sql_response_dump(&response, &keys, out) != 0) { > + struct mpstream stream; > + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, > + set_encode_error, &is_error); > + if (is_error) > + goto error; > + if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) { > obuf_rollback_to_svp(out, &header_svp); > goto error; > } > + mpstream_flush(&stream); > iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys); > iproto_wpos_create(&msg->wpos, out); > return; >