From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng3.m.smailru.net (smtpng3.m.smailru.net [94.100.177.149]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id D4DBC430D56 for ; Thu, 7 Nov 2019 04:05:15 +0300 (MSK) From: Nikita Pettik Date: Thu, 7 Nov 2019 04:04:55 +0300 Message-Id: <20191107010455.64457-16-korablev@tarantool.org> In-Reply-To: <20191107010455.64457-1-korablev@tarantool.org> References: <20191107010455.64457-1-korablev@tarantool.org> Subject: [Tarantool-patches] [PATCH 15/15] netbox: introduce prepared statements List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: tarantool-patches@dev.tarantool.org Cc: v.shpilevoy@tarantool.org This patch introduces support of prepared statements in IProto protocol. To achieve this new IProto command is added: IPROTO_PREPARE. It can come whether with IPROTO_SQL_TEXT key and it means to prepare SQL statement (for details see previous commit) or with IPROTO_QUERY_ID which in turn results in prepared statement invalidation. Also to reply on PREPARE request a few response keys are added: IPROTO_BIND_METADATA (contains parameters metadata), IPROTO_BIND_COUNT (count of parameters to be bound), IPROTO_QUERY_ID (contains id of prepared statement). Closes #2592 @TarantoolBot document Title: Prepared statements in SQL Now it is possible to 'prepare' (i.e. compile into byte-code and save to the cache) statement and execute it several times. Mechanism is similar to ones in other DBs. Prepared statement is identified by numeric sequential ids, which are returned alongside with prepared statement handle. Prepared statement cache is local to session. Its size is adjusted by box.cfg{sql_cache_size} variable (can be set dynamically; note that size of already created caches is not changed). Any DDL operation leads to expiration of all prepared statements: they should be manually removed. It can be done with box.session.sql_cache_erase(). Prepared statements are available in local mode (i.e. via box.prepare() function) and are supported in IProto protocol. Typical workflow with prepared statements is following: s = box.prepare("SELECT * FROM t WHERE id = ?;") s:execute({1}) or box.execute(s.query_id, {1}) s:execute({2}) or box.execute(s.query_id, {2}) s:unprepare() or box.unprepare(s.query_id) In terms of remote connection: cn = netbox:connect(addr) s = cn:prepare("SELECT * FROM t WHERE id = ?;") cn:execute(s.query_id, {1}) cn:unprepare(s.query_id) --- src/box/execute.c | 86 +++++++++++++++++++++++++++ src/box/iproto.cc | 68 +++++++++++++++++---- src/box/iproto_constants.c | 7 ++- src/box/iproto_constants.h | 5 ++ src/box/lua/net_box.c | 104 +++++++++++++++++++++++++++++++-- src/box/lua/net_box.lua | 28 +++++++++ src/box/xrow.c | 19 ++++-- src/box/xrow.h | 4 +- test/box/misc.result | 1 + test/sql/engine.cfg | 1 + test/sql/iproto.result | 2 +- test/sql/prepared.result | 143 +++++++++++++++++++++++++++------------------ test/sql/prepared.test.lua | 94 +++++++++++++++++++++-------- 13 files changed, 458 insertions(+), 104 deletions(-) diff --git a/src/box/execute.c b/src/box/execute.c index 3fd1afd8a..51b309ac1 100644 --- a/src/box/execute.c +++ b/src/box/execute.c @@ -326,6 +326,65 @@ sql_get_metadata(struct sql_stmt *stmt, struct obuf *out, int column_count) return 0; } +static inline int +sql_get_params_metadata(struct sql_stmt *stmt, struct obuf *out) +{ + int bind_count = sql_bind_parameter_count(stmt); + int size = mp_sizeof_uint(IPROTO_BIND_METADATA) + + mp_sizeof_array(bind_count); + char *pos = (char *) obuf_alloc(out, size); + if (pos == NULL) { + diag_set(OutOfMemory, size, "obuf_alloc", "pos"); + return -1; + } + pos = mp_encode_uint(pos, IPROTO_BIND_METADATA); + pos = mp_encode_array(pos, bind_count); + for (int i = 0; i < bind_count; ++i) { + size_t size = mp_sizeof_map(2) + + mp_sizeof_uint(IPROTO_FIELD_NAME) + + mp_sizeof_uint(IPROTO_FIELD_TYPE); + const char *name = sql_bind_parameter_name(stmt, i); + if (name == NULL) + name = "?"; + const char *type = "ANY"; + size += mp_sizeof_str(strlen(name)); + size += mp_sizeof_str(strlen(type)); + char *pos = (char *) obuf_alloc(out, size); + if (pos == NULL) { + diag_set(OutOfMemory, size, "obuf_alloc", "pos"); + return -1; + } + pos = mp_encode_map(pos, 2); + pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); + pos = mp_encode_str(pos, name, strlen(name)); + pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); + pos = mp_encode_str(pos, type, strlen(type)); + } + return 0; +} + +static int +sql_get_prepare_common_keys(struct sql_stmt *stmt, struct obuf *out, int keys, + uint32_t query_id) +{ + int size = mp_sizeof_map(keys) + + mp_sizeof_uint(IPROTO_QUERY_ID) + mp_sizeof_uint(query_id) + + mp_sizeof_uint(IPROTO_BIND_COUNT) + mp_sizeof_uint(sql_bind_parameter_count(stmt)); + char *pos = (char *) obuf_alloc(out, size); + if (pos == NULL) { + diag_set(OutOfMemory, size, "obuf_alloc", "pos"); + return -1; + } + pos = mp_encode_map(pos, keys); + pos = mp_encode_uint(pos, IPROTO_QUERY_ID); + pos = mp_encode_uint(pos, query_id); + pos = mp_encode_uint(pos, IPROTO_BIND_COUNT); + pos = mp_encode_uint(pos, sql_bind_parameter_count(stmt)); + if (sql_get_params_metadata(stmt, out) != 0) + return -1; + return 0; +} + static int port_sql_dump_msgpack(struct port *port, struct obuf *out) { @@ -407,6 +466,33 @@ port_sql_dump_msgpack(struct port *port, struct obuf *out) } break; } + case DQL_PREPARE: { + /* Format is following: + * query_id, + * param_count, + * params {name, type}, + * metadata {name, type} + */ + int keys = 4; + if (sql_get_prepare_common_keys(stmt, out, keys, + sql_port->query_id) != 0) + return -1; + if (sql_get_metadata(stmt, out, sql_column_count(stmt)) != 0) + return -1; + break; + } + case DML_PREPARE: { + /* Format is following: + * query_id, + * param_count, + * params {name, type}, + */ + int keys = 3; + if (sql_get_prepare_common_keys(stmt, out, keys, + sql_port->query_id) != 0) + return -1; + break; + } default: unreachable(); } return 0; diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 8f899fed8..266d1c28b 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -178,7 +178,7 @@ struct iproto_msg struct call_request call; /** Authentication request. */ struct auth_request auth; - /* SQL request, if this is the EXECUTE request. */ + /* SQL request, if this is the EXECUTE/PREPARE request. */ struct sql_request sql; /** In case of iproto parse error, saved diagnostics. */ struct diag diag; @@ -1155,6 +1155,7 @@ static const struct cmsg_hop *dml_route[IPROTO_TYPE_STAT_MAX] = { call_route, /* IPROTO_CALL */ sql_route, /* IPROTO_EXECUTE */ NULL, /* IPROTO_NOP */ + sql_route, /* IPROTO_PREPARE */ }; static const struct cmsg_hop join_route[] = { @@ -1210,6 +1211,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, cmsg_init(&msg->base, call_route); break; case IPROTO_EXECUTE: + case IPROTO_PREPARE: if (xrow_decode_sql(&msg->header, &msg->sql) != 0) goto error; cmsg_init(&msg->base, sql_route); @@ -1647,23 +1649,64 @@ tx_process_sql(struct cmsg *m) int bind_count = 0; const char *sql; uint32_t len; + bool is_unprepare = false; tx_fiber_init(msg->connection->session, msg->header.sync); if (tx_check_schema(msg->header.schema_version)) goto error; - assert(msg->header.type == IPROTO_EXECUTE); + assert(msg->header.type == IPROTO_EXECUTE || + msg->header.type == IPROTO_PREPARE); tx_inject_delay(); if (msg->sql.bind != NULL) { bind_count = sql_bind_list_decode(msg->sql.bind, &bind); if (bind_count < 0) goto error; } - sql = msg->sql.sql_text; - sql = mp_decode_str(&sql, &len); - if (sql_prepare_and_execute(sql, len, bind, bind_count, &port, - &fiber()->gc) != 0) - goto error; + /* + * There are four options: + * 1. Prepare SQL query (IPROTO_PREPARE + SQL string); + * 2. Unprepare SQL query (IPROTO_PREPARE + query id); + * 3. Execute SQL query (IPROTO_EXECUTE + SQL string); + * 4. Execute prepared query (IPROTO_EXECUTE + query id). + */ + if (msg->header.type == IPROTO_EXECUTE) { + if (msg->sql.sql_text != NULL) { + assert(msg->sql.query_id == NULL); + sql = msg->sql.sql_text; + sql = mp_decode_str(&sql, &len); + if (sql_prepare_and_execute(sql, len, bind, bind_count, + &port, &fiber()->gc) != 0) + goto error; + } else { + assert(msg->sql.sql_text == NULL); + assert(msg->sql.query_id != NULL); + sql = msg->sql.query_id; + uint32_t query_id = mp_decode_uint(&sql); + if (sql_execute_prepared(query_id, bind, bind_count, + &port, &fiber()->gc) != 0) + goto error; + } + } else { + /* IPROTO_PREPARE */ + if (msg->sql.sql_text != NULL) { + assert(msg->sql.query_id == NULL); + sql = msg->sql.sql_text; + sql = mp_decode_str(&sql, &len); + if (sql_prepare(sql, len, &port) != 0) + goto error; + } else { + /* UNPREPARE */ + assert(msg->sql.sql_text == NULL); + assert(msg->sql.query_id != NULL); + sql = msg->sql.query_id; + uint32_t query_id = mp_decode_uint(&sql); + if (sql_unprepare(query_id) != 0) + goto error; + is_unprepare = true; + } + } + /* * Take an obuf only after execute(). Else the buffer can * become out of date during yield. @@ -1675,12 +1718,15 @@ tx_process_sql(struct cmsg *m) port_destroy(&port); goto error; } - if (port_dump_msgpack(&port, out) != 0) { + /* Nothing to dump in case of UNPREPARE request. */ + if (! is_unprepare) { + if (port_dump_msgpack(&port, out) != 0) { + port_destroy(&port); + obuf_rollback_to_svp(out, &header_svp); + goto error; + } port_destroy(&port); - obuf_rollback_to_svp(out, &header_svp); - goto error; } - port_destroy(&port); iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version); iproto_wpos_create(&msg->wpos, out); return; diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 09ded1ecb..2be8e5768 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -107,6 +107,7 @@ const char *iproto_type_strs[] = "CALL", "EXECUTE", NULL, /* NOP */ + "PREPARE", }; #define bit(c) (1ULL<sql_text = NULL; request->bind = NULL; + request->query_id = NULL; for (uint32_t i = 0; i < map_size; ++i) { uint8_t key = *data; - if (key != IPROTO_SQL_BIND && key != IPROTO_SQL_TEXT) { + if (key != IPROTO_SQL_BIND && key != IPROTO_SQL_TEXT && + key != IPROTO_QUERY_ID) { mp_check(&data, end); /* skip the key */ mp_check(&data, end); /* skip the value */ continue; @@ -588,12 +590,21 @@ error: goto error; if (key == IPROTO_SQL_BIND) request->bind = value; - else + else if (key == IPROTO_SQL_TEXT) request->sql_text = value; + else + request->query_id = value; + } + if (request->sql_text != NULL && request->query_id != NULL) { + xrow_on_decode_err(row->body[0].iov_base, end, ER_INVALID_MSGPACK, + "SQL text and query id are incompatible "\ + "options in one request: choose one"); + return -1; } - if (request->sql_text == NULL) { + if (request->sql_text == NULL && request->query_id == NULL) { xrow_on_decode_err(row->body[0].iov_base, end, ER_MISSING_REQUEST_FIELD, - iproto_key_name(IPROTO_SQL_TEXT)); + tt_sprintf("%s or %s", iproto_key_name(IPROTO_SQL_TEXT), + iproto_key_name(IPROTO_QUERY_ID))); return -1; } if (data != end) diff --git a/src/box/xrow.h b/src/box/xrow.h index 60def2d3c..aca35aa6a 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -526,10 +526,12 @@ int iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version); -/** EXECUTE request. */ +/** EXECUTE/PREPARE request. */ struct sql_request { /** SQL statement text. */ const char *sql_text; + /** Id of prepared statement. In this case @sql_text == NULL. */ + const char *query_id; /** MessagePack array of parameters. */ const char *bind; }; diff --git a/test/box/misc.result b/test/box/misc.result index b75f615d5..d48b7cfcc 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -231,6 +231,7 @@ t; - EVAL - CALL - ERROR + - PREPARE - REPLACE - UPSERT - AUTH diff --git a/test/sql/engine.cfg b/test/sql/engine.cfg index a1b4b0fc5..e38bec24e 100644 --- a/test/sql/engine.cfg +++ b/test/sql/engine.cfg @@ -10,6 +10,7 @@ "local": {"remote": "false"} }, "prepared.test.lua": { + "remote": {"remote": "true"}, "local": {"remote": "false"} }, "*": { diff --git a/test/sql/iproto.result b/test/sql/iproto.result index 1e5c30aec..5a94a07d8 100644 --- a/test/sql/iproto.result +++ b/test/sql/iproto.result @@ -119,7 +119,7 @@ cn:execute('select id as identifier from test where a = 5;') -- netbox API errors. cn:execute(100) --- -- error: Syntax error near '100' +- error: Prepared statement with corresponding id 100 does not exist ... cn:execute('select 1', nil, {dry_run = true}) --- diff --git a/test/sql/prepared.result b/test/sql/prepared.result index 1646af94f..0de6882c6 100644 --- a/test/sql/prepared.result +++ b/test/sql/prepared.result @@ -12,34 +12,52 @@ fiber = require('fiber') -- Wrappers to make remote and local execution interface return -- same result pattern. -- -test_run:cmd("setopt delimiter ';'") +is_remote = test_run:get_cfg('remote') == 'true' | --- - | - true | ... -execute = function(...) - local res, err = box.execute(...) - if err ~= nil then - error(err) - end - return res -end; +execute = nil | --- | ... -prepare = function(...) - local res, err = box.prepare(...) - if err ~= nil then - error(err) - end - return res -end; +prepare = nil + | --- + | ... +unprepare = nil | --- | ... -unprepare = function(...) - local res, err = box.unprepare(...) - if err ~= nil then - error(err) + +test_run:cmd("setopt delimiter ';'") + | --- + | - true + | ... +if is_remote then + box.schema.user.grant('guest','read, write, execute', 'universe') + box.schema.user.grant('guest', 'create', 'space') + cn = remote.connect(box.cfg.listen) + execute = function(...) return cn:execute(...) end + prepare = function(...) return cn:prepare(...) end + unprepare = function(...) return cn:unprepare(...) end +else + execute = function(...) + local res, err = box.execute(...) + if err ~= nil then + error(err) + end + return res + end + prepare = function(...) + local res, err = box.prepare(...) + if err ~= nil then + error(err) + end + return res + end + unprepare = function(...) + local res, err = box.unprepare(...) + if err ~= nil then + error(err) + end + return res end - return res end; | --- | ... @@ -127,31 +145,25 @@ execute(s.query_id, {1, 3}) | type: string | rows: [] | ... -s:execute({1, 2}) + +test_run:cmd("setopt delimiter ';'") | --- - | - metadata: - | - name: ID - | type: integer - | - name: A - | type: number - | - name: B - | type: string - | rows: - | - [1, 2, '3'] + | - true | ... -s:execute({1, 3}) +if not is_remote then + res = s:execute({1, 2}) + assert(res ~= nil) + res = s:execute({1, 3}) + assert(res ~= nil) + s:unprepare() +else + unprepare(s.query_id) +end; | --- - | - metadata: - | - name: ID - | type: integer - | - name: A - | type: number - | - name: B - | type: string - | rows: [] | ... -s:unprepare() +test_run:cmd("setopt delimiter ''"); | --- + | - true | ... -- Test preparation of different types of queries. @@ -350,22 +362,23 @@ execute(s.query_id) | rows: | - [2] | ... -s:execute() +test_run:cmd("setopt delimiter ';'") | --- - | - metadata: - | - name: A - | type: number - | rows: - | - [2] + | - true | ... -s:execute() +if not is_remote then + res = s:execute() + assert(res ~= nil) + res = s:execute() + assert(res ~= nil) +end; | --- - | - metadata: - | - name: A - | type: number - | rows: - | - [2] | ... +test_run:cmd("setopt delimiter ''"); + | --- + | - true + | ... + unprepare(s.query_id) | --- | - null @@ -576,6 +589,12 @@ test_run:cmd("setopt delimiter ';'") | - true | ... box.cfg{sql_cache_size = 3000} +if is_remote then + cn:close() + cn = remote.connect(box.cfg.listen) +end; + | --- + | ... res = nil; | --- | ... @@ -624,10 +643,14 @@ ok = nil; | --- | ... _ = fiber.create(function() - for i = 1, 5 do - pcall(prepare, "SELECT * FROM test;") + if is_remote then + cn:eval("box.session.sql_cache_erase()") + else + for i = 1, 5 do + pcall(prepare, "SELECT * FROM test;") + end + box.session.sql_cache_erase() end - box.session.sql_cache_erase() ok, res = pcall(prepare, "SELECT * FROM test;") end); | --- @@ -644,6 +667,14 @@ assert(res ~= nil); | - true | ... +if is_remote then + cn:close() + box.schema.user.revoke('guest', 'read, write, execute', 'universe') + box.schema.user.revoke('guest', 'create', 'space') +end; + | --- + | ... + test_run:cmd("setopt delimiter ''"); | --- | - true diff --git a/test/sql/prepared.test.lua b/test/sql/prepared.test.lua index 680f0bdb3..a89063cdf 100644 --- a/test/sql/prepared.test.lua +++ b/test/sql/prepared.test.lua @@ -5,27 +5,41 @@ fiber = require('fiber') -- Wrappers to make remote and local execution interface return -- same result pattern. -- +is_remote = test_run:get_cfg('remote') == 'true' +execute = nil +prepare = nil +unprepare = nil + test_run:cmd("setopt delimiter ';'") -execute = function(...) - local res, err = box.execute(...) - if err ~= nil then - error(err) +if is_remote then + box.schema.user.grant('guest','read, write, execute', 'universe') + box.schema.user.grant('guest', 'create', 'space') + cn = remote.connect(box.cfg.listen) + execute = function(...) return cn:execute(...) end + prepare = function(...) return cn:prepare(...) end + unprepare = function(...) return cn:unprepare(...) end +else + execute = function(...) + local res, err = box.execute(...) + if err ~= nil then + error(err) + end + return res end - return res -end; -prepare = function(...) - local res, err = box.prepare(...) - if err ~= nil then - error(err) + prepare = function(...) + local res, err = box.prepare(...) + if err ~= nil then + error(err) + end + return res end - return res -end; -unprepare = function(...) - local res, err = box.unprepare(...) - if err ~= nil then - error(err) + unprepare = function(...) + local res, err = box.unprepare(...) + if err ~= nil then + error(err) + end + return res end - return res end; test_run:cmd("setopt delimiter ''"); @@ -45,9 +59,18 @@ s.params s.params_count execute(s.query_id, {1, 2}) execute(s.query_id, {1, 3}) -s:execute({1, 2}) -s:execute({1, 3}) -s:unprepare() + +test_run:cmd("setopt delimiter ';'") +if not is_remote then + res = s:execute({1, 2}) + assert(res ~= nil) + res = s:execute({1, 3}) + assert(res ~= nil) + s:unprepare() +else + unprepare(s.query_id) +end; +test_run:cmd("setopt delimiter ''"); -- Test preparation of different types of queries. -- Let's start from DDL. It doesn't make much sense since @@ -111,8 +134,15 @@ space:replace{7, 8.5, '9'} s = prepare("SELECT a FROM test WHERE b = '3';") execute(s.query_id) execute(s.query_id) -s:execute() -s:execute() +test_run:cmd("setopt delimiter ';'") +if not is_remote then + res = s:execute() + assert(res ~= nil) + res = s:execute() + assert(res ~= nil) +end; +test_run:cmd("setopt delimiter ''"); + unprepare(s.query_id) s = prepare("SELECT count(*), count(a - 3), max(b), abs(id) FROM test WHERE b = '3';") @@ -182,6 +212,10 @@ unprepare(s1.query_id) -- test_run:cmd("setopt delimiter ';'") box.cfg{sql_cache_size = 3000} +if is_remote then + cn:close() + cn = remote.connect(box.cfg.listen) +end; res = nil; _ = fiber.create(function() s = prepare("SELECT * FROM test;") @@ -207,16 +241,26 @@ res; res = nil; ok = nil; _ = fiber.create(function() - for i = 1, 5 do - pcall(prepare, "SELECT * FROM test;") + if is_remote then + cn:eval("box.session.sql_cache_erase()") + else + for i = 1, 5 do + pcall(prepare, "SELECT * FROM test;") + end + box.session.sql_cache_erase() end - box.session.sql_cache_erase() ok, res = pcall(prepare, "SELECT * FROM test;") end); while ok == nil do fiber.sleep(0.00001) end; assert(ok == true); assert(res ~= nil); +if is_remote then + cn:close() + box.schema.user.revoke('guest', 'read, write, execute', 'universe') + box.schema.user.revoke('guest', 'create', 'space') +end; + test_run:cmd("setopt delimiter ''"); box.space.TEST:drop() -- 2.15.1