Re: [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c

Мерген Имеев imeevma at tarantool.org
Fri Nov 30 22:25:30 MSK 2018


Added Vova to recipients.
--
Sent from Mail.Ru for Android
Friday, 30 November 2018, 22:01 +03:00, from imeevma at tarantool.org:
>This patch is the most dubious patch due to the implicit use of
>mpstream as a stream for obuf. There is currently no approved
>solution. Discussion and patch below.
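To make the "implicit use" concrete, here is a toy model of why the patch calls mpstream_flush() before handing stream->ctx to port_dump_msgpack() and mpstream_reset() afterwards. This is only a sketch under assumptions: toy_obuf, toy_stream and all toy_* functions are hypothetical stand-ins, not the real obuf or mpstream API, and bounds checks are omitted.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy destination buffer (hypothetical stand-in for obuf). */
struct toy_obuf {
	char data[64];
	size_t used;
};

/* Toy stream: bytes in [buf, pos) are written but not yet committed. */
struct toy_stream {
	struct toy_obuf *ctx;
	char *buf;
	char *pos;
};

/* Point the stream at the current free space of the destination. */
void
toy_stream_reset(struct toy_stream *s)
{
	s->buf = s->ctx->data + s->ctx->used;
	s->pos = s->buf;
}

/* Write one byte through the stream (no bounds check in this toy). */
void
toy_stream_put(struct toy_stream *s, char c)
{
	*s->pos++ = c;
}

/* Commit pending bytes so that the destination accounts for them. */
void
toy_stream_flush(struct toy_stream *s)
{
	s->ctx->used += (size_t) (s->pos - s->buf);
	s->buf = s->pos;
}

/* A writer that bypasses the stream, like a port dump taking the raw buffer. */
void
toy_obuf_append(struct toy_obuf *o, const char *d, size_t len)
{
	memcpy(o->data + o->used, d, len);
	o->used += len;
}
```

In this model the sequence put, flush, append, reset, put, flush leaves the bytes in order. Without the flush, the direct writer would start at a stale offset; without the reset, the stream would keep writing at its old position and overwrite what the direct writer appended.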
>On 11/30/18 1:55 PM, Vladimir Davydov wrote:
>> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>>
>>>
>>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma at tarantool.org wrote:
>>>>>>> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>>>    }
>>>>>>>    int
>>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>>> +      struct mpstream *stream)
>>>>>>>    {
>>>>>>>      sqlite3 *db = sql_get();
>>>>>>>      struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>>> -  struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>>>      int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>>>      if (column_count > 0) {
>>>>>>> -    if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>>> +    if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>>>    err:
>>>>>>>          rc = -1;
>>>>>>>          goto finish;
>>>>>>>        }
>>>>>>>        *keys = 2;
>>>>>>> -    int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>>> -         mp_sizeof_array(port_tuple->size);
>>>>>>> -    char *pos = (char *) obuf_alloc(out, size);
>>>>>>> -    if (pos == NULL) {
>>>>>>> -      diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>>> -      goto err;
>>>>>>> -    }
>>>>>>> -    pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>>> -    pos = mp_encode_array(pos, port_tuple->size);
>>>>>>> -    /*
>>>>>>> -     * Just like SELECT, SQL uses output format compatible
>>>>>>> -     * with Tarantool 1.6
>>>>>>> -     */
>>>>>>> -    if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>>> +    mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>>> +    mpstream_flush(stream);
>>>>>>> +    if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>>
>>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>>
>>>>>> And when you introduce vstream later, you simply move this code to
>>>>>> another file. This is confusing. Maybe we should pass alloc/reserve
>>>>>> used in mpstream to port_dump instead of obuf?
>>>>>
>>>>> Good idea, though I am not sure it is worth slowing down port_dump_msgpack
>>>>> by adding a new level of indirection, since port_dump_msgpack is a hot path
>>>>> and is used for box.select.
>>>>>
>>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever need to
>>>>> dump a port to something other than obuf, we will just add a new method to
>>>>> port_vtab.
>>>>>
>>>>> Also, it would make the port_dump_obuf name consistent with port_dump_lua:
>>>>> in both cases we dump not just in a specific format, but to a concrete
>>>>> destination: obuf or the Lua stack. As it stands, port_dump_msgpack is
>>>>> restricted to an obuf destination anyway.
>>>>
>>>> There's port_dump_plain, which dumps port contents in a specific format.
>>>> So port_dump_obuf would look ambiguous.
>>>>
>>>>>
>>>>> If you worry about how to make sql_response_dump() write to something
>>>>> other than obuf, there is another option. Rename port_dump_msgpack to
>>>>> port_dump_obuf anyway and introduce a new method: port_dump_mpstream. It
>>>>> will take an mpstream and use its reserve/alloc/error functions. That lets
>>>>> us avoid slowing down box.select while still using the full power of
>>>>> virtual functions in execute.c, which is definitely not hot.
>>>>
>>>> That would interconnect port and mpstream, make them dependent on each
>>>> other. I don't think that would be good.
>>>>
>>>>>
>>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>>> box.select and iproto_call will use port_dump_obuf.
>>>>>
>>>>> I prefer the second option: introduce port_dump_mpstream. Is it OK for you?
>>>>
>>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>>> because it's very limited in its applicability. People already prefer to
>>>> use box.call over box.insert/select/etc over iproto, and with the
>>>> appearance of box.execute they are likely to stop using plain box.select
>>>> at all.
>>>>
>>>> That said, personally I would try to pass reserve/alloc methods to port,
>>>> see how it goes.
>>>>
>>>
>>> I do not see a reason to slow down box.select if we can avoid it.
>>> Yes, people use IPROTO_CALL, but in stored functions they use box
>>> functions including select.
>>
>> box.select called from Lua code doesn't use port_dump_msgpack.
>>
>>>
>>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>>> port_dump_obuf and add port_dump_msgpack which does not depend on
>>> mpstream and takes alloc/reserve/ctx directly.
>>
>> Better call the optimized version (the one without callbacks)
>> port_dump_msgpack_obuf to avoid confusion IMO.
>>
>> Anyway, I'd try to run cbench to see if it really performs better
>> than the one using callbacks.
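For reference, the "pass alloc/reserve to port" variant discussed above could look roughly like the sketch below. Everything here is an assumption made for illustration: alloc_f, toy_buf, toy_port and toy_port_dump are hypothetical names, not the real port or obuf interfaces, and the "tuples" are plain C strings rather than MsgPack.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical allocation callback; not the real Tarantool signature. */
typedef void *(*alloc_f)(void *ctx, size_t size);

/* Toy fixed-size output buffer standing in for obuf. */
struct toy_buf {
	char data[256];
	size_t used;
};

/* Allocation callback for toy_buf; returns NULL when out of space. */
void *
toy_buf_alloc(void *ctx, size_t size)
{
	struct toy_buf *buf = (struct toy_buf *) ctx;
	if (size > sizeof(buf->data) - buf->used)
		return NULL;
	void *pos = buf->data + buf->used;
	buf->used += size;
	return pos;
}

/* Toy port: an array of already encoded tuples. */
struct toy_port {
	const char **tuples;
	size_t count;
};

/*
 * Destination-agnostic dump: the port sees only a callback and an
 * opaque context, so it does not depend on obuf or mpstream.
 * Returns the number of dumped tuples, or -1 on allocation failure.
 */
int
toy_port_dump(const struct toy_port *port, alloc_f alloc, void *ctx)
{
	for (size_t i = 0; i < port->count; i++) {
		size_t len = strlen(port->tuples[i]);
		char *pos = (char *) alloc(ctx, len);
		if (pos == NULL)
			return -1;
		memcpy(pos, port->tuples[i], len);
	}
	return (int) port->count;
}
```

The cost debated in the thread is the indirect call per allocation. A variant that calls the buffer allocator directly (what the thread calls port_dump_msgpack_obuf) would avoid it; whether that matters is what the cbench run is meant to show.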
>commit a857e1129f9e38bde8c5c6037149887f695ec6dc
>Author: Mergen Imeev <imeevma at gmail.com>
>Date:   Fri Nov 9 22:31:07 2018 +0300
>    iproto: replace obuf by mpstream in execute.c
>    It is useful to replace obuf by mpstream. It allows us to design
>    vstream. Interface vstream will be used as universal interface to
>    dump result of SQL queries.
>    Needed for #3505
>diff --git a/src/box/execute.c b/src/box/execute.c
>index 3a6cadf..0d266dd 100644
>--- a/src/box/execute.c
>+++ b/src/box/execute.c
>@@ -35,7 +35,6 @@
> #include "sql/sqliteLimit.h"
> #include "errcode.h"
> #include "small/region.h"
>-#include "small/obuf.h"
> #include "diag.h"
> #include "sql.h"
> #include "xrow.h"
>@@ -43,6 +42,7 @@
> #include "port.h"
> #include "tuple.h"
> #include "sql/vdbe.h"
>+#include "mpstream.h"
> const char *sql_type_strs[] = {
>  NULL,
>@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
>  * @retval -1 Client or memory error.
>  */
> static inline int
>-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
>+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
>        int column_count)
> {
>  assert(column_count > 0);
>- int size = mp_sizeof_uint(IPROTO_METADATA) +
>-      mp_sizeof_array(column_count);
>- char *pos = (char *) obuf_alloc(out, size);
>- if (pos == NULL) {
>-   diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>-   return -1;
>- }
>- pos = mp_encode_uint(pos, IPROTO_METADATA);
>- pos = mp_encode_array(pos, column_count);
>+ mpstream_encode_uint(stream, IPROTO_METADATA);
>+ mpstream_encode_array(stream, column_count);
>  for (int i = 0; i < column_count; ++i) {
>-   size_t size = mp_sizeof_map(2) +
>-           mp_sizeof_uint(IPROTO_FIELD_NAME) +
>-           mp_sizeof_uint(IPROTO_FIELD_TYPE);
>    const char *name = sqlite3_column_name(stmt, i);
>    const char *type = sqlite3_column_datatype(stmt, i);
>    /*
>@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
>     * column_name simply returns them.
>     */
>    assert(name != NULL);
>-   size += mp_sizeof_str(strlen(name));
>-   size += mp_sizeof_str(strlen(type));
>-   char *pos = (char *) obuf_alloc(out, size);
>-   if (pos == NULL) {
>-     diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>-     return -1;
>-   }
>-   pos = mp_encode_map(pos, 2);
>-   pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
>-   pos = mp_encode_str(pos, name, strlen(name));
>-   pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
>-   pos = mp_encode_str(pos, type, strlen(type));
>+   mpstream_encode_map(stream, 2);
>+   mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
>+   mpstream_encode_str(stream, name);
>+   mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
>+   mpstream_encode_str(stream, type);
>  }
>  return 0;
> }
>@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> }
> int
>-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>+sql_response_dump(struct sql_response *response, int *keys,
>+     struct mpstream *stream)
> {
>  sqlite3 *db = sql_get();
>  struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>- struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>  int rc = 0, column_count = sqlite3_column_count(stmt);
>  if (column_count > 0) {
>-   if (sql_get_description(stmt, out, column_count) != 0) {
>+   if (sql_get_description(stmt, stream, column_count) != 0) {
> err:
>      rc = -1;
>      goto finish;
>    }
>    *keys = 2;
>-   int size = mp_sizeof_uint(IPROTO_DATA) +
>-        mp_sizeof_array(port_tuple->size);
>-   char *pos = (char *) obuf_alloc(out, size);
>-   if (pos == NULL) {
>-     diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>-     goto err;
>-   }
>-   pos = mp_encode_uint(pos, IPROTO_DATA);
>-   pos = mp_encode_array(pos, port_tuple->size);
>-   /*
>-    * Just like SELECT, SQL uses output format compatible
>-    * with Tarantool 1.6
>-    */
>-   if (port_dump_msgpack_16(&response->port, out) < 0) {
>+   mpstream_encode_uint(stream, IPROTO_DATA);
>+   mpstream_flush(stream);
>+   if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>      /* Failed port dump destroyes the port. */
>      goto err;
>    }
>+   mpstream_reset(stream);
>  } else {
>    *keys = 1;
>-   assert(port_tuple->size == 0);
>    struct stailq *autoinc_id_list =
>      vdbe_autoinc_id_list((struct Vdbe *)stmt);
>    uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
>-   int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
>-        mp_sizeof_map(map_size);
>-   char *pos = (char *) obuf_alloc(out, size);
>-   if (pos == NULL) {
>-     diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>-     goto err;
>-   }
>-   pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
>-   pos = mp_encode_map(pos, map_size);
>+   mpstream_encode_uint(stream, IPROTO_SQL_INFO);
>+   mpstream_encode_map(stream, map_size);
>    uint64_t id_count = 0;
>-   int changes = db->nChange;
>-   size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
>-          mp_sizeof_uint(changes);
>    if (!stailq_empty(autoinc_id_list)) {
>      struct autoinc_id_entry *id_entry;
>-     stailq_foreach_entry(id_entry, autoinc_id_list, link) {
>-       size += id_entry->id >= 0 ?
>-         mp_sizeof_uint(id_entry->id) :
>-         mp_sizeof_int(id_entry->id);
>+     stailq_foreach_entry(id_entry, autoinc_id_list, link)
>        id_count++;
>-     }
>-     size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
>-       mp_sizeof_array(id_count);
>-   }
>-   char *buf = obuf_alloc(out, size);
>-   if (buf == NULL) {
>-     diag_set(OutOfMemory, size, "obuf_alloc", "buf");
>-     goto err;
>    }
>-   buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
>-   buf = mp_encode_uint(buf, changes);
>+
>+   mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
>+   mpstream_encode_uint(stream, db->nChange);
>    if (!stailq_empty(autoinc_id_list)) {
>-     buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
>-     buf = mp_encode_array(buf, id_count);
>+     mpstream_encode_uint(stream,
>+              SQL_INFO_AUTOINCREMENT_IDS);
>+     mpstream_encode_array(stream, id_count);
>      struct autoinc_id_entry *id_entry;
>      stailq_foreach_entry(id_entry, autoinc_id_list, link) {
>-       buf = id_entry->id >= 0 ?
>-             mp_encode_uint(buf, id_entry->id) :
>-             mp_encode_int(buf, id_entry->id);
>+       int64_t value = id_entry->id;
>+       if (id_entry->id >= 0)
>+         mpstream_encode_uint(stream, value);
>+       else
>+         mpstream_encode_int(stream, value);
>      }
>    }
>  }
>diff --git a/src/box/execute.h b/src/box/execute.h
>index 5f3d5eb..65ac81c 100644
>--- a/src/box/execute.h
>+++ b/src/box/execute.h
>@@ -48,10 +48,10 @@ enum sql_info_key {
> extern const char *sql_info_key_strs[];
>-struct obuf;
> struct region;
> struct sql_bind;
> struct xrow_header;
>+struct mpstream;
> /** EXECUTE request. */
> struct sql_request {
>@@ -105,13 +105,14 @@ struct sql_response {
>  * +----------------------------------------------+
>  * @param response EXECUTE response.
>  * @param[out] keys number of keys in dumped map.
>- * @param out Output buffer.
>+ * @param stream stream to where result is written.
>  *
>  * @retval  0 Success.
>  * @retval -1 Memory error.
>  */
> int
>-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
>+sql_response_dump(struct sql_response *response, int *keys,
>+     struct mpstream *stream);
> /**
>  * Parse the EXECUTE request.
>diff --git a/src/box/iproto.cc b/src/box/iproto.cc
>index 7c11d05..b110900 100644
>--- a/src/box/iproto.cc
>+++ b/src/box/iproto.cc
>@@ -61,6 +61,7 @@
> #include "rmean.h"
> #include "execute.h"
> #include "errinj.h"
>+#include "mpstream.h"
> enum {
>  IPROTO_SALT_SIZE = 32,
>@@ -1573,12 +1574,20 @@ error:
>  tx_reply_error(msg);
> }
>+/** Callback to forward an error from mpstream methods. */
>+static void
>+set_encode_error(void *error_ctx)
>+{
>+ *(bool *)error_ctx = true;
>+}
>+
> static void
> tx_process_sql(struct cmsg *m)
> {
>  struct iproto_msg *msg = tx_accept_msg(m);
>  struct obuf *out;
>  struct sql_response response;
>+ bool is_error = false;
>  tx_fiber_init(msg->connection->session, msg->header.sync);
>@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
>  /* Prepare memory for the iproto header. */
>  if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
>    goto error;
>- if (sql_response_dump(&response, &keys, out) != 0) {
>+ struct mpstream stream;
>+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
>+         set_encode_error, &is_error);
>+ if (is_error)
>+   goto error;
>+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
>    obuf_rollback_to_svp(out, &header_svp);
>    goto error;
>  }
>+ mpstream_flush(&stream);
>  iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
>  iproto_wpos_create(&msg->wpos, out);
>  return;
>-- 
>2.7.4
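A note on the error handling in tx_process_sql(): the encode functions return nothing, so a failure is reported through the error callback, which here only raises a flag that the caller checks afterwards. A minimal sketch of that pattern, assuming a toy stream (toy_stream, toy_set_error and toy_stream_write are hypothetical, not the mpstream API):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Toy stream over a fixed buffer, with an error callback and its context. */
struct toy_stream {
	char *buf;
	size_t size;
	size_t used;
	void (*error)(void *error_ctx);
	void *error_ctx;
};

/* Error callback: record the failure in a caller-owned flag. */
void
toy_set_error(void *error_ctx)
{
	*(bool *) error_ctx = true;
}

/*
 * Write without a return code: on overflow nothing is written and
 * the error callback is invoked instead.
 */
void
toy_stream_write(struct toy_stream *s, const char *data, size_t len)
{
	if (len > s->size - s->used) {
		s->error(s->error_ctx);
		return;
	}
	memcpy(s->buf + s->used, data, len);
	s->used += len;
}
```

The caller has to check the flag after the sequence of writes, which is why the patch tests is_error next to the return value of sql_response_dump().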