<HTML><BODY><p style='margin-top: 0px;' dir="ltr">Added Vova to recipients.<br>
--<br>
 Отправлено из Mail.Ru для Android<br>
пятница, 30 ноября 2018г., 22:01 +03:00 от imeevma@tarantool.org:<br>
>This patch is the most dubious patch due to the implicit use of<br>
>mpstream as a stream for obuf. There is currently no approved<br>
>solution. Discussion and patch below.<br>
>On 11/30/18 1:55 PM, Vladimir Davydov wrote:<br>
>> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:<br>
>>><br>
>>><br>
>>> On 30/11/2018 13:19, Vladimir Davydov wrote:<br>
>>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:<br>
>>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:<br>
>>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:<br>
>>>>>>> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,<br>
>>>>>>>    }<br>
>>>>>>>    int<br>
>>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)<br>
>>>>>>> +sql_response_dump(struct sql_response *response, int *keys,<br>
>>>>>>> +      struct mpstream *stream)<br>
>>>>>>>    {<br>
>>>>>>>      sqlite3 *db = sql_get();<br>
>>>>>>>      struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;<br>
>>>>>>> -  struct port_tuple *port_tuple = (struct port_tuple *) &response->port;<br>
>>>>>>>      int rc = 0, column_count = sqlite3_column_count(stmt);<br>
>>>>>>>      if (column_count > 0) {<br>
>>>>>>> -    if (sql_get_description(stmt, out, column_count) != 0) {<br>
>>>>>>> +    if (sql_get_description(stmt, stream, column_count) != 0) {<br>
>>>>>>>    err:<br>
>>>>>>>          rc = -1;<br>
>>>>>>>          goto finish;<br>
>>>>>>>        }<br>
>>>>>>>        *keys = 2;<br>
>>>>>>> -    int size = mp_sizeof_uint(IPROTO_DATA) +<br>
>>>>>>> -         mp_sizeof_array(port_tuple->size);<br>
>>>>>>> -    char *pos = (char *) obuf_alloc(out, size);<br>
>>>>>>> -    if (pos == NULL) {<br>
>>>>>>> -      diag_set(OutOfMemory, size, "obuf_alloc", "pos");<br>
>>>>>>> -      goto err;<br>
>>>>>>> -    }<br>
>>>>>>> -    pos = mp_encode_uint(pos, IPROTO_DATA);<br>
>>>>>>> -    pos = mp_encode_array(pos, port_tuple->size);<br>
>>>>>>> -    /*<br>
>>>>>>> -     * Just like SELECT, SQL uses output format compatible<br>
>>>>>>> -     * with Tarantool 1.6<br>
>>>>>>> -     */<br>
>>>>>>> -    if (port_dump_msgpack_16(&response->port, out) < 0) {<br>
>>>>>>> +    mpstream_encode_uint(stream, IPROTO_DATA);<br>
>>>>>>> +    mpstream_flush(stream);<br>
>>>>>>> +    if (port_dump_msgpack(&response->port, stream->ctx) < 0) {<br>
>>>>>><br>
>>>>>> stream->ctx isn't guaranteed to be an obuf<br>
>>>>>><br>
>>>>>> And when you introduce vstream later, you simply move this code to<br>
>>>>>> another file. This is confusing. May be we should pass alloc/reserve<br>
>>>>>> used in mpstream to port_dump instead of obuf?<br>
>>>>><br>
>>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack<br>
>>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path<br>
>>>>> and is used for box.select.<br>
>>>>><br>
>>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf<br>
>>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will<br>
>>>>> dump port to not obuf, then we will just add a new method to port_vtab.<br>
>>>>><br>
>>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -<br>
>>>>> in both cases we not just dump in a specific format, but to a concrete<br>
>>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted<br>
>>>>> by obuf destination.<br>
>>>><br>
>>>> There's port_dump_plain, which dumps port contents in a specific format.<br>
>>>> So port_dump_obuf would look ambiguous.<br>
>>>><br>
>>>>><br>
>>>>> If you worry about how to call sql_response_dump() to not obuf, then there<br>
>>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and<br>
>>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use<br>
>>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,<br>
>>>>> but use the full power of virtual functions in execute.c, which definitely is<br>
>>>>> not hot.<br>
>>>><br>
>>>> That would interconnect port and mpstream, make them dependent on each<br>
>>>> other. I don't think that would be good.<br>
>>>><br>
>>>>><br>
>>>>> mpstream implementation of vstream will call port_dump_mpstream, and<br>
>>>>> luastream implementation of vstream will call port_dump_lua as it does now.<br>
>>>>> box.select and iproto_call will use port_dump_obuf.<br>
>>>>><br>
>>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?<br>
>>>><br>
>>>> I may be wrong, but IMO there isn't much point in optimizing box.select,<br>
>>>> because it's very limited in its applicability. People already prefer to<br>
>>>> use box.call over box.insert/select/etc over iproto, and with the<br>
>>>> appearance of box.execute they are likely to stop using plain box.select<br>
>>>> at all.<br>
>>>><br>
>>>> That said, personally I would try to pass reserve/alloc methods to port,<br>
>>>> see how it goes.<br>
>>>><br>
>>><br>
>>> I do not see a reason to slow down box.select if we can don't do it.<br>
>>> Yeas, people use IPROTO_CALL, but in stored functions they use box<br>
>>> functions including select.<br>
>><br>
>> box.select called from Lua code doesn't use port_dump_msgpack.<br>
>><br>
>>><br>
>>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to<br>
>>> port_dump_obuf and add port_dump_msgpack which does not depend on<br>
>>> mpstream and takes alloc/reserve/ctx directly.<br>
>><br>
>> Better call the optimized version (the one without callbacks)<br>
>> port_dump_msgpack_obuf to avoid confusion IMO.<br>
>><br>
>> Anyway, I'd try to run cbench to see if it really perfomrs better<br>
>> than the one using callbacks.<br>
>commit a857e1129f9e38bde8c5c6037149887f695ec6dc<br>
>Author: Mergen Imeev <imeevma@gmail.com><br>
>Date:   Fri Nov 9 22:31:07 2018 +0300<br>
>    iproto: replace obuf by mpstream in execute.c<br>
>    It is useful to replace obuf by mpstream. It allows us to design<br>
>    vstream. Interface vstream will be used as universal interface to<br>
>    dump result of SQL queries.<br>
>    Needed for #3505<br>
>diff --git a/src/box/execute.c b/src/box/execute.c<br>
>index 3a6cadf..0d266dd 100644<br>
>--- a/src/box/execute.c<br>
>+++ b/src/box/execute.c<br>
>@@ -35,7 +35,6 @@<br>
> #include "sql/sqliteLimit.h"<br>
> #include "errcode.h"<br>
> #include "small/region.h"<br>
>-#include "small/obuf.h"<br>
> #include "diag.h"<br>
> #include "sql.h"<br>
> #include "xrow.h"<br>
>@@ -43,6 +42,7 @@<br>
> #include "port.h"<br>
> #include "tuple.h"<br>
> #include "sql/vdbe.h"<br>
>+#include "mpstream.h"<br>
> const char *sql_type_strs[] = {<br>
>  NULL,<br>
>@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)<br>
>  * @retval -1 Client or memory error.<br>
>  */<br>
> static inline int<br>
>-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,<br>
>+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,<br>
>        int column_count)<br>
> {<br>
>  assert(column_count > 0);<br>
>- int size = mp_sizeof_uint(IPROTO_METADATA) +<br>
>-      mp_sizeof_array(column_count);<br>
>- char *pos = (char *) obuf_alloc(out, size);<br>
>- if (pos == NULL) {<br>
>-   diag_set(OutOfMemory, size, "obuf_alloc", "pos");<br>
>-   return -1;<br>
>- }<br>
>- pos = mp_encode_uint(pos, IPROTO_METADATA);<br>
>- pos = mp_encode_array(pos, column_count);<br>
>+ mpstream_encode_uint(stream, IPROTO_METADATA);<br>
>+ mpstream_encode_array(stream, column_count);<br>
>  for (int i = 0; i < column_count; ++i) {<br>
>-   size_t size = mp_sizeof_map(2) +<br>
>-           mp_sizeof_uint(IPROTO_FIELD_NAME) +<br>
>-           mp_sizeof_uint(IPROTO_FIELD_TYPE);<br>
>    const char *name = sqlite3_column_name(stmt, i);<br>
>    const char *type = sqlite3_column_datatype(stmt, i);<br>
>    /*<br>
>@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,<br>
>     * column_name simply returns them.<br>
>     */<br>
>    assert(name != NULL);<br>
>-   size += mp_sizeof_str(strlen(name));<br>
>-   size += mp_sizeof_str(strlen(type));<br>
>-   char *pos = (char *) obuf_alloc(out, size);<br>
>-   if (pos == NULL) {<br>
>-     diag_set(OutOfMemory, size, "obuf_alloc", "pos");<br>
>-     return -1;<br>
>-   }<br>
>-   pos = mp_encode_map(pos, 2);<br>
>-   pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);<br>
>-   pos = mp_encode_str(pos, name, strlen(name));<br>
>-   pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);<br>
>-   pos = mp_encode_str(pos, type, strlen(type));<br>
>+   mpstream_encode_map(stream, 2);<br>
>+   mpstream_encode_uint(stream, IPROTO_FIELD_NAME);<br>
>+   mpstream_encode_str(stream, name);<br>
>+   mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);<br>
>+   mpstream_encode_str(stream, type);<br>
>  }<br>
>  return 0;<br>
> }<br>
>@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,<br>
> }<br>
> int<br>
>-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)<br>
>+sql_response_dump(struct sql_response *response, int *keys,<br>
>+     struct mpstream *stream)<br>
> {<br>
>  sqlite3 *db = sql_get();<br>
>  struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;<br>
>- struct port_tuple *port_tuple = (struct port_tuple *) &response->port;<br>
>  int rc = 0, column_count = sqlite3_column_count(stmt);<br>
>  if (column_count > 0) {<br>
>-   if (sql_get_description(stmt, out, column_count) != 0) {<br>
>+   if (sql_get_description(stmt, stream, column_count) != 0) {<br>
> err:<br>
>      rc = -1;<br>
>      goto finish;<br>
>    }<br>
>    *keys = 2;<br>
>-   int size = mp_sizeof_uint(IPROTO_DATA) +<br>
>-        mp_sizeof_array(port_tuple->size);<br>
>-   char *pos = (char *) obuf_alloc(out, size);<br>
>-   if (pos == NULL) {<br>
>-     diag_set(OutOfMemory, size, "obuf_alloc", "pos");<br>
>-     goto err;<br>
>-   }<br>
>-   pos = mp_encode_uint(pos, IPROTO_DATA);<br>
>-   pos = mp_encode_array(pos, port_tuple->size);<br>
>-   /*<br>
>-    * Just like SELECT, SQL uses output format compatible<br>
>-    * with Tarantool 1.6<br>
>-    */<br>
>-   if (port_dump_msgpack_16(&response->port, out) < 0) {<br>
>+   mpstream_encode_uint(stream, IPROTO_DATA);<br>
>+   mpstream_flush(stream);<br>
>+   if (port_dump_msgpack(&response->port, stream->ctx) < 0) {<br>
>      /* Failed port dump destroyes the port. */<br>
>      goto err;<br>
>    }<br>
>+   mpstream_reset(stream);<br>
>  } else {<br>
>    *keys = 1;<br>
>-   assert(port_tuple->size == 0);<br>
>    struct stailq *autoinc_id_list =<br>
>      vdbe_autoinc_id_list((struct Vdbe *)stmt);<br>
>    uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;<br>
>-   int size = mp_sizeof_uint(IPROTO_SQL_INFO) +<br>
>-        mp_sizeof_map(map_size);<br>
>-   char *pos = (char *) obuf_alloc(out, size);<br>
>-   if (pos == NULL) {<br>
>-     diag_set(OutOfMemory, size, "obuf_alloc", "pos");<br>
>-     goto err;<br>
>-   }<br>
>-   pos = mp_encode_uint(pos, IPROTO_SQL_INFO);<br>
>-   pos = mp_encode_map(pos, map_size);<br>
>+   mpstream_encode_uint(stream, IPROTO_SQL_INFO);<br>
>+   mpstream_encode_map(stream, map_size);<br>
>    uint64_t id_count = 0;<br>
>-   int changes = db->nChange;<br>
>-   size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +<br>
>-          mp_sizeof_uint(changes);<br>
>    if (!stailq_empty(autoinc_id_list)) {<br>
>      struct autoinc_id_entry *id_entry;<br>
>-     stailq_foreach_entry(id_entry, autoinc_id_list, link) {<br>
>-       size += id_entry->id >= 0 ?<br>
>-         mp_sizeof_uint(id_entry->id) :<br>
>-         mp_sizeof_int(id_entry->id);<br>
>+     stailq_foreach_entry(id_entry, autoinc_id_list, link)<br>
>        id_count++;<br>
>-     }<br>
>-     size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +<br>
>-       mp_sizeof_array(id_count);<br>
>-   }<br>
>-   char *buf = obuf_alloc(out, size);<br>
>-   if (buf == NULL) {<br>
>-     diag_set(OutOfMemory, size, "obuf_alloc", "buf");<br>
>-     goto err;<br>
>    }<br>
>-   buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);<br>
>-   buf = mp_encode_uint(buf, changes);<br>
>+<br>
>+   mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);<br>
>+   mpstream_encode_uint(stream, db->nChange);<br>
>    if (!stailq_empty(autoinc_id_list)) {<br>
>-     buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);<br>
>-     buf = mp_encode_array(buf, id_count);<br>
>+     mpstream_encode_uint(stream,<br>
>+              SQL_INFO_AUTOINCREMENT_IDS);<br>
>+     mpstream_encode_array(stream, id_count);<br>
>      struct autoinc_id_entry *id_entry;<br>
>      stailq_foreach_entry(id_entry, autoinc_id_list, link) {<br>
>-       buf = id_entry->id >= 0 ?<br>
>-             mp_encode_uint(buf, id_entry->id) :<br>
>-             mp_encode_int(buf, id_entry->id);<br>
>+       int64_t value = id_entry->id;<br>
>+       if (id_entry->id >= 0)<br>
>+         mpstream_encode_uint(stream, value);<br>
>+       else<br>
>+         mpstream_encode_int(stream, value);<br>
>      }<br>
>    }<br>
>  }<br>
>diff --git a/src/box/execute.h b/src/box/execute.h<br>
>index 5f3d5eb..65ac81c 100644<br>
>--- a/src/box/execute.h<br>
>+++ b/src/box/execute.h<br>
>@@ -48,10 +48,10 @@ enum sql_info_key {<br>
> extern const char *sql_info_key_strs[];<br>
>-struct obuf;<br>
> struct region;<br>
> struct sql_bind;<br>
> struct xrow_header;<br>
>+struct mpstream;<br>
> /** EXECUTE request. */<br>
> struct sql_request {<br>
>@@ -105,13 +105,14 @@ struct sql_response {<br>
>  * +----------------------------------------------+<br>
>  * @param response EXECUTE response.<br>
>  * @param[out] keys number of keys in dumped map.<br>
>- * @param out Output buffer.<br>
>+ * @param stream stream to where result is written.<br>
>  *<br>
>  * @retval  0 Success.<br>
>  * @retval -1 Memory error.<br>
>  */<br>
> int<br>
>-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);<br>
>+sql_response_dump(struct sql_response *response, int *keys,<br>
>+     struct mpstream *stream);<br>
> /**<br>
>  * Parse the EXECUTE request.<br>
>diff --git a/src/box/iproto.cc b/src/box/iproto.cc<br>
>index 7c11d05..b110900 100644<br>
>--- a/src/box/iproto.cc<br>
>+++ b/src/box/iproto.cc<br>
>@@ -61,6 +61,7 @@<br>
> #include "rmean.h"<br>
> #include "execute.h"<br>
> #include "errinj.h"<br>
>+#include "mpstream.h"<br>
> enum {<br>
>  IPROTO_SALT_SIZE = 32,<br>
>@@ -1573,12 +1574,20 @@ error:<br>
>  tx_reply_error(msg);<br>
> }<br>
>+/** Callback to forward and error from mpstream methods. */<br>
>+static void<br>
>+set_encode_error(void *error_ctx)<br>
>+{<br>
>+ *(bool *)error_ctx = true;<br>
>+}<br>
>+<br>
> static void<br>
> tx_process_sql(struct cmsg *m)<br>
> {<br>
>  struct iproto_msg *msg = tx_accept_msg(m);<br>
>  struct obuf *out;<br>
>  struct sql_response response;<br>
>+ bool is_error = false;<br>
>  tx_fiber_init(msg->connection->session, msg->header.sync);<br>
>@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)<br>
>  /* Prepare memory for the iproto header. */<br>
>  if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)<br>
>    goto error;<br>
>- if (sql_response_dump(&response, &keys, out) != 0) {<br>
>+ struct mpstream stream;<br>
>+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,<br>
>+         set_encode_error, &is_error);<br>
>+ if (is_error)<br>
>+   goto error;<br>
>+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {<br>
>    obuf_rollback_to_svp(out, &header_svp);<br>
>    goto error;<br>
>  }<br>
>+ mpstream_flush(&stream);<br>
>  iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);<br>
>  iproto_wpos_create(&msg->wpos, out);<br>
>  return;<br>
>-- <br>
>2.7.4</p>
</BODY></HTML>