From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id B88D02FAF9 for ; Sat, 17 Nov 2018 09:03:58 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id EuWGWtWvwg5w for ; Sat, 17 Nov 2018 09:03:58 -0500 (EST) Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 54F2B2FB28 for ; Sat, 17 Nov 2018 09:03:58 -0500 (EST) From: imeevma@tarantool.org Subject: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c Date: Sat, 17 Nov 2018 17:03:56 +0300 Message-Id: <502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com> In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org, v.shpilevoy@tarantool.org It is useful to replace obuf by mpstream. It allows us to design vstream. Interface vstream will be used as universal interface to dump result of SQL queries. Needed for #3505 --- src/box/execute.c | 92 +++++++++++++++++++++++-------------------------------- src/box/execute.h | 6 ++-- src/box/iproto.cc | 16 +++++++++- 3 files changed, 58 insertions(+), 56 deletions(-) diff --git a/src/box/execute.c b/src/box/execute.c index 285ae2e..5c2ec19 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -35,7 +35,6 @@ #include "sql/sqliteLimit.h" #include "errcode.h" #include "small/region.h" -#include "small/obuf.h" #include "diag.h" #include "sql.h" #include "xrow.h" @@ -454,23 +453,21 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt) * @retval -1 Client or memory error. */ static inline int -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream, int column_count) { assert(column_count > 0); - char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN); + char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN); if (pos == NULL) { - diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "obuf_alloc", + diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "mpstream_reserve", "pos"); return -1; } pos = mp_store_u8(pos, IPROTO_METADATA); pos = mp_store_u8(pos, 0xdd); pos = mp_store_u32(pos, column_count); + mpstream_advance(stream, IPROTO_KEY_HEADER_LEN); for (int i = 0; i < column_count; ++i) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_FIELD_NAME) + - mp_sizeof_uint(IPROTO_FIELD_TYPE); const char *name = sqlite3_column_name(stmt, i); const char *type = sqlite3_column_datatype(stmt, i); /* @@ -479,18 +476,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, * column_name simply returns them. */ assert(name != NULL); - size += mp_sizeof_str(strlen(name)); - size += mp_sizeof_str(strlen(type)); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - return -1; - } - pos = mp_encode_map(pos, 2); - pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); - pos = mp_encode_str(pos, name, strlen(name)); - pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); - pos = mp_encode_str(pos, type, strlen(type)); + mpstream_encode_map(stream, 2); + mpstream_encode_uint(stream, IPROTO_FIELD_NAME); + mpstream_encode_str(stream, name); + mpstream_encode_uint(stream, IPROTO_FIELD_TYPE); + mpstream_encode_str(stream, type); } return 0; } @@ -549,81 +539,77 @@ sql_prepare_and_execute(const struct sql_request *request, } int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream) { sqlite3 *db = sql_get(); struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; struct port_tuple *port_tuple = (struct port_tuple *) &response->port; int rc = 0, column_count = sqlite3_column_count(stmt); if (column_count > 0) { - if (sql_get_description(stmt, out, column_count) != 0) { + if (sql_get_description(stmt, stream, column_count) != 0) { err: rc = -1; goto finish; } *keys = 2; - char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN); + char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN); if (pos == NULL) { diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, - "obuf_alloc", "pos"); + "mpstream_reserve", "pos"); goto err; } pos = mp_store_u8(pos, IPROTO_DATA); pos = mp_store_u8(pos, 0xdd); pos = mp_store_u32(pos, port_tuple->size); + mpstream_advance(stream, IPROTO_KEY_HEADER_LEN); + + mpstream_flush(stream); /* * Just like SELECT, SQL uses output format compatible * with Tarantool 1.6 */ - if (port_dump_msgpack_16(&response->port, out) < 0) { + if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) { /* Failed port dump destroyes the port. */ goto err; } + mpstream_reset(stream); } else { *keys = 1; assert(port_tuple->size == 0); struct stailq *autoinc_id_list = vdbe_autoinc_id_list((struct Vdbe *)stmt); uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2; - char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN); - if (pos == NULL) { - diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, - "obuf_alloc", "pos"); - goto err; - } - pos = mp_store_u8(pos, IPROTO_SQL_INFO); - pos = mp_store_u8(pos, 0xdf); - pos = mp_store_u32(pos, map_size); uint64_t id_count = 0; - int changes = db->nChange; - int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + - mp_sizeof_uint(changes); if (!stailq_empty(autoinc_id_list)) { struct autoinc_id_entry *id_entry; - stailq_foreach_entry(id_entry, autoinc_id_list, link) { - size += id_entry->id >= 0 ? - mp_sizeof_uint(id_entry->id) : - mp_sizeof_int(id_entry->id); + stailq_foreach_entry(id_entry, autoinc_id_list, link) id_count++; - } - size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) + - mp_sizeof_array(id_count); } - char *buf = obuf_alloc(out, size); - if (buf == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "buf"); + char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN); + if (pos == NULL) { + diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, + "mpstream_reserve", "pos"); goto err; } - buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); - buf = mp_encode_uint(buf, changes); + pos = mp_store_u8(pos, IPROTO_SQL_INFO); + pos = mp_store_u8(pos, 0xdf); + pos = mp_store_u32(pos, map_size); + mpstream_advance(stream, IPROTO_KEY_HEADER_LEN); + + mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT); + mpstream_encode_uint(stream, db->nChange); if (!stailq_empty(autoinc_id_list)) { - buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS); - buf = mp_encode_array(buf, id_count); + mpstream_encode_uint(stream, + SQL_INFO_AUTOINCREMENT_IDS); + mpstream_encode_array(stream, id_count); struct autoinc_id_entry *id_entry; stailq_foreach_entry(id_entry, autoinc_id_list, link) { - buf = id_entry->id >= 0 ? - mp_encode_uint(buf, id_entry->id) : - mp_encode_int(buf, id_entry->id); + int64_t value = id_entry->id; + if (id_entry->id >= 0) + mpstream_encode_uint(stream, value); + else + mpstream_encode_int(stream, value); } } } diff --git a/src/box/execute.h b/src/box/execute.h index 2a242e8..7eb2121 100644 --- a/src/box/execute.h +++ b/src/box/execute.h @@ -34,6 +34,7 @@ #include #include #include "port.h" +#include "mpstream.h" #if defined(__cplusplus) extern "C" { @@ -105,13 +106,14 @@ struct sql_response { * +----------------------------------------------+ * @param response EXECUTE response. * @param[out] keys number of keys in dumped map. - * @param out Output buffer. + * @param stream stream to where result is written. * * @retval 0 Success. * @retval -1 Memory error. */ int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out); +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream); /** * Parse MessagePack array of SQL parameters and store a result diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 5fb2aff..83b268b 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1634,12 +1634,20 @@ error: tx_reply_error(msg); } +/** Callback to forward and error from mpstream methods. */ +static void +set_encode_error(void *error_ctx) +{ + *(bool *)error_ctx = true; +} + static void tx_process_sql(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); struct obuf *out; struct sql_response response; + bool is_error = false; tx_fiber_init(msg->connection->session, msg->header.sync); @@ -1659,10 +1667,16 @@ tx_process_sql(struct cmsg *m) /* Prepare memory for the iproto header. */ if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) goto error; - if (sql_response_dump(&response, &keys, out) != 0) { + struct mpstream stream; + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, + set_encode_error, &is_error); + if (is_error) + goto error; + if (sql_response_dump(&response, &keys, &stream) != 0) { obuf_rollback_to_svp(out, &header_svp); goto error; } + mpstream_flush(&stream); iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys); iproto_wpos_create(&msg->wpos, out); return; -- 2.7.4