From: imeevma@tarantool.org
To: tarantool-patches@freelists.org, v.shpilevoy@tarantool.org
Subject: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c
Date: Sat, 17 Nov 2018 17:03:56 +0300 [thread overview]
Message-ID: <502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com> (raw)
In-Reply-To: <cover.1542460773.git.imeevma@gmail.com>
It is useful to replace obuf by mpstream. It allows us to design
vstream. Interface vstream will be used as universal interface to
dump result of SQL queries.
Needed for #3505
---
src/box/execute.c | 92 +++++++++++++++++++++++--------------------------------
src/box/execute.h | 6 ++--
src/box/iproto.cc | 16 +++++++++-
3 files changed, 58 insertions(+), 56 deletions(-)
diff --git a/src/box/execute.c b/src/box/execute.c
index 285ae2e..5c2ec19 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
#include "sql/sqliteLimit.h"
#include "errcode.h"
#include "small/region.h"
-#include "small/obuf.h"
#include "diag.h"
#include "sql.h"
#include "xrow.h"
@@ -454,23 +453,21 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
* @retval -1 Client or memory error.
*/
static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
int column_count)
{
assert(column_count > 0);
- char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
+ char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
if (pos == NULL) {
- diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "obuf_alloc",
+ diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN, "mpstream_reserve",
"pos");
return -1;
}
pos = mp_store_u8(pos, IPROTO_METADATA);
pos = mp_store_u8(pos, 0xdd);
pos = mp_store_u32(pos, column_count);
+ mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
for (int i = 0; i < column_count; ++i) {
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_FIELD_NAME) +
- mp_sizeof_uint(IPROTO_FIELD_TYPE);
const char *name = sqlite3_column_name(stmt, i);
const char *type = sqlite3_column_datatype(stmt, i);
/*
@@ -479,18 +476,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
* column_name simply returns them.
*/
assert(name != NULL);
- size += mp_sizeof_str(strlen(name));
- size += mp_sizeof_str(strlen(type));
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_map(pos, 2);
- pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
- pos = mp_encode_str(pos, name, strlen(name));
- pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
- pos = mp_encode_str(pos, type, strlen(type));
+ mpstream_encode_map(stream, 2);
+ mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+ mpstream_encode_str(stream, name);
+ mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+ mpstream_encode_str(stream, type);
}
return 0;
}
@@ -549,81 +539,77 @@ sql_prepare_and_execute(const struct sql_request *request,
}
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream)
{
sqlite3 *db = sql_get();
struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
int rc = 0, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
- if (sql_get_description(stmt, out, column_count) != 0) {
+ if (sql_get_description(stmt, stream, column_count) != 0) {
err:
rc = -1;
goto finish;
}
*keys = 2;
- char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
+ char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
if (pos == NULL) {
diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
- "obuf_alloc", "pos");
+ "mpstream_reserve", "pos");
goto err;
}
pos = mp_store_u8(pos, IPROTO_DATA);
pos = mp_store_u8(pos, 0xdd);
pos = mp_store_u32(pos, port_tuple->size);
+ mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
+
+ mpstream_flush(stream);
/*
* Just like SELECT, SQL uses output format compatible
* with Tarantool 1.6
*/
- if (port_dump_msgpack_16(&response->port, out) < 0) {
+ if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) {
/* Failed port dump destroyes the port. */
goto err;
}
+ mpstream_reset(stream);
} else {
*keys = 1;
assert(port_tuple->size == 0);
struct stailq *autoinc_id_list =
vdbe_autoinc_id_list((struct Vdbe *)stmt);
uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
- char *pos = (char *) obuf_alloc(out, IPROTO_KEY_HEADER_LEN);
- if (pos == NULL) {
- diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
- "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_store_u8(pos, IPROTO_SQL_INFO);
- pos = mp_store_u8(pos, 0xdf);
- pos = mp_store_u32(pos, map_size);
uint64_t id_count = 0;
- int changes = db->nChange;
- int size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
- mp_sizeof_uint(changes);
if (!stailq_empty(autoinc_id_list)) {
struct autoinc_id_entry *id_entry;
- stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- size += id_entry->id >= 0 ?
- mp_sizeof_uint(id_entry->id) :
- mp_sizeof_int(id_entry->id);
+ stailq_foreach_entry(id_entry, autoinc_id_list, link)
id_count++;
- }
- size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
- mp_sizeof_array(id_count);
}
- char *buf = obuf_alloc(out, size);
- if (buf == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "buf");
+ char *pos = mpstream_reserve(stream, IPROTO_KEY_HEADER_LEN);
+ if (pos == NULL) {
+ diag_set(OutOfMemory, IPROTO_KEY_HEADER_LEN,
+ "mpstream_reserve", "pos");
goto err;
}
- buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
- buf = mp_encode_uint(buf, changes);
+ pos = mp_store_u8(pos, IPROTO_SQL_INFO);
+ pos = mp_store_u8(pos, 0xdf);
+ pos = mp_store_u32(pos, map_size);
+ mpstream_advance(stream, IPROTO_KEY_HEADER_LEN);
+
+ mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+ mpstream_encode_uint(stream, db->nChange);
if (!stailq_empty(autoinc_id_list)) {
- buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
- buf = mp_encode_array(buf, id_count);
+ mpstream_encode_uint(stream,
+ SQL_INFO_AUTOINCREMENT_IDS);
+ mpstream_encode_array(stream, id_count);
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- buf = id_entry->id >= 0 ?
- mp_encode_uint(buf, id_entry->id) :
- mp_encode_int(buf, id_entry->id);
+ int64_t value = id_entry->id;
+ if (id_entry->id >= 0)
+ mpstream_encode_uint(stream, value);
+ else
+ mpstream_encode_int(stream, value);
}
}
}
diff --git a/src/box/execute.h b/src/box/execute.h
index 2a242e8..7eb2121 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -34,6 +34,7 @@
#include <stdint.h>
#include <stdbool.h>
#include "port.h"
+#include "mpstream.h"
#if defined(__cplusplus)
extern "C" {
@@ -105,13 +106,14 @@ struct sql_response {
* +----------------------------------------------+
* @param response EXECUTE response.
* @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream stream to where result is written.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream);
/**
* Parse MessagePack array of SQL parameters and store a result
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 5fb2aff..83b268b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1634,12 +1634,20 @@ error:
tx_reply_error(msg);
}
+/** Callback to forward and error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+ *(bool *)error_ctx = true;
+}
+
static void
tx_process_sql(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct sql_response response;
+ bool is_error = false;
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1659,10 +1667,16 @@ tx_process_sql(struct cmsg *m)
/* Prepare memory for the iproto header. */
if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
goto error;
- if (sql_response_dump(&response, &keys, out) != 0) {
+ struct mpstream stream;
+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ set_encode_error, &is_error);
+ if (is_error)
+ goto error;
+ if (sql_response_dump(&response, &keys, &stream) != 0) {
obuf_rollback_to_svp(out, &header_svp);
goto error;
}
+ mpstream_flush(&stream);
iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
iproto_wpos_create(&msg->wpos, out);
return;
--
2.7.4
next prev parent reply other threads:[~2018-11-17 14:03 UTC|newest]
Thread overview: 18+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-11-17 14:03 [tarantool-patches] [PATCH v1 00/10] sql: remove box.sql.execute imeevma
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 01/10] box: store sql text and length in sql_request imeevma
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 02/10] iproto: remove iproto functions from execute.c imeevma
2018-11-19 17:58 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:03 ` imeevma [this message]
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 04/10] sql: create interface vstream imeevma
2018-11-19 17:58 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:03 ` [tarantool-patches] [PATCH v1 05/10] sql: EXPLAIN through net.box leads to SEGFAULT imeevma
2018-11-19 13:47 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 06/10] sql: SELECT from system spaces returns unpacked msgpack imeevma
2018-11-19 13:48 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 07/10] sql: too many autogenerated ids leads to SEGFAULT imeevma
2018-11-19 13:47 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 08/10] box: add method dump_lua to port imeevma
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 09/10] lua: create vstream implementation for Lua imeevma
2018-11-19 17:58 ` [tarantool-patches] " Vladislav Shpilevoy
2018-11-17 14:04 ` [tarantool-patches] [PATCH v1 10/10] sql: check new box.sql.execute() imeevma
2018-11-19 12:54 ` [tarantool-patches] Re: [PATCH v1 00/10] sql: remove box.sql.execute Vladislav Shpilevoy
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=502ecb5e584d564843facb9cb0db5a0120a82939.1542460773.git.imeevma@gmail.com \
--to=imeevma@tarantool.org \
--cc=tarantool-patches@freelists.org \
--cc=v.shpilevoy@tarantool.org \
--subject='Re: [tarantool-patches] [PATCH v1 03/10] iproto: replace obuf by mpstream in execute.c' \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox