From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 526A82FE7C for ; Thu, 22 Nov 2018 14:11:01 -0500 (EST) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id bP_laLJCI3XF for ; Thu, 22 Nov 2018 14:11:01 -0500 (EST) Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id DF01B2FC8A for ; Thu, 22 Nov 2018 14:11:00 -0500 (EST) From: imeevma@tarantool.org Subject: [tarantool-patches] [PATCH v2 4/7] iproto: replace obuf by mpstream in execute.c Date: Thu, 22 Nov 2018 22:10:59 +0300 Message-Id: <73a6bdaa61ebd26734c704a2785e7669da507cd4.1542910674.git.imeevma@gmail.com> In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org It is useful to replace obuf with mpstream in execute.c: this allows us to design vstream. The vstream interface will be used as a universal interface for dumping the results of SQL queries. 
Needed for #3505 --- src/box/execute.c | 100 +++++++++++++++++------------------------------------- src/box/execute.h | 6 ++-- src/box/iproto.cc | 17 +++++++++- 3 files changed, 51 insertions(+), 72 deletions(-) diff --git a/src/box/execute.c b/src/box/execute.c index d73681d..1f0e5ab 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -35,7 +35,6 @@ #include "sql/sqliteLimit.h" #include "errcode.h" #include "small/region.h" -#include "small/obuf.h" #include "diag.h" #include "sql.h" #include "xrow.h" @@ -43,6 +42,7 @@ #include "port.h" #include "tuple.h" #include "sql/vdbe.h" +#include "mpstream.h" const char *sql_type_strs[] = { NULL, @@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt) * @retval -1 Client or memory error. */ static inline int -sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, +sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream, int column_count) { assert(column_count > 0); - int size = mp_sizeof_uint(IPROTO_METADATA) + - mp_sizeof_array(column_count); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - return -1; - } - pos = mp_encode_uint(pos, IPROTO_METADATA); - pos = mp_encode_array(pos, column_count); + mpstream_encode_uint(stream, IPROTO_METADATA); + mpstream_encode_array(stream, column_count); for (int i = 0; i < column_count; ++i) { - size_t size = mp_sizeof_map(2) + - mp_sizeof_uint(IPROTO_FIELD_NAME) + - mp_sizeof_uint(IPROTO_FIELD_TYPE); const char *name = sqlite3_column_name(stmt, i); const char *type = sqlite3_column_datatype(stmt, i); /* @@ -557,18 +547,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, assert(name != NULL); if (type == NULL) type = "UNKNOWN"; - size += mp_sizeof_str(strlen(name)); - size += mp_sizeof_str(strlen(type)); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - return -1; 
- } - pos = mp_encode_map(pos, 2); - pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); - pos = mp_encode_str(pos, name, strlen(name)); - pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); - pos = mp_encode_str(pos, type, strlen(type)); + mpstream_encode_map(stream, 2); + mpstream_encode_uint(stream, IPROTO_FIELD_NAME); + mpstream_encode_str(stream, name); + mpstream_encode_uint(stream, IPROTO_FIELD_TYPE); + mpstream_encode_str(stream, type); } return 0; } @@ -627,81 +610,60 @@ sql_prepare_and_execute(const struct sql_request *request, } int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream) { sqlite3 *db = sql_get(); struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; struct port_tuple *port_tuple = (struct port_tuple *) &response->port; int rc = 0, column_count = sqlite3_column_count(stmt); if (column_count > 0) { - if (sql_get_description(stmt, out, column_count) != 0) { + if (sql_get_description(stmt, stream, column_count) != 0) { err: rc = -1; goto finish; } *keys = 2; - int size = mp_sizeof_uint(IPROTO_DATA) + - mp_sizeof_array(port_tuple->size); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - goto err; - } - pos = mp_encode_uint(pos, IPROTO_DATA); - pos = mp_encode_array(pos, port_tuple->size); + mpstream_encode_uint(stream, IPROTO_DATA); + mpstream_encode_array(stream, port_tuple->size); + mpstream_flush(stream); /* * Just like SELECT, SQL uses output format compatible * with Tarantool 1.6 */ - if (port_dump_msgpack_16(&response->port, out) < 0) { + if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) { /* Failed port dump destroyes the port. 
*/ goto err; } + mpstream_reset(stream); } else { *keys = 1; assert(port_tuple->size == 0); struct stailq *autoinc_id_list = vdbe_autoinc_id_list((struct Vdbe *)stmt); uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2; - int size = mp_sizeof_uint(IPROTO_SQL_INFO) + - mp_sizeof_map(map_size); - char *pos = (char *) obuf_alloc(out, size); - if (pos == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "pos"); - goto err; - } - pos = mp_encode_uint(pos, IPROTO_SQL_INFO); - pos = mp_encode_map(pos, map_size); + mpstream_encode_uint(stream, IPROTO_SQL_INFO); + mpstream_encode_map(stream, map_size); uint64_t id_count = 0; - int changes = db->nChange; - size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + - mp_sizeof_uint(changes); if (!stailq_empty(autoinc_id_list)) { struct autoinc_id_entry *id_entry; - stailq_foreach_entry(id_entry, autoinc_id_list, link) { - size += id_entry->id >= 0 ? - mp_sizeof_uint(id_entry->id) : - mp_sizeof_int(id_entry->id); + stailq_foreach_entry(id_entry, autoinc_id_list, link) id_count++; - } - size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) + - mp_sizeof_array(id_count); } - char *buf = obuf_alloc(out, size); - if (buf == NULL) { - diag_set(OutOfMemory, size, "obuf_alloc", "buf"); - goto err; - } - buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); - buf = mp_encode_uint(buf, changes); + + mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT); + mpstream_encode_uint(stream, db->nChange); if (!stailq_empty(autoinc_id_list)) { - buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS); - buf = mp_encode_array(buf, id_count); + mpstream_encode_uint(stream, + SQL_INFO_AUTOINCREMENT_IDS); + mpstream_encode_array(stream, id_count); struct autoinc_id_entry *id_entry; stailq_foreach_entry(id_entry, autoinc_id_list, link) { - buf = id_entry->id >= 0 ? 
- mp_encode_uint(buf, id_entry->id) : - mp_encode_int(buf, id_entry->id); + int64_t value = id_entry->id; + if (id_entry->id >= 0) + mpstream_encode_uint(stream, value); + else + mpstream_encode_int(stream, value); } } } diff --git a/src/box/execute.h b/src/box/execute.h index 5f3d5eb..940f3a3 100644 --- a/src/box/execute.h +++ b/src/box/execute.h @@ -52,6 +52,7 @@ struct obuf; struct region; struct sql_bind; struct xrow_header; +struct mpstream; /** EXECUTE request. */ struct sql_request { @@ -105,13 +106,14 @@ struct sql_response { * +----------------------------------------------+ * @param response EXECUTE response. * @param[out] keys number of keys in dumped map. - * @param out Output buffer. + * @param stream Stream to which the result is written. * * @retval 0 Success. * @retval -1 Memory error. */ int -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out); +sql_response_dump(struct sql_response *response, int *keys, + struct mpstream *stream); /** * Parse the EXECUTE request. diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 7c11d05..b110900 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -61,6 +61,7 @@ #include "rmean.h" #include "execute.h" #include "errinj.h" +#include "mpstream.h" enum { IPROTO_SALT_SIZE = 32, @@ -1573,12 +1574,20 @@ error: tx_reply_error(msg); } +/** Callback to forward an error from mpstream methods. */ +static void +set_encode_error(void *error_ctx) +{ + *(bool *)error_ctx = true; +} + static void tx_process_sql(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); struct obuf *out; struct sql_response response; + bool is_error = false; tx_fiber_init(msg->connection->session, msg->header.sync); @@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m) /* Prepare memory for the iproto header. 
*/ if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) goto error; - if (sql_response_dump(&response, &keys, out) != 0) { + struct mpstream stream; + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, + set_encode_error, &is_error); + if (is_error) + goto error; + if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) { obuf_rollback_to_svp(out, &header_svp); goto error; } + mpstream_flush(&stream); iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys); iproto_wpos_create(&msg->wpos, out); return; -- 2.7.4