Added Vova to recipients. -- Отправлено из Mail.Ru для Android пятница, 30 ноября 2018г., 22:01 +03:00 от imeevma@tarantool.org: >This patch is the most dubious patch due to the implicit use of >mpstream as a stream for obuf. There is currently no approved >solution. Discussion and patch below. >On 11/30/18 1:55 PM, Vladimir Davydov wrote: >> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote: >>> >>> >>> On 30/11/2018 13:19, Vladimir Davydov wrote: >>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote: >>>>> On 29/11/2018 13:53, Vladimir Davydov wrote: >>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote: >>>>>>> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request, >>>>>>>    } >>>>>>>    int >>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) >>>>>>> +sql_response_dump(struct sql_response *response, int *keys, >>>>>>> +      struct mpstream *stream) >>>>>>>    { >>>>>>>      sqlite3 *db = sql_get(); >>>>>>>      struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; >>>>>>> -  struct port_tuple *port_tuple = (struct port_tuple *) &response->port; >>>>>>>      int rc = 0, column_count = sqlite3_column_count(stmt); >>>>>>>      if (column_count > 0) { >>>>>>> -    if (sql_get_description(stmt, out, column_count) != 0) { >>>>>>> +    if (sql_get_description(stmt, stream, column_count) != 0) { >>>>>>>    err: >>>>>>>          rc = -1; >>>>>>>          goto finish; >>>>>>>        } >>>>>>>        *keys = 2; >>>>>>> -    int size = mp_sizeof_uint(IPROTO_DATA) + >>>>>>> -         mp_sizeof_array(port_tuple->size); >>>>>>> -    char *pos = (char *) obuf_alloc(out, size); >>>>>>> -    if (pos == NULL) { >>>>>>> -      diag_set(OutOfMemory, size, "obuf_alloc", "pos"); >>>>>>> -      goto err; >>>>>>> -    } >>>>>>> -    pos = mp_encode_uint(pos, IPROTO_DATA); >>>>>>> -    pos = mp_encode_array(pos, port_tuple->size); >>>>>>> -    /* >>>>>>> -     * Just like SELECT, SQL uses output format compatible >>>>>>> -     * with Tarantool 1.6 >>>>>>> -     */ >>>>>>> -    if (port_dump_msgpack_16(&response->port, out) < 0) { >>>>>>> +    mpstream_encode_uint(stream, IPROTO_DATA); >>>>>>> +    mpstream_flush(stream); >>>>>>> +    if (port_dump_msgpack(&response->port, stream->ctx) < 0) { >>>>>> >>>>>> stream->ctx isn't guaranteed to be an obuf >>>>>> >>>>>> And when you introduce vstream later, you simply move this code to >>>>>> another file. This is confusing. May be we should pass alloc/reserve >>>>>> used in mpstream to port_dump instead of obuf? >>>>> >>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack >>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path >>>>> and is used for box.select. >>>>> >>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf >>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will >>>>> dump port to not obuf, then we will just add a new method to port_vtab. >>>>> >>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua - >>>>> in both cases we not just dump in a specific format, but to a concrete >>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted >>>>> by obuf destination. >>>> >>>> There's port_dump_plain, which dumps port contents in a specific format. >>>> So port_dump_obuf would look ambiguous. >>>> >>>>> >>>>> If you worry about how to call sql_response_dump() to not obuf, then there >>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and >>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use >>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select, >>>>> but use the full power of virtual functions in execute.c, which definitely is >>>>> not hot. >>>> >>>> That would interconnect port and mpstream, make them dependent on each >>>> other. I don't think that would be good. >>>> >>>>> >>>>> mpstream implementation of vstream will call port_dump_mpstream, and >>>>> luastream implementation of vstream will call port_dump_lua as it does now. >>>>> box.select and iproto_call will use port_dump_obuf. >>>>> >>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you? >>>> >>>> I may be wrong, but IMO there isn't much point in optimizing box.select, >>>> because it's very limited in its applicability. People already prefer to >>>> use box.call over box.insert/select/etc over iproto, and with the >>>> appearance of box.execute they are likely to stop using plain box.select >>>> at all. >>>> >>>> That said, personally I would try to pass reserve/alloc methods to port, >>>> see how it goes. >>>> >>> >>> I do not see a reason to slow down box.select if we can don't do it. >>> Yeas, people use IPROTO_CALL, but in stored functions they use box >>> functions including select. >> >> box.select called from Lua code doesn't use port_dump_msgpack. >> >>> >>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to >>> port_dump_obuf and add port_dump_msgpack which does not depend on >>> mpstream and takes alloc/reserve/ctx directly. >> >> Better call the optimized version (the one without callbacks) >> port_dump_msgpack_obuf to avoid confusion IMO. >> >> Anyway, I'd try to run cbench to see if it really perfomrs better >> than the one using callbacks. >commit a857e1129f9e38bde8c5c6037149887f695ec6dc >Author: Mergen Imeev >Date:   Fri Nov 9 22:31:07 2018 +0300 >    iproto: replace obuf by mpstream in execute.c >    It is useful to replace obuf by mpstream. It allows us to design >    vstream. Interface vstream will be used as universal interface to >    dump result of SQL queries. >    Needed for #3505 >diff --git a/src/box/execute.c b/src/box/execute.c >index 3a6cadf..0d266dd 100644 >--- a/src/box/execute.c >+++ b/src/box/execute.c >@@ -35,7 +35,6 @@ > #include "sql/sqliteLimit.h" > #include "errcode.h" > #include "small/region.h" >-#include "small/obuf.h" > #include "diag.h" > #include "sql.h" > #include "xrow.h" >@@ -43,6 +42,7 @@ > #include "port.h" > #include "tuple.h" > #include "sql/vdbe.h" >+#include "mpstream.h" > const char *sql_type_strs[] = { >  NULL, >@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt) >  * @retval -1 Client or memory error. >  */ > static inline int >-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, >+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream, >        int column_count) > { >  assert(column_count > 0); >- int size = mp_sizeof_uint(IPROTO_METADATA) + >-      mp_sizeof_array(column_count); >- char *pos = (char *) obuf_alloc(out, size); >- if (pos == NULL) { >-   diag_set(OutOfMemory, size, "obuf_alloc", "pos"); >-   return -1; >- } >- pos = mp_encode_uint(pos, IPROTO_METADATA); >- pos = mp_encode_array(pos, column_count); >+ mpstream_encode_uint(stream, IPROTO_METADATA); >+ mpstream_encode_array(stream, column_count); >  for (int i = 0; i < column_count; ++i) { >-   size_t size = mp_sizeof_map(2) + >-           mp_sizeof_uint(IPROTO_FIELD_NAME) + >-           mp_sizeof_uint(IPROTO_FIELD_TYPE); >    const char *name = sqlite3_column_name(stmt, i); >    const char *type = sqlite3_column_datatype(stmt, i); >    /* >@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out, >     * column_name simply returns them. >     */ >    assert(name != NULL); >-   size += mp_sizeof_str(strlen(name)); >-   size += mp_sizeof_str(strlen(type)); >-   char *pos = (char *) obuf_alloc(out, size); >-   if (pos == NULL) { >-     diag_set(OutOfMemory, size, "obuf_alloc", "pos"); >-     return -1; >-   } >-   pos = mp_encode_map(pos, 2); >-   pos = mp_encode_uint(pos, IPROTO_FIELD_NAME); >-   pos = mp_encode_str(pos, name, strlen(name)); >-   pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE); >-   pos = mp_encode_str(pos, type, strlen(type)); >+   mpstream_encode_map(stream, 2); >+   mpstream_encode_uint(stream, IPROTO_FIELD_NAME); >+   mpstream_encode_str(stream, name); >+   mpstream_encode_uint(stream, IPROTO_FIELD_TYPE); >+   mpstream_encode_str(stream, type); >  } >  return 0; > } >@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request, > } > int >-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out) >+sql_response_dump(struct sql_response *response, int *keys, >+     struct mpstream *stream) > { >  sqlite3 *db = sql_get(); >  struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt; >- struct port_tuple *port_tuple = (struct port_tuple *) &response->port; >  int rc = 0, column_count = sqlite3_column_count(stmt); >  if (column_count > 0) { >-   if (sql_get_description(stmt, out, column_count) != 0) { >+   if (sql_get_description(stmt, stream, column_count) != 0) { > err: >      rc = -1; >      goto finish; >    } >    *keys = 2; >-   int size = mp_sizeof_uint(IPROTO_DATA) + >-        mp_sizeof_array(port_tuple->size); >-   char *pos = (char *) obuf_alloc(out, size); >-   if (pos == NULL) { >-     diag_set(OutOfMemory, size, "obuf_alloc", "pos"); >-     goto err; >-   } >-   pos = mp_encode_uint(pos, IPROTO_DATA); >-   pos = mp_encode_array(pos, port_tuple->size); >-   /* >-    * Just like SELECT, SQL uses output format compatible >-    * with Tarantool 1.6 >-    */ >-   if (port_dump_msgpack_16(&response->port, out) < 0) { >+   mpstream_encode_uint(stream, IPROTO_DATA); >+   mpstream_flush(stream); >+   if (port_dump_msgpack(&response->port, stream->ctx) < 0) { >      /* Failed port dump destroyes the port. */ >      goto err; >    } >+   mpstream_reset(stream); >  } else { >    *keys = 1; >-   assert(port_tuple->size == 0); >    struct stailq *autoinc_id_list = >      vdbe_autoinc_id_list((struct Vdbe *)stmt); >    uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2; >-   int size = mp_sizeof_uint(IPROTO_SQL_INFO) + >-        mp_sizeof_map(map_size); >-   char *pos = (char *) obuf_alloc(out, size); >-   if (pos == NULL) { >-     diag_set(OutOfMemory, size, "obuf_alloc", "pos"); >-     goto err; >-   } >-   pos = mp_encode_uint(pos, IPROTO_SQL_INFO); >-   pos = mp_encode_map(pos, map_size); >+   mpstream_encode_uint(stream, IPROTO_SQL_INFO); >+   mpstream_encode_map(stream, map_size); >    uint64_t id_count = 0; >-   int changes = db->nChange; >-   size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) + >-          mp_sizeof_uint(changes); >    if (!stailq_empty(autoinc_id_list)) { >      struct autoinc_id_entry *id_entry; >-     stailq_foreach_entry(id_entry, autoinc_id_list, link) { >-       size += id_entry->id >= 0 ? >-         mp_sizeof_uint(id_entry->id) : >-         mp_sizeof_int(id_entry->id); >+     stailq_foreach_entry(id_entry, autoinc_id_list, link) >        id_count++; >-     } >-     size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) + >-       mp_sizeof_array(id_count); >-   } >-   char *buf = obuf_alloc(out, size); >-   if (buf == NULL) { >-     diag_set(OutOfMemory, size, "obuf_alloc", "buf"); >-     goto err; >    } >-   buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT); >-   buf = mp_encode_uint(buf, changes); >+ >+   mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT); >+   mpstream_encode_uint(stream, db->nChange); >    if (!stailq_empty(autoinc_id_list)) { >-     buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS); >-     buf = mp_encode_array(buf, id_count); >+     mpstream_encode_uint(stream, >+              SQL_INFO_AUTOINCREMENT_IDS); >+     mpstream_encode_array(stream, id_count); >      struct autoinc_id_entry *id_entry; >      stailq_foreach_entry(id_entry, autoinc_id_list, link) { >-       buf = id_entry->id >= 0 ? >-             mp_encode_uint(buf, id_entry->id) : >-             mp_encode_int(buf, id_entry->id); >+       int64_t value = id_entry->id; >+       if (id_entry->id >= 0) >+         mpstream_encode_uint(stream, value); >+       else >+         mpstream_encode_int(stream, value); >      } >    } >  } >diff --git a/src/box/execute.h b/src/box/execute.h >index 5f3d5eb..65ac81c 100644 >--- a/src/box/execute.h >+++ b/src/box/execute.h >@@ -48,10 +48,10 @@ enum sql_info_key { > extern const char *sql_info_key_strs[]; >-struct obuf; > struct region; > struct sql_bind; > struct xrow_header; >+struct mpstream; > /** EXECUTE request. */ > struct sql_request { >@@ -105,13 +105,14 @@ struct sql_response { >  * +----------------------------------------------+ >  * @param response EXECUTE response. >  * @param[out] keys number of keys in dumped map. >- * @param out Output buffer. >+ * @param stream stream to where result is written. >  * >  * @retval  0 Success. >  * @retval -1 Memory error. >  */ > int >-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out); >+sql_response_dump(struct sql_response *response, int *keys, >+     struct mpstream *stream); > /** >  * Parse the EXECUTE request. >diff --git a/src/box/iproto.cc b/src/box/iproto.cc >index 7c11d05..b110900 100644 >--- a/src/box/iproto.cc >+++ b/src/box/iproto.cc >@@ -61,6 +61,7 @@ > #include "rmean.h" > #include "execute.h" > #include "errinj.h" >+#include "mpstream.h" > enum { >  IPROTO_SALT_SIZE = 32, >@@ -1573,12 +1574,20 @@ error: >  tx_reply_error(msg); > } >+/** Callback to forward and error from mpstream methods. */ >+static void >+set_encode_error(void *error_ctx) >+{ >+ *(bool *)error_ctx = true; >+} >+ > static void > tx_process_sql(struct cmsg *m) > { >  struct iproto_msg *msg = tx_accept_msg(m); >  struct obuf *out; >  struct sql_response response; >+ bool is_error = false; >  tx_fiber_init(msg->connection->session, msg->header.sync); >@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m) >  /* Prepare memory for the iproto header. */ >  if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0) >    goto error; >- if (sql_response_dump(&response, &keys, out) != 0) { >+ struct mpstream stream; >+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, >+         set_encode_error, &is_error); >+ if (is_error) >+   goto error; >+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) { >    obuf_rollback_to_svp(out, &header_svp); >    goto error; >  } >+ mpstream_flush(&stream); >  iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys); >  iproto_wpos_create(&msg->wpos, out); >  return; >-- >2.7.4