From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 602652DBFA for ; Fri, 6 Apr 2018 14:18:27 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id 3ZjhKrCkrBqP for ; Fri, 6 Apr 2018 14:18:27 -0400 (EDT) Received: from smtp48.i.mail.ru (smtp48.i.mail.ru [94.100.177.108]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 88D672DBF9 for ; Fri, 6 Apr 2018 14:18:26 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH 1/1] sql: get obuf for iproto response dump after execute() Date: Fri, 6 Apr 2018 21:18:22 +0300 Message-Id: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org The #3255 fix was not applied to SQL after the merge into 2.0. This is the fix. The SQL execution must be separated from response encoding. First, the obuf to write to is known only after execution. Second, in the future one SQL request will produce multiple SQL responses. 
Closes #3326 --- Branch: https://github.com/tarantool/tarantool/tree/gh-3326-sql-iproto-fix Issue: https://github.com/tarantool/tarantool/issues/3326 src/box/execute.c | 129 ++++++++++++++++++++--------------------------- src/box/execute.h | 55 +++++++++++++------- src/box/iproto.cc | 13 ++++- test/sql/errinj.result | 38 +++++++++++++- test/sql/errinj.test.lua | 19 ++++++- 5 files changed, 157 insertions(+), 97 deletions(-) diff --git a/src/box/execute.c b/src/box/execute.c index 0d4f357af..e808123ba 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -551,10 +551,10 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, } static inline int -sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, int column_count, - struct port *port, struct region *region) +sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, struct port *port, + struct region *region) { - int rc; + int rc, column_count = sqlite3_column_count(stmt); if (column_count > 0) { /* Either ROW or DONE or ERROR. */ while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { @@ -575,106 +575,85 @@ sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, int column_count, return 0; } -/** - * Execute the prepared statement and write to the @out obuf the - * result. Result is either rows array in a case of not zero - * column count (SELECT), or SQL info in other cases. - * @param db SQLite engine. - * @param stmt Prepared statement. - * @param out Out buffer. - * @param sync IProto request sync. - * @param region Runtime allocator for temporary objects. - * - * @retval 0 Success. - * @retval -1 Client or memory error. 
- */ -static inline int -sql_execute_and_encode(sqlite3 *db, struct sqlite3_stmt *stmt, struct obuf *out, - uint64_t sync, struct region *region) +int +sql_prepare_and_execute(const struct sql_request *request, + struct sql_response *response, struct region *region) { - struct port port; - struct port_tuple *port_tuple = (struct port_tuple *)&port; - port_tuple_create(&port); - int column_count = sqlite3_column_count(stmt); - if (sql_execute(db, stmt, column_count, &port, region) != 0) - goto err_execute; + const char *sql = request->sql_text; + uint32_t len; + sql = mp_decode_str(&sql, &len); + struct sqlite3_stmt *stmt; + sqlite3 *db = sql_get(); + if (db == NULL) { + diag_set(ClientError, ER_LOADING); + return -1; + } + if (sqlite3_prepare_v2(db, sql, len, &stmt, NULL) != SQLITE_OK) { + diag_set(ClientError, ER_SQL_EXECUTE, sqlite3_errmsg(db)); + return -1; + } + assert(stmt != NULL); + port_tuple_create(&response->port); + response->prep_stmt = stmt; + response->sync = request->sync; + if (sql_bind(request, stmt) == 0 && + sql_execute(db, stmt, &response->port, region) == 0) + return 0; + port_destroy(&response->port); + sqlite3_finalize(stmt); + return -1; +} - /* - * Encode response. - */ +int +sql_response_dump(struct sql_response *response, struct obuf *out) +{ struct obuf_svp header_svp; /* Prepare memory for the iproto header. 
*/ if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) - goto err_execute; - int keys; + return -1; + sqlite3 *db = sql_get(); + struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; + struct port_tuple *port_tuple = (struct port_tuple *) &response->port; + int keys, rc = 0, column_count = sqlite3_column_count(stmt); if (column_count > 0) { - if (sql_get_description(stmt, out, column_count) != 0) - goto err_body; + if (sql_get_description(stmt, out, column_count) != 0) { +err: + obuf_rollback_to_svp(out, &header_svp); + rc = -1; + goto finish; + } keys = 2; if (iproto_reply_array_key(out, port_tuple->size, IPROTO_DATA) != 0) - goto err_body; + goto err; /* * Just like SELECT, SQL uses output format compatible * with Tarantool 1.6 */ - if (port_dump_16(&port, out) < 0) { + if (port_dump_16(&response->port, out) < 0) { /* Failed port dump destroyes the port. */ - goto err_body; + goto err; } } else { keys = 1; assert(port_tuple->size == 0); if (iproto_reply_map_key(out, 1, IPROTO_SQL_INFO) != 0) - goto err_body; + goto err; int changes = sqlite3_changes(db); int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + mp_sizeof_uint(changes); char *buf = obuf_alloc(out, size); if (buf == NULL) { diag_set(OutOfMemory, size, "obuf_alloc", "buf"); - goto err_body; + goto err; } buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); buf = mp_encode_uint(buf, changes); } - port_destroy(&port); - iproto_reply_sql(out, &header_svp, sync, schema_version, keys); - return 0; - -err_body: - obuf_rollback_to_svp(out, &header_svp); -err_execute: - port_destroy(&port); - return -1; -} - -int -sql_prepare_and_execute(const struct sql_request *request, struct obuf *out, - struct region *region) -{ - const char *sql = request->sql_text; - uint32_t len; - sql = mp_decode_str(&sql, &len); - struct sqlite3_stmt *stmt; - sqlite3 *db = sql_get(); - if (db == NULL) { - diag_set(ClientError, ER_LOADING); - return -1; - } - if (sqlite3_prepare_v2(db, sql, len, &stmt, NULL) 
!= SQLITE_OK) { - diag_set(ClientError, ER_SQL_EXECUTE, sqlite3_errmsg(db)); - return -1; - } - assert(stmt != NULL); - if (sql_bind(request, stmt) != 0) - goto err_stmt; - if (sql_execute_and_encode(db, stmt, out, request->sync, - region) != 0) - goto err_stmt; + iproto_reply_sql(out, &header_svp, response->sync, schema_version, + keys); +finish: + port_destroy(&response->port); sqlite3_finalize(stmt); - return 0; -err_stmt: - sqlite3_finalize(stmt); - return -1; + return rc; } diff --git a/src/box/execute.h b/src/box/execute.h index 149d25c66..f21393b7d 100644 --- a/src/box/execute.h +++ b/src/box/execute.h @@ -33,6 +33,7 @@ #include #include +#include "port.h" #if defined(__cplusplus) extern "C" { @@ -62,22 +63,19 @@ struct sql_request { uint32_t bind_count; }; -/** - * Parse the EXECUTE request. - * @param row Encoded data. - * @param[out] request Request to decode to. - * @param region Allocator. - * - * @retval 0 Sucess. - * @retval -1 Format or memory error. - */ -int -xrow_decode_sql(const struct xrow_header *row, struct sql_request *request, - struct region *region); +/** Response on EXECUTE request. */ +struct sql_response { + /** Request sync. */ + uint64_t sync; + /** Port with response data if any. */ + struct port port; + /** Prepared SQL statement with metadata. */ + void *prep_stmt; +}; /** - * Prepare and execute an SQL statement and encode the response in - * an iproto message. + * Dump a built response into @an out buffer. The response is + * destroyed. * Response structure: * +----------------------------------------------+ * | IPROTO_OK, sync, schema_version ... | iproto_header @@ -102,9 +100,32 @@ xrow_decode_sql(const struct xrow_header *row, struct sql_request *request, * | } | * | } | * +----------------------------------------------+ + * @param response EXECUTE response. + * @param out Output buffer. + * + * @retval 0 Success. + * @retval -1 Memory error. 
+ */ +int +sql_response_dump(struct sql_response *response, struct obuf *out); + +/** + * Parse the EXECUTE request. + * @param row Encoded data. + * @param[out] request Request to decode to. + * @param region Allocator. * + * @retval 0 Sucess. + * @retval -1 Format or memory error. + */ +int +xrow_decode_sql(const struct xrow_header *row, struct sql_request *request, + struct region *region); + +/** + * Prepare and execute an SQL statement. * @param request IProto request. - * @param out Out buffer of the iproto message. + * @param[out] response Response to store result. * @param region Runtime allocator for temporary objects * (columns, tuples ...). * @@ -112,8 +133,8 @@ xrow_decode_sql(const struct xrow_header *row, struct sql_request *request, * @retval -1 Client or memory error. */ int -sql_prepare_and_execute(const struct sql_request *request, struct obuf *out, - struct region *region); +sql_prepare_and_execute(const struct sql_request *request, + struct sql_response *response, struct region *region); #if defined(__cplusplus) } /* extern "C" { */ diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 16d88ba7c..9669b5f32 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1398,14 +1398,23 @@ static void tx_process_sql(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); - struct obuf *out = msg->connection->tx.p_obuf; + struct obuf *out; + struct sql_response response; tx_fiber_init(msg->connection->session, msg->header.sync); if (tx_check_schema(msg->header.schema_version)) goto error; assert(msg->header.type == IPROTO_EXECUTE); - if (sql_prepare_and_execute(&msg->sql, out, &fiber()->gc) != 0) + tx_inject_delay(); + if (sql_prepare_and_execute(&msg->sql, &response, &fiber()->gc) != 0) + goto error; + /* + * Take an obuf only after execute(). Else the buffer can + * become out of date during yield. 
+ */ + out = msg->connection->tx.p_obuf; + if (sql_response_dump(&response, out) != 0) goto error; iproto_wpos_create(&msg->wpos, out); return; diff --git a/test/sql/errinj.result b/test/sql/errinj.result index 0a984ca81..00a0d6bab 100644 --- a/test/sql/errinj.result +++ b/test/sql/errinj.result @@ -75,9 +75,43 @@ select_res cn:close() --- ... -box.schema.user.revoke('guest', 'read,write,execute', 'universe') +box.sql.execute('drop table test') --- ... -box.sql.execute('drop table test') +-- +-- gh-3326: after the iproto start using new buffers rotation +-- policy, SQL responses could be corrupted, when DDL/DML is mixed +-- with DQL. Same as gh-3255. +-- +box.sql.execute('CREATE TABLE test (id integer primary key)') +--- +... +cn = remote.connect(box.cfg.listen) +--- +... +ch = fiber.channel(200) +--- +... +errinj.set("ERRINJ_IPROTO_TX_DELAY", true) +--- +- ok +... +for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn:execute('REPLACE INTO test VALUES (1)') end ch:put(true) end) end +--- +... +for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn.space.TEST:get{1} end ch:put(true) end) end +--- +... +for i = 1, 200 do ch:get() end +--- +... +errinj.set("ERRINJ_IPROTO_TX_DELAY", false) +--- +- ok +... +box.sql.execute('DROP TABLE test') +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') --- ... diff --git a/test/sql/errinj.test.lua b/test/sql/errinj.test.lua index df032b65c..63d306312 100644 --- a/test/sql/errinj.test.lua +++ b/test/sql/errinj.test.lua @@ -25,5 +25,22 @@ insert_res select_res cn:close() -box.schema.user.revoke('guest', 'read,write,execute', 'universe') box.sql.execute('drop table test') + +-- +-- gh-3326: after the iproto start using new buffers rotation +-- policy, SQL responses could be corrupted, when DDL/DML is mixed +-- with DQL. Same as gh-3255. 
+-- +box.sql.execute('CREATE TABLE test (id integer primary key)') +cn = remote.connect(box.cfg.listen) + +ch = fiber.channel(200) +errinj.set("ERRINJ_IPROTO_TX_DELAY", true) +for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn:execute('REPLACE INTO test VALUES (1)') end ch:put(true) end) end +for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn.space.TEST:get{1} end ch:put(true) end) end +for i = 1, 200 do ch:get() end +errinj.set("ERRINJ_IPROTO_TX_DELAY", false) + +box.sql.execute('DROP TABLE test') +box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- 2.14.3 (Apple Git-98)