* Re: [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
@ 2018-11-30 19:25 Мерген Имеев
2018-12-01 17:24 ` Vladimir Davydov
0 siblings, 1 reply; 3+ messages in thread
From: Мерген Имеев @ 2018-11-30 19:25 UTC (permalink / raw)
To: imeevma; +Cc: v.shpilevoy, tarantool-patches, kostja, vdavydov.dev
[-- Attachment #1: Type: text/plain, Size: 15198 bytes --]
Added Vova to recipients.
--
Отправлено из Mail.Ru для Android
пятница, 30 ноября 2018г., 22:01 +03:00 от imeevma@tarantool.org:
>This patch is the most dubious patch due to the implicit use of
>mpstream as a stream for obuf. There is currently no approved
>solution. Discussion and patch below.
>On 11/30/18 1:55 PM, Vladimir Davydov wrote:
>> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>>
>>>
>>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>>> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>>> }
>>>>>>> int
>>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>>> + struct mpstream *stream)
>>>>>>> {
>>>>>>> sqlite3 *db = sql_get();
>>>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>>> if (column_count > 0) {
>>>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>>> err:
>>>>>>> rc = -1;
>>>>>>> goto finish;
>>>>>>> }
>>>>>>> *keys = 2;
>>>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>>> - mp_sizeof_array(port_tuple->size);
>>>>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>>>>> - if (pos == NULL) {
>>>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>>> - goto err;
>>>>>>> - }
>>>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>>>>> - /*
>>>>>>> - * Just like SELECT, SQL uses output format compatible
>>>>>>> - * with Tarantool 1.6
>>>>>>> - */
>>>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>>> + mpstream_flush(stream);
>>>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>>
>>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>>
>>>>>> And when you introduce vstream later, you simply move this code to
>>>>>> another file. This is confusing. May be we should pass alloc/reserve
>>>>>> used in mpstream to port_dump instead of obuf?
>>>>>
>>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
>>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
>>>>> and is used for box.select.
>>>>>
>>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
>>>>> dump port to not obuf, then we will just add a new method to port_vtab.
>>>>>
>>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>>> in both cases we not just dump in a specific format, but to a concrete
>>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>>> by obuf destination.
>>>>
>>>> There's port_dump_plain, which dumps port contents in a specific format.
>>>> So port_dump_obuf would look ambiguous.
>>>>
>>>>>
>>>>> If you worry about how to call sql_response_dump() to not obuf, then there
>>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
>>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
>>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
>>>>> but use the full power of virtual functions in execute.c, which definitely is
>>>>> not hot.
>>>>
>>>> That would interconnect port and mpstream, make them dependent on each
>>>> other. I don't think that would be good.
>>>>
>>>>>
>>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>>> box.select and iproto_call will use port_dump_obuf.
>>>>>
>>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
>>>>
>>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>>> because it's very limited in its applicability. People already prefer to
>>>> use box.call over box.insert/select/etc over iproto, and with the
>>>> appearance of box.execute they are likely to stop using plain box.select
>>>> at all.
>>>>
>>>> That said, personally I would try to pass reserve/alloc methods to port,
>>>> see how it goes.
>>>>
>>>
>>> I do not see a reason to slow down box.select if we can don't do it.
>>> Yeas, people use IPROTO_CALL, but in stored functions they use box
>>> functions including select.
>>
>> box.select called from Lua code doesn't use port_dump_msgpack.
>>
>>>
>>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>>> port_dump_obuf and add port_dump_msgpack which does not depend on
>>> mpstream and takes alloc/reserve/ctx directly.
>>
>> Better call the optimized version (the one without callbacks)
>> port_dump_msgpack_obuf to avoid confusion IMO.
>>
>> Anyway, I'd try to run cbench to see if it really perfomrs better
>> than the one using callbacks.
>commit a857e1129f9e38bde8c5c6037149887f695ec6dc
>Author: Mergen Imeev <imeevma@gmail.com>
>Date: Fri Nov 9 22:31:07 2018 +0300
> iproto: replace obuf by mpstream in execute.c
> It is useful to replace obuf by mpstream. It allows us to design
> vstream. Interface vstream will be used as universal interface to
> dump result of SQL queries.
> Needed for #3505
>diff --git a/src/box/execute.c b/src/box/execute.c
>index 3a6cadf..0d266dd 100644
>--- a/src/box/execute.c
>+++ b/src/box/execute.c
>@@ -35,7 +35,6 @@
> #include "sql/sqliteLimit.h"
> #include "errcode.h"
> #include "small/region.h"
>-#include "small/obuf.h"
> #include "diag.h"
> #include "sql.h"
> #include "xrow.h"
>@@ -43,6 +42,7 @@
> #include "port.h"
> #include "tuple.h"
> #include "sql/vdbe.h"
>+#include "mpstream.h"
> const char *sql_type_strs[] = {
> NULL,
>@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
> * @retval -1 Client or memory error.
> */
> static inline int
>-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
>+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
> int column_count)
> {
> assert(column_count > 0);
>- int size = mp_sizeof_uint(IPROTO_METADATA) +
>- mp_sizeof_array(column_count);
>- char *pos = (char *) obuf_alloc(out, size);
>- if (pos == NULL) {
>- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>- return -1;
>- }
>- pos = mp_encode_uint(pos, IPROTO_METADATA);
>- pos = mp_encode_array(pos, column_count);
>+ mpstream_encode_uint(stream, IPROTO_METADATA);
>+ mpstream_encode_array(stream, column_count);
> for (int i = 0; i < column_count; ++i) {
>- size_t size = mp_sizeof_map(2) +
>- mp_sizeof_uint(IPROTO_FIELD_NAME) +
>- mp_sizeof_uint(IPROTO_FIELD_TYPE);
> const char *name = sqlite3_column_name(stmt, i);
> const char *type = sqlite3_column_datatype(stmt, i);
> /*
>@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
> * column_name simply returns them.
> */
> assert(name != NULL);
>- size += mp_sizeof_str(strlen(name));
>- size += mp_sizeof_str(strlen(type));
>- char *pos = (char *) obuf_alloc(out, size);
>- if (pos == NULL) {
>- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>- return -1;
>- }
>- pos = mp_encode_map(pos, 2);
>- pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
>- pos = mp_encode_str(pos, name, strlen(name));
>- pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
>- pos = mp_encode_str(pos, type, strlen(type));
>+ mpstream_encode_map(stream, 2);
>+ mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
>+ mpstream_encode_str(stream, name);
>+ mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
>+ mpstream_encode_str(stream, type);
> }
> return 0;
> }
>@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> }
> int
>-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>+sql_response_dump(struct sql_response *response, int *keys,
>+ struct mpstream *stream)
> {
> sqlite3 *db = sql_get();
> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>- struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
> int rc = 0, column_count = sqlite3_column_count(stmt);
> if (column_count > 0) {
>- if (sql_get_description(stmt, out, column_count) != 0) {
>+ if (sql_get_description(stmt, stream, column_count) != 0) {
> err:
> rc = -1;
> goto finish;
> }
> *keys = 2;
>- int size = mp_sizeof_uint(IPROTO_DATA) +
>- mp_sizeof_array(port_tuple->size);
>- char *pos = (char *) obuf_alloc(out, size);
>- if (pos == NULL) {
>- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>- goto err;
>- }
>- pos = mp_encode_uint(pos, IPROTO_DATA);
>- pos = mp_encode_array(pos, port_tuple->size);
>- /*
>- * Just like SELECT, SQL uses output format compatible
>- * with Tarantool 1.6
>- */
>- if (port_dump_msgpack_16(&response->port, out) < 0) {
>+ mpstream_encode_uint(stream, IPROTO_DATA);
>+ mpstream_flush(stream);
>+ if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
> /* Failed port dump destroyes the port. */
> goto err;
> }
>+ mpstream_reset(stream);
> } else {
> *keys = 1;
>- assert(port_tuple->size == 0);
> struct stailq *autoinc_id_list =
> vdbe_autoinc_id_list((struct Vdbe *)stmt);
> uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
>- int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
>- mp_sizeof_map(map_size);
>- char *pos = (char *) obuf_alloc(out, size);
>- if (pos == NULL) {
>- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>- goto err;
>- }
>- pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
>- pos = mp_encode_map(pos, map_size);
>+ mpstream_encode_uint(stream, IPROTO_SQL_INFO);
>+ mpstream_encode_map(stream, map_size);
> uint64_t id_count = 0;
>- int changes = db->nChange;
>- size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
>- mp_sizeof_uint(changes);
> if (!stailq_empty(autoinc_id_list)) {
> struct autoinc_id_entry *id_entry;
>- stailq_foreach_entry(id_entry, autoinc_id_list, link) {
>- size += id_entry->id >= 0 ?
>- mp_sizeof_uint(id_entry->id) :
>- mp_sizeof_int(id_entry->id);
>+ stailq_foreach_entry(id_entry, autoinc_id_list, link)
> id_count++;
>- }
>- size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
>- mp_sizeof_array(id_count);
>- }
>- char *buf = obuf_alloc(out, size);
>- if (buf == NULL) {
>- diag_set(OutOfMemory, size, "obuf_alloc", "buf");
>- goto err;
> }
>- buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
>- buf = mp_encode_uint(buf, changes);
>+
>+ mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
>+ mpstream_encode_uint(stream, db->nChange);
> if (!stailq_empty(autoinc_id_list)) {
>- buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
>- buf = mp_encode_array(buf, id_count);
>+ mpstream_encode_uint(stream,
>+ SQL_INFO_AUTOINCREMENT_IDS);
>+ mpstream_encode_array(stream, id_count);
> struct autoinc_id_entry *id_entry;
> stailq_foreach_entry(id_entry, autoinc_id_list, link) {
>- buf = id_entry->id >= 0 ?
>- mp_encode_uint(buf, id_entry->id) :
>- mp_encode_int(buf, id_entry->id);
>+ int64_t value = id_entry->id;
>+ if (id_entry->id >= 0)
>+ mpstream_encode_uint(stream, value);
>+ else
>+ mpstream_encode_int(stream, value);
> }
> }
> }
>diff --git a/src/box/execute.h b/src/box/execute.h
>index 5f3d5eb..65ac81c 100644
>--- a/src/box/execute.h
>+++ b/src/box/execute.h
>@@ -48,10 +48,10 @@ enum sql_info_key {
> extern const char *sql_info_key_strs[];
>-struct obuf;
> struct region;
> struct sql_bind;
> struct xrow_header;
>+struct mpstream;
> /** EXECUTE request. */
> struct sql_request {
>@@ -105,13 +105,14 @@ struct sql_response {
> * +----------------------------------------------+
> * @param response EXECUTE response.
> * @param[out] keys number of keys in dumped map.
>- * @param out Output buffer.
>+ * @param stream stream to where result is written.
> *
> * @retval 0 Success.
> * @retval -1 Memory error.
> */
> int
>-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
>+sql_response_dump(struct sql_response *response, int *keys,
>+ struct mpstream *stream);
> /**
> * Parse the EXECUTE request.
>diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>index 7c11d05..b110900 100644
>--- a/src/box/iproto.cc
>+++ b/src/box/iproto.cc
>@@ -61,6 +61,7 @@
> #include "rmean.h"
> #include "execute.h"
> #include "errinj.h"
>+#include "mpstream.h"
> enum {
> IPROTO_SALT_SIZE = 32,
>@@ -1573,12 +1574,20 @@ error:
> tx_reply_error(msg);
> }
>+/** Callback to forward and error from mpstream methods. */
>+static void
>+set_encode_error(void *error_ctx)
>+{
>+ *(bool *)error_ctx = true;
>+}
>+
> static void
> tx_process_sql(struct cmsg *m)
> {
> struct iproto_msg *msg = tx_accept_msg(m);
> struct obuf *out;
> struct sql_response response;
>+ bool is_error = false;
> tx_fiber_init(msg->connection->session, msg->header.sync);
>@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
> /* Prepare memory for the iproto header. */
> if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
> goto error;
>- if (sql_response_dump(&response, &keys, out) != 0) {
>+ struct mpstream stream;
>+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
>+ set_encode_error, &is_error);
>+ if (is_error)
>+ goto error;
>+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
> obuf_rollback_to_svp(out, &header_svp);
> goto error;
> }
>+ mpstream_flush(&stream);
> iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
> iproto_wpos_create(&msg->wpos, out);
> return;
>--
>2.7.4
[-- Attachment #2: Type: text/html, Size: 22613 bytes --]
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-11-30 19:25 [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c Мерген Имеев
@ 2018-12-01 17:24 ` Vladimir Davydov
0 siblings, 0 replies; 3+ messages in thread
From: Vladimir Davydov @ 2018-12-01 17:24 UTC (permalink / raw)
To: Мерген
Имеев
Cc: v.shpilevoy, tarantool-patches, kostja
On Fri, Nov 30, 2018 at 10:25:30PM +0300, Мерген Имеев wrote:
> >@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
> > * column_name simply returns them.
> > */
> > assert(name != NULL);
> >- size += mp_sizeof_str(strlen(name));
> >- size += mp_sizeof_str(strlen(type));
> >- char *pos = (char *) obuf_alloc(out, size);
> >- if (pos == NULL) {
> >- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> >- return -1;
> >- }
> >- pos = mp_encode_map(pos, 2);
> >- pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
> >- pos = mp_encode_str(pos, name, strlen(name));
> >- pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
> >- pos = mp_encode_str(pos, type, strlen(type));
> >+ mpstream_encode_map(stream, 2);
> >+ mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
> >+ mpstream_encode_str(stream, name);
> >+ mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
> >+ mpstream_encode_str(stream, type);
> > }
> > return 0;
> > }
The patch is mangled - tabs have been replaced with spaces - so I can't
review it as it is now. Please fix your editor/mua/whatever and resend.
^ permalink raw reply [flat|nested] 3+ messages in thread
* [PATCH v4 0/5] Remove box.sql.execute()
@ 2018-11-30 19:00 imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
0 siblings, 1 reply; 3+ messages in thread
From: imeevma @ 2018-11-30 19:00 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, vdavydov.dev, kostja
The goal of this patch-set is to make functions from execute.c
the only way to execute SQL statements. This goal includes
similar output for executed SQL statements no matter how they
were executed: through net.box or through box.
This is the fourth version of patch-set. It is not complete. It
still has no last part, which is replacing box.sql.execute by
box.execute, because it will lead to massive test editing.
The main goal of this version of patch-set is to look at design of
new box.sql.execute().
The main purpose of this version of the patch-set is to look at
the new design of the box.sql.execute().
For now this patch-set blocked by #3832. Small temporary fix added
to temporary patch of patch-set.
https://github.com/tarantool/tarantool/issues/3505
https://github.com/tarantool/tarantool/tree/imeevma/gh-3505-replace-box_sql_execute-by-box_execute
General information of difference from previous version of
patch-set:
- Some patches were removed from patch-set because they were
pushed to 2.1.
- New patch was added. It moves port to src/ from src/box
A bit about patches of patch-set with comments about changes in
this version:
Patch 1 moves port to src/ from src/box. It allows us to use port
in mpstream.c/mpstream.h.
Patch 2 allows us to design vstream by wrapping mpstream
functions. At the moment, this is the most dubious patch due to
the implicit use of mpstream as a stream for obuf.
Patch 3 creates interface vstream and its mpstream implementation.
Difference from previous version:
- Function mpsvtream_init() was created and function
mp_vstream_init_vtab() was removed.
Patch 4 creates vstream implementation for Lua and defines new
box.sql.new_execute() function that will become box.execute() in
next vesions.
Difference from previous version:
- luastream.c moved to src/lua from src/box/lua.
- luastream.h created. Some functionality were moved here from
luastream.c.
- Function luasvtream_init() was created and function
lua_vstream_init_vtab() was removed.
- Some refactoring has been done.
Patch 5 is temporary patch. It created to check that
box.sql.new_execute() is able to pass through tests created for
box.sql.execute(). Now it contains small fix that allows us to
pass to sql_response_dump() result with rows without types.
v1: https://www.freelists.org/post/tarantool-patches/PATCH-v1-0010-sql-remove-boxsqlexecute
v2: https://www.freelists.org/post/tarantool-patches/PATCH-v2-07-Remove-boxsqlexecute
v3: https://www.freelists.org/post/tarantool-patches/PATCH-v3-07-Remove-boxsqlexecute
Mergen Imeev (4):
iproto: replace obuf by mpstream in execute.c
sql: create interface vstream
lua: create vstream implementation for Lua
sql: check new box.sql.execute()
Vladislav Shpilevoy (1):
box: move port to src/
src/CMakeLists.txt | 2 +
src/box/execute.c | 320 +++++++++++++++++++++++++++++++++++++------------
src/box/execute.h | 39 +++++-
src/box/iproto.cc | 23 +++-
src/box/lua/schema.lua | 23 ++++
src/box/lua/sql.c | 109 ++++-------------
src/box/lua/sql.h | 4 +
src/box/port.c | 30 -----
src/box/port.h | 103 +---------------
src/lua/luastream.c | 148 +++++++++++++++++++++++
src/lua/luastream.h | 47 ++++++++
src/mpstream.c | 57 +++++++++
src/mpstream.h | 7 ++
src/port.c | 37 ++++++
src/port.h | 127 ++++++++++++++++++++
src/vstream.h | 176 +++++++++++++++++++++++++++
16 files changed, 949 insertions(+), 303 deletions(-)
create mode 100644 src/lua/luastream.c
create mode 100644 src/lua/luastream.h
create mode 100644 src/port.c
create mode 100644 src/port.h
create mode 100644 src/vstream.h
--
2.7.4
^ permalink raw reply [flat|nested] 3+ messages in thread
* [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
@ 2018-11-30 19:01 ` imeevma
0 siblings, 0 replies; 3+ messages in thread
From: imeevma @ 2018-11-30 19:01 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, kostja
This patch is the most dubious patch due to the implicit use of
mpstream as a stream for obuf. There is currently no approved
solution. Discussion and patch below.
On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>
>>
>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>> }
>>>>>> int
>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>> + struct mpstream *stream)
>>>>>> {
>>>>>> sqlite3 *db = sql_get();
>>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>> if (column_count > 0) {
>>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>> err:
>>>>>> rc = -1;
>>>>>> goto finish;
>>>>>> }
>>>>>> *keys = 2;
>>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>> - mp_sizeof_array(port_tuple->size);
>>>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>>>> - if (pos == NULL) {
>>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>> - goto err;
>>>>>> - }
>>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>>>> - /*
>>>>>> - * Just like SELECT, SQL uses output format compatible
>>>>>> - * with Tarantool 1.6
>>>>>> - */
>>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>> + mpstream_flush(stream);
>>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>
>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>
>>>>> And when you introduce vstream later, you simply move this code to
>>>>> another file. This is confusing. May be we should pass alloc/reserve
>>>>> used in mpstream to port_dump instead of obuf?
>>>>
>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
>>>> and is used for box.select.
>>>>
>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
>>>> dump port to not obuf, then we will just add a new method to port_vtab.
>>>>
>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>> in both cases we not just dump in a specific format, but to a concrete
>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>> by obuf destination.
>>>
>>> There's port_dump_plain, which dumps port contents in a specific format.
>>> So port_dump_obuf would look ambiguous.
>>>
>>>>
>>>> If you worry about how to call sql_response_dump() to not obuf, then there
>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
>>>> but use the full power of virtual functions in execute.c, which definitely is
>>>> not hot.
>>>
>>> That would interconnect port and mpstream, make them dependent on each
>>> other. I don't think that would be good.
>>>
>>>>
>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>> box.select and iproto_call will use port_dump_obuf.
>>>>
>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
>>>
>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>> because it's very limited in its applicability. People already prefer to
>>> use box.call over box.insert/select/etc over iproto, and with the
>>> appearance of box.execute they are likely to stop using plain box.select
>>> at all.
>>>
>>> That said, personally I would try to pass reserve/alloc methods to port,
>>> see how it goes.
>>>
>>
>> I do not see a reason to slow down box.select if we can don't do it.
>> Yeas, people use IPROTO_CALL, but in stored functions they use box
>> functions including select.
>
> box.select called from Lua code doesn't use port_dump_msgpack.
>
>>
>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>> port_dump_obuf and add port_dump_msgpack which does not depend on
>> mpstream and takes alloc/reserve/ctx directly.
>
> Better call the optimized version (the one without callbacks)
> port_dump_msgpack_obuf to avoid confusion IMO.
>
> Anyway, I'd try to run cbench to see if it really perfomrs better
> than the one using callbacks.
commit a857e1129f9e38bde8c5c6037149887f695ec6dc
Author: Mergen Imeev <imeevma@gmail.com>
Date: Fri Nov 9 22:31:07 2018 +0300
iproto: replace obuf by mpstream in execute.c
It is useful to replace obuf by mpstream. It allows us to design
vstream. Interface vstream will be used as universal interface to
dump result of SQL queries.
Needed for #3505
diff --git a/src/box/execute.c b/src/box/execute.c
index 3a6cadf..0d266dd 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
#include "sql/sqliteLimit.h"
#include "errcode.h"
#include "small/region.h"
-#include "small/obuf.h"
#include "diag.h"
#include "sql.h"
#include "xrow.h"
@@ -43,6 +42,7 @@
#include "port.h"
#include "tuple.h"
#include "sql/vdbe.h"
+#include "mpstream.h"
const char *sql_type_strs[] = {
NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
* @retval -1 Client or memory error.
*/
static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
int column_count)
{
assert(column_count > 0);
- int size = mp_sizeof_uint(IPROTO_METADATA) +
- mp_sizeof_array(column_count);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_uint(pos, IPROTO_METADATA);
- pos = mp_encode_array(pos, column_count);
+ mpstream_encode_uint(stream, IPROTO_METADATA);
+ mpstream_encode_array(stream, column_count);
for (int i = 0; i < column_count; ++i) {
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_FIELD_NAME) +
- mp_sizeof_uint(IPROTO_FIELD_TYPE);
const char *name = sqlite3_column_name(stmt, i);
const char *type = sqlite3_column_datatype(stmt, i);
/*
@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
* column_name simply returns them.
*/
assert(name != NULL);
- size += mp_sizeof_str(strlen(name));
- size += mp_sizeof_str(strlen(type));
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_map(pos, 2);
- pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
- pos = mp_encode_str(pos, name, strlen(name));
- pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
- pos = mp_encode_str(pos, type, strlen(type));
+ mpstream_encode_map(stream, 2);
+ mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+ mpstream_encode_str(stream, name);
+ mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+ mpstream_encode_str(stream, type);
}
return 0;
}
@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
}
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream)
{
sqlite3 *db = sql_get();
struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
- struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
int rc = 0, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
- if (sql_get_description(stmt, out, column_count) != 0) {
+ if (sql_get_description(stmt, stream, column_count) != 0) {
err:
rc = -1;
goto finish;
}
*keys = 2;
- int size = mp_sizeof_uint(IPROTO_DATA) +
- mp_sizeof_array(port_tuple->size);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_encode_uint(pos, IPROTO_DATA);
- pos = mp_encode_array(pos, port_tuple->size);
- /*
- * Just like SELECT, SQL uses output format compatible
- * with Tarantool 1.6
- */
- if (port_dump_msgpack_16(&response->port, out) < 0) {
+ mpstream_encode_uint(stream, IPROTO_DATA);
+ mpstream_flush(stream);
+ if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
/* Failed port dump destroyes the port. */
goto err;
}
+ mpstream_reset(stream);
} else {
*keys = 1;
- assert(port_tuple->size == 0);
struct stailq *autoinc_id_list =
vdbe_autoinc_id_list((struct Vdbe *)stmt);
uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
- int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
- mp_sizeof_map(map_size);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
- pos = mp_encode_map(pos, map_size);
+ mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+ mpstream_encode_map(stream, map_size);
uint64_t id_count = 0;
- int changes = db->nChange;
- size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
- mp_sizeof_uint(changes);
if (!stailq_empty(autoinc_id_list)) {
struct autoinc_id_entry *id_entry;
- stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- size += id_entry->id >= 0 ?
- mp_sizeof_uint(id_entry->id) :
- mp_sizeof_int(id_entry->id);
+ stailq_foreach_entry(id_entry, autoinc_id_list, link)
id_count++;
- }
- size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
- mp_sizeof_array(id_count);
- }
- char *buf = obuf_alloc(out, size);
- if (buf == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "buf");
- goto err;
}
- buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
- buf = mp_encode_uint(buf, changes);
+
+ mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+ mpstream_encode_uint(stream, db->nChange);
if (!stailq_empty(autoinc_id_list)) {
- buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
- buf = mp_encode_array(buf, id_count);
+ mpstream_encode_uint(stream,
+ SQL_INFO_AUTOINCREMENT_IDS);
+ mpstream_encode_array(stream, id_count);
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- buf = id_entry->id >= 0 ?
- mp_encode_uint(buf, id_entry->id) :
- mp_encode_int(buf, id_entry->id);
+ int64_t value = id_entry->id;
+ if (id_entry->id >= 0)
+ mpstream_encode_uint(stream, value);
+ else
+ mpstream_encode_int(stream, value);
}
}
}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..65ac81c 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,10 +48,10 @@ enum sql_info_key {
extern const char *sql_info_key_strs[];
-struct obuf;
struct region;
struct sql_bind;
struct xrow_header;
+struct mpstream;
/** EXECUTE request. */
struct sql_request {
@@ -105,13 +105,14 @@ struct sql_response {
* +----------------------------------------------+
* @param response EXECUTE response.
* @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream stream to where result is written.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream);
/**
* Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
#include "rmean.h"
#include "execute.h"
#include "errinj.h"
+#include "mpstream.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
tx_reply_error(msg);
}
+/** Callback to forward and error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+ *(bool *)error_ctx = true;
+}
+
static void
tx_process_sql(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct sql_response response;
+ bool is_error = false;
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
/* Prepare memory for the iproto header. */
if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
goto error;
- if (sql_response_dump(&response, &keys, out) != 0) {
+ struct mpstream stream;
+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ set_encode_error, &is_error);
+ if (is_error)
+ goto error;
+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
obuf_rollback_to_svp(out, &header_svp);
goto error;
}
+ mpstream_flush(&stream);
iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
iproto_wpos_create(&msg->wpos, out);
return;
--
2.7.4
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2018-12-01 17:24 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-11-30 19:25 [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c Мерген Имеев
2018-12-01 17:24 ` Vladimir Davydov
-- strict thread matches above, loose matches on Subject: below --
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox