* [tarantool-patches] [PATCH 1/1] sql: get obuf for iproto response dump after execute()
@ 2018-04-06 18:18 Vladislav Shpilevoy
0 siblings, 0 replies; only message in thread
From: Vladislav Shpilevoy @ 2018-04-06 18:18 UTC (permalink / raw)
To: tarantool-patches; +Cc: kostja
The #3255 fix was not applied to SQL after the merge into
2.0. This patch applies it.
SQL execution must be separated from response encoding. First,
the obuf to write to is known only after execution. Second, in
the future a single SQL request will produce multiple SQL
responses.
Closes #3326
---
Branch: https://github.com/tarantool/tarantool/tree/gh-3326-sql-iproto-fix
Issue: https://github.com/tarantool/tarantool/issues/3326
src/box/execute.c | 129 ++++++++++++++++++++---------------------------
src/box/execute.h | 55 +++++++++++++-------
src/box/iproto.cc | 13 ++++-
test/sql/errinj.result | 38 +++++++++++++-
test/sql/errinj.test.lua | 19 ++++++-
5 files changed, 157 insertions(+), 97 deletions(-)
diff --git a/src/box/execute.c b/src/box/execute.c
index 0d4f357af..e808123ba 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -551,10 +551,10 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
}
static inline int
-sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, int column_count,
- struct port *port, struct region *region)
+sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, struct port *port,
+ struct region *region)
{
- int rc;
+ int rc, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
/* Either ROW or DONE or ERROR. */
while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
@@ -575,106 +575,85 @@ sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, int column_count,
return 0;
}
-/**
- * Execute the prepared statement and write to the @out obuf the
- * result. Result is either rows array in a case of not zero
- * column count (SELECT), or SQL info in other cases.
- * @param db SQLite engine.
- * @param stmt Prepared statement.
- * @param out Out buffer.
- * @param sync IProto request sync.
- * @param region Runtime allocator for temporary objects.
- *
- * @retval 0 Success.
- * @retval -1 Client or memory error.
- */
-static inline int
-sql_execute_and_encode(sqlite3 *db, struct sqlite3_stmt *stmt, struct obuf *out,
- uint64_t sync, struct region *region)
+int
+sql_prepare_and_execute(const struct sql_request *request,
+ struct sql_response *response, struct region *region)
{
- struct port port;
- struct port_tuple *port_tuple = (struct port_tuple *)&port;
- port_tuple_create(&port);
- int column_count = sqlite3_column_count(stmt);
- if (sql_execute(db, stmt, column_count, &port, region) != 0)
- goto err_execute;
+ const char *sql = request->sql_text;
+ uint32_t len;
+ sql = mp_decode_str(&sql, &len);
+ struct sqlite3_stmt *stmt;
+ sqlite3 *db = sql_get();
+ if (db == NULL) {
+ diag_set(ClientError, ER_LOADING);
+ return -1;
+ }
+ if (sqlite3_prepare_v2(db, sql, len, &stmt, NULL) != SQLITE_OK) {
+ diag_set(ClientError, ER_SQL_EXECUTE, sqlite3_errmsg(db));
+ return -1;
+ }
+ assert(stmt != NULL);
+ port_tuple_create(&response->port);
+ response->prep_stmt = stmt;
+ response->sync = request->sync;
+ if (sql_bind(request, stmt) == 0 &&
+ sql_execute(db, stmt, &response->port, region) == 0)
+ return 0;
+ port_destroy(&response->port);
+ sqlite3_finalize(stmt);
+ return -1;
+}
- /*
- * Encode response.
- */
+int
+sql_response_dump(struct sql_response *response, struct obuf *out)
+{
struct obuf_svp header_svp;
/* Prepare memory for the iproto header. */
if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
- goto err_execute;
- int keys;
+ return -1;
+ sqlite3 *db = sql_get();
+ struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
+ struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
+ int keys, rc = 0, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
- if (sql_get_description(stmt, out, column_count) != 0)
- goto err_body;
+ if (sql_get_description(stmt, out, column_count) != 0) {
+err:
+ obuf_rollback_to_svp(out, &header_svp);
+ rc = -1;
+ goto finish;
+ }
keys = 2;
if (iproto_reply_array_key(out, port_tuple->size,
IPROTO_DATA) != 0)
- goto err_body;
+ goto err;
/*
* Just like SELECT, SQL uses output format compatible
* with Tarantool 1.6
*/
- if (port_dump_16(&port, out) < 0) {
+ if (port_dump_16(&response->port, out) < 0) {
/* Failed port dump destroyes the port. */
- goto err_body;
+ goto err;
}
} else {
keys = 1;
assert(port_tuple->size == 0);
if (iproto_reply_map_key(out, 1, IPROTO_SQL_INFO) != 0)
- goto err_body;
+ goto err;
int changes = sqlite3_changes(db);
int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
mp_sizeof_uint(changes);
char *buf = obuf_alloc(out, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "obuf_alloc", "buf");
- goto err_body;
+ goto err;
}
buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
buf = mp_encode_uint(buf, changes);
}
- port_destroy(&port);
- iproto_reply_sql(out, &header_svp, sync, schema_version, keys);
- return 0;
-
-err_body:
- obuf_rollback_to_svp(out, &header_svp);
-err_execute:
- port_destroy(&port);
- return -1;
-}
-
-int
-sql_prepare_and_execute(const struct sql_request *request, struct obuf *out,
- struct region *region)
-{
- const char *sql = request->sql_text;
- uint32_t len;
- sql = mp_decode_str(&sql, &len);
- struct sqlite3_stmt *stmt;
- sqlite3 *db = sql_get();
- if (db == NULL) {
- diag_set(ClientError, ER_LOADING);
- return -1;
- }
- if (sqlite3_prepare_v2(db, sql, len, &stmt, NULL) != SQLITE_OK) {
- diag_set(ClientError, ER_SQL_EXECUTE, sqlite3_errmsg(db));
- return -1;
- }
- assert(stmt != NULL);
- if (sql_bind(request, stmt) != 0)
- goto err_stmt;
- if (sql_execute_and_encode(db, stmt, out, request->sync,
- region) != 0)
- goto err_stmt;
+ iproto_reply_sql(out, &header_svp, response->sync, schema_version,
+ keys);
+finish:
+ port_destroy(&response->port);
sqlite3_finalize(stmt);
- return 0;
-err_stmt:
- sqlite3_finalize(stmt);
- return -1;
+ return rc;
}
diff --git a/src/box/execute.h b/src/box/execute.h
index 149d25c66..f21393b7d 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -33,6 +33,7 @@
#include <stdint.h>
#include <stdbool.h>
+#include "port.h"
#if defined(__cplusplus)
extern "C" {
@@ -62,22 +63,19 @@ struct sql_request {
uint32_t bind_count;
};
-/**
- * Parse the EXECUTE request.
- * @param row Encoded data.
- * @param[out] request Request to decode to.
- * @param region Allocator.
- *
- * @retval 0 Sucess.
- * @retval -1 Format or memory error.
- */
-int
-xrow_decode_sql(const struct xrow_header *row, struct sql_request *request,
- struct region *region);
+/** Response on EXECUTE request. */
+struct sql_response {
+ /** Request sync. */
+ uint64_t sync;
+ /** Port with response data if any. */
+ struct port port;
+ /** Prepared SQL statement with metadata. */
+ void *prep_stmt;
+};
/**
- * Prepare and execute an SQL statement and encode the response in
- * an iproto message.
+ * Dump a built response into the @a out buffer. The response
+ * is destroyed.
* Response structure:
* +----------------------------------------------+
* | IPROTO_OK, sync, schema_version ... | iproto_header
@@ -102,9 +100,32 @@ xrow_decode_sql(const struct xrow_header *row, struct sql_request *request,
* | } |
* | } |
* +----------------------------------------------+
+ * @param response EXECUTE response.
+ * @param out Output buffer.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory error.
+ */
+int
+sql_response_dump(struct sql_response *response, struct obuf *out);
+
+/**
+ * Parse the EXECUTE request.
+ * @param row Encoded data.
+ * @param[out] request Request to decode to.
+ * @param region Allocator.
*
+ * @retval 0 Success.
+ * @retval -1 Format or memory error.
+ */
+int
+xrow_decode_sql(const struct xrow_header *row, struct sql_request *request,
+ struct region *region);
+
+/**
+ * Prepare and execute an SQL statement.
* @param request IProto request.
- * @param out Out buffer of the iproto message.
+ * @param[out] response Response to store result.
* @param region Runtime allocator for temporary objects
* (columns, tuples ...).
*
@@ -112,8 +133,8 @@ xrow_decode_sql(const struct xrow_header *row, struct sql_request *request,
* @retval -1 Client or memory error.
*/
int
-sql_prepare_and_execute(const struct sql_request *request, struct obuf *out,
- struct region *region);
+sql_prepare_and_execute(const struct sql_request *request,
+ struct sql_response *response, struct region *region);
#if defined(__cplusplus)
} /* extern "C" { */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 16d88ba7c..9669b5f32 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1398,14 +1398,23 @@ static void
tx_process_sql(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
- struct obuf *out = msg->connection->tx.p_obuf;
+ struct obuf *out;
+ struct sql_response response;
tx_fiber_init(msg->connection->session, msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
assert(msg->header.type == IPROTO_EXECUTE);
- if (sql_prepare_and_execute(&msg->sql, out, &fiber()->gc) != 0)
+ tx_inject_delay();
+ if (sql_prepare_and_execute(&msg->sql, &response, &fiber()->gc) != 0)
+ goto error;
+ /*
+ * Take an obuf only after execute(). Otherwise the buffer
+ * can become stale during a yield.
+ */
+ out = msg->connection->tx.p_obuf;
+ if (sql_response_dump(&response, out) != 0)
goto error;
iproto_wpos_create(&msg->wpos, out);
return;
diff --git a/test/sql/errinj.result b/test/sql/errinj.result
index 0a984ca81..00a0d6bab 100644
--- a/test/sql/errinj.result
+++ b/test/sql/errinj.result
@@ -75,9 +75,43 @@ select_res
cn:close()
---
...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+box.sql.execute('drop table test')
---
...
-box.sql.execute('drop table test')
+--
+-- gh-3326: after the iproto start using new buffers rotation
+-- policy, SQL responses could be corrupted, when DDL/DML is mixed
+-- with DQL. Same as gh-3255.
+--
+box.sql.execute('CREATE TABLE test (id integer primary key)')
+---
+...
+cn = remote.connect(box.cfg.listen)
+---
+...
+ch = fiber.channel(200)
+---
+...
+errinj.set("ERRINJ_IPROTO_TX_DELAY", true)
+---
+- ok
+...
+for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn:execute('REPLACE INTO test VALUES (1)') end ch:put(true) end) end
+---
+...
+for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn.space.TEST:get{1} end ch:put(true) end) end
+---
+...
+for i = 1, 200 do ch:get() end
+---
+...
+errinj.set("ERRINJ_IPROTO_TX_DELAY", false)
+---
+- ok
+...
+box.sql.execute('DROP TABLE test')
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
---
...
diff --git a/test/sql/errinj.test.lua b/test/sql/errinj.test.lua
index df032b65c..63d306312 100644
--- a/test/sql/errinj.test.lua
+++ b/test/sql/errinj.test.lua
@@ -25,5 +25,22 @@ insert_res
select_res
cn:close()
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
box.sql.execute('drop table test')
+
+--
+-- gh-3326: after the iproto start using new buffers rotation
+-- policy, SQL responses could be corrupted, when DDL/DML is mixed
+-- with DQL. Same as gh-3255.
+--
+box.sql.execute('CREATE TABLE test (id integer primary key)')
+cn = remote.connect(box.cfg.listen)
+
+ch = fiber.channel(200)
+errinj.set("ERRINJ_IPROTO_TX_DELAY", true)
+for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn:execute('REPLACE INTO test VALUES (1)') end ch:put(true) end) end
+for i = 1, 100 do fiber.create(function() for j = 1, 10 do cn.space.TEST:get{1} end ch:put(true) end) end
+for i = 1, 200 do ch:get() end
+errinj.set("ERRINJ_IPROTO_TX_DELAY", false)
+
+box.sql.execute('DROP TABLE test')
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2018-04-06 18:18 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-06 18:18 [tarantool-patches] [PATCH 1/1] sql: get obuf for iproto response dump after execute() Vladislav Shpilevoy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox