* [PATCH v4 1/5] box: move port to src/
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
@ 2018-11-30 19:01 ` imeevma
2018-12-03 9:22 ` Vladimir Davydov
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
` (4 subsequent siblings)
5 siblings, 1 reply; 12+ messages in thread
From: imeevma @ 2018-11-30 19:01 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, vdavydov.dev, kostja
Basic port structure does not depend on anything but
standard types. It just gives an interface and calls
virtual functions.
Its location in box/ was ok since it was not used
anywhere in src/. But next commits will add a new
method to mpstream so as to dump port. Mpstream is
implemented in src/, so lets move port over here.
Needed for #3505
---
src/CMakeLists.txt | 1 +
src/box/port.c | 30 -------------
src/box/port.h | 103 +------------------------------------------
src/port.c | 37 ++++++++++++++++
src/port.h | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 166 insertions(+), 132 deletions(-)
create mode 100644 src/port.c
create mode 100644 src/port.h
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b854265..e8554a8 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -108,6 +108,7 @@ set (core_sources
coll.c
coll_def.c
mpstream.c
+ port.c
)
if (TARGET_OS_NETBSD)
diff --git a/src/box/port.c b/src/box/port.c
index 853d24c..ef511ea 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -125,36 +125,6 @@ extern void
port_tuple_dump_lua(struct port *base, struct lua_State *L);
void
-port_destroy(struct port *port)
-{
- return port->vtab->destroy(port);
-}
-
-int
-port_dump_msgpack(struct port *port, struct obuf *out)
-{
- return port->vtab->dump_msgpack(port, out);
-}
-
-int
-port_dump_msgpack_16(struct port *port, struct obuf *out)
-{
- return port->vtab->dump_msgpack_16(port, out);
-}
-
-void
-port_dump_lua(struct port *port, struct lua_State *L)
-{
- port->vtab->dump_lua(port, L);
-}
-
-const char *
-port_dump_plain(struct port *port, uint32_t *size)
-{
- return port->vtab->dump_plain(port, size);
-}
-
-void
port_init(void)
{
mempool_create(&port_tuple_entry_pool, &cord()->slabc,
diff --git a/src/box/port.h b/src/box/port.h
index 751e44e..ad1b349 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -31,78 +31,13 @@
* SUCH DAMAGE.
*/
#include "trivia/util.h"
+#include <port.h>
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
struct tuple;
-struct obuf;
-struct lua_State;
-
-/**
- * A single port represents a destination of box_process output.
- * One such destination can be a Lua stack, or the binary
- * protocol.
- * An instance of a port is usually short lived, as it is created
- * for every server request. State of the instance is represented
- * by the tuples added to it. E.g.:
- *
- * struct port port;
- * port_tuple_create(&port);
- * for (tuple in tuples)
- * port_tuple_add(tuple);
- *
- * port_dump(&port, obuf);
- * port_destroy(&port);
- *
- * Beginning with Tarantool 1.5, tuple can have different internal
- * structure and port_tuple_add() requires a double
- * dispatch: first, by the type of the port the tuple is being
- * added to, second, by the type of the tuple format, since the
- * format defines the internal structure of the tuple.
- */
-
-struct port;
-
-struct port_vtab {
- /**
- * Dump the content of a port to an output buffer.
- * On success returns number of entries dumped.
- * On failure sets diag and returns -1.
- */
- int (*dump_msgpack)(struct port *port, struct obuf *out);
- /**
- * Same as dump_msgpack(), but use the legacy Tarantool
- * 1.6 format.
- */
- int (*dump_msgpack_16)(struct port *port, struct obuf *out);
- /** Dump the content of a port to Lua stack. */
- void (*dump_lua)(struct port *port, struct lua_State *L);
- /**
- * Dump a port content as a plain text into a buffer,
- * allocated inside.
- */
- const char *(*dump_plain)(struct port *port, uint32_t *size);
- /**
- * Destroy a port and release associated resources.
- */
- void (*destroy)(struct port *port);
-};
-
-/**
- * Abstract port instance. It is supposed to be converted to
- * a concrete port realization, e.g. port_tuple.
- */
-struct port {
- /** Virtual method table. */
- const struct port_vtab *vtab;
- /**
- * Implementation dependent content. Needed to declare
- * an abstract port instance on stack.
- */
- char pad[48];
-};
struct port_tuple_entry {
struct port_tuple_entry *next;
@@ -166,42 +101,6 @@ static_assert(sizeof(struct port_lua) <= sizeof(struct port),
void
port_lua_create(struct port *port, struct lua_State *L);
-/**
- * Destroy an abstract port instance.
- */
-void
-port_destroy(struct port *port);
-
-/**
- * Dump an abstract port instance to an output buffer.
- * Return number of entries dumped on success, -1 on error.
- */
-int
-port_dump_msgpack(struct port *port, struct obuf *out);
-
-/**
- * Same as port_dump(), but use the legacy Tarantool 1.6
- * format.
- */
-int
-port_dump_msgpack_16(struct port *port, struct obuf *out);
-
-/** Dump port content to Lua stack. */
-void
-port_dump_lua(struct port *port, struct lua_State *L);
-
-/**
- * Dump a port content as a plain text into a buffer,
- * allocated inside.
- * @param port Port with data to dump.
- * @param[out] size Length of a result plain text.
- *
- * @retval nil Error.
- * @retval not nil Plain text.
- */
-const char *
-port_dump_plain(struct port *port, uint32_t *size);
-
void
port_init(void);
diff --git a/src/port.c b/src/port.c
new file mode 100644
index 0000000..03694b4
--- /dev/null
+++ b/src/port.c
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "port.h"
+
+void
+port_destroy(struct port *port)
+{
+ port->vtab->destroy(port);
+}
diff --git a/src/port.h b/src/port.h
new file mode 100644
index 0000000..9266ae5
--- /dev/null
+++ b/src/port.h
@@ -0,0 +1,127 @@
+#ifndef INCLUDES_TARANTOOL_PORT_H
+#define INCLUDES_TARANTOOL_PORT_H
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <stdint.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct obuf;
+struct lua_State;
+struct port;
+
+/**
+ * A single port represents a destination of any output. One such
+ * destination can be a Lua stack, or the binary protocol. An
+ * instance of a port is usually short lived, as it is created
+ * per request. Used to virtualize functions which can return
+ * directly into Lua or into network.
+ */
+struct port_vtab {
+ /**
+ * Dump the content of a port to an output buffer.
+ * @param port Port to dump.
+ * @param out Buffer to dump to.
+ *
+ * @retval >= 0 Number of entries dumped.
+ * @retval < 0 Error.
+ */
+ int (*dump_msgpack)(struct port *port, struct obuf *out);
+ /**
+ * Same as dump_msgpack(), but do not add MsgPack array
+ * header. Used by the legacy Tarantool 1.6 format.
+ */
+ int (*dump_msgpack_16)(struct port *port, struct obuf *out);
+ /** Dump the content of a port to Lua stack. */
+ void (*dump_lua)(struct port *port, struct lua_State *L);
+ /**
+ * Dump a port content as a plain text into a buffer,
+ * allocated inside.
+ * @param port Port with data to dump.
+ * @param[out] size Length of a result plain text.
+ *
+ * @retval nil Error.
+ * @retval not nil Plain text.
+ */
+ const char *(*dump_plain)(struct port *port, uint32_t *size);
+ /** Destroy a port and release associated resources. */
+ void (*destroy)(struct port *port);
+};
+
+/**
+ * Abstract port instance. It is supposed to be converted to
+ * a concrete port realization, e.g. port_tuple.
+ */
+struct port {
+ /** Virtual method table. */
+ const struct port_vtab *vtab;
+ /**
+ * Implementation dependent content. Needed to declare
+ * an abstract port instance on stack.
+ */
+ char pad[48];
+};
+
+/** Is not inlined just to be exported. */
+void
+port_destroy(struct port *port);
+
+static inline int
+port_dump_msgpack(struct port *port, struct obuf *out)
+{
+ return port->vtab->dump_msgpack(port, out);
+}
+
+static inline int
+port_dump_msgpack_16(struct port *port, struct obuf *out)
+{
+ return port->vtab->dump_msgpack_16(port, out);
+}
+
+static inline void
+port_dump_lua(struct port *port, struct lua_State *L)
+{
+ port->vtab->dump_lua(port, L);
+}
+
+static inline const char *
+port_dump_plain(struct port *port, uint32_t *size)
+{
+ return port->vtab->dump_plain(port, size);
+}
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined __cplusplus */
+
+#endif /* INCLUDES_TARANTOOL_PORT_H */
--
2.7.4
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH v4 1/5] box: move port to src/
2018-11-30 19:01 ` [PATCH v4 1/5] box: move port to src/ imeevma
@ 2018-12-03 9:22 ` Vladimir Davydov
0 siblings, 0 replies; 12+ messages in thread
From: Vladimir Davydov @ 2018-12-03 9:22 UTC (permalink / raw)
To: imeevma; +Cc: v.shpilevoy, tarantool-patches, kostja
On Fri, Nov 30, 2018 at 10:01:09PM +0300, imeevma@tarantool.org wrote:
> Basic port structure does not depend on anything but
> standard types. It just gives an interface and calls
> virtual functions.
>
> Its location in box/ was ok since it was not used
> anywhere in src/. But next commits will add a new
> method to mpstream so as to dump port. Mpstream is
> implemented in src/, so lets move port over here.
>
> Needed for #3505
> ---
> src/CMakeLists.txt | 1 +
> src/box/port.c | 30 -------------
> src/box/port.h | 103 +------------------------------------------
> src/port.c | 37 ++++++++++++++++
> src/port.h | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++
> 5 files changed, 166 insertions(+), 132 deletions(-)
> create mode 100644 src/port.c
> create mode 100644 src/port.h
Pushed to 2.1.
^ permalink raw reply [flat|nested] 12+ messages in thread
* [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
2018-11-30 19:01 ` [PATCH v4 1/5] box: move port to src/ imeevma
@ 2018-11-30 19:01 ` imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 3/5] sql: create interface vstream imeevma
` (3 subsequent siblings)
5 siblings, 0 replies; 12+ messages in thread
From: imeevma @ 2018-11-30 19:01 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, kostja
This patch is the most dubious patch due to the implicit use of
mpstream as a stream for obuf. There is currently no approved
solution. Discussion and patch below.
On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>
>>
>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>> @@ -627,81 +610,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>> }
>>>>>> int
>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>> + struct mpstream *stream)
>>>>>> {
>>>>>> sqlite3 *db = sql_get();
>>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>> if (column_count > 0) {
>>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>> err:
>>>>>> rc = -1;
>>>>>> goto finish;
>>>>>> }
>>>>>> *keys = 2;
>>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>> - mp_sizeof_array(port_tuple->size);
>>>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>>>> - if (pos == NULL) {
>>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>> - goto err;
>>>>>> - }
>>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>>>> - /*
>>>>>> - * Just like SELECT, SQL uses output format compatible
>>>>>> - * with Tarantool 1.6
>>>>>> - */
>>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>> + mpstream_flush(stream);
>>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>
>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>
>>>>> And when you introduce vstream later, you simply move this code to
>>>>> another file. This is confusing. May be we should pass alloc/reserve
>>>>> used in mpstream to port_dump instead of obuf?
>>>>
>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
>>>> and is used for box.select.
>>>>
>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
>>>> dump port to not obuf, then we will just add a new method to port_vtab.
>>>>
>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>> in both cases we not just dump in a specific format, but to a concrete
>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>> by obuf destination.
>>>
>>> There's port_dump_plain, which dumps port contents in a specific format.
>>> So port_dump_obuf would look ambiguous.
>>>
>>>>
>>>> If you worry about how to call sql_response_dump() to not obuf, then there
>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
>>>> but use the full power of virtual functions in execute.c, which definitely is
>>>> not hot.
>>>
>>> That would interconnect port and mpstream, make them dependent on each
>>> other. I don't think that would be good.
>>>
>>>>
>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>> box.select and iproto_call will use port_dump_obuf.
>>>>
>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
>>>
>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>> because it's very limited in its applicability. People already prefer to
>>> use box.call over box.insert/select/etc over iproto, and with the
>>> appearance of box.execute they are likely to stop using plain box.select
>>> at all.
>>>
>>> That said, personally I would try to pass reserve/alloc methods to port,
>>> see how it goes.
>>>
>>
>> I do not see a reason to slow down box.select if we can don't do it.
>> Yeas, people use IPROTO_CALL, but in stored functions they use box
>> functions including select.
>
> box.select called from Lua code doesn't use port_dump_msgpack.
>
>>
>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>> port_dump_obuf and add port_dump_msgpack which does not depend on
>> mpstream and takes alloc/reserve/ctx directly.
>
> Better call the optimized version (the one without callbacks)
> port_dump_msgpack_obuf to avoid confusion IMO.
>
> Anyway, I'd try to run cbench to see if it really perfomrs better
> than the one using callbacks.
commit a857e1129f9e38bde8c5c6037149887f695ec6dc
Author: Mergen Imeev <imeevma@gmail.com>
Date: Fri Nov 9 22:31:07 2018 +0300
iproto: replace obuf by mpstream in execute.c
It is useful to replace obuf by mpstream. It allows us to design
vstream. Interface vstream will be used as universal interface to
dump result of SQL queries.
Needed for #3505
diff --git a/src/box/execute.c b/src/box/execute.c
index 3a6cadf..0d266dd 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
#include "sql/sqliteLimit.h"
#include "errcode.h"
#include "small/region.h"
-#include "small/obuf.h"
#include "diag.h"
#include "sql.h"
#include "xrow.h"
@@ -43,6 +42,7 @@
#include "port.h"
#include "tuple.h"
#include "sql/vdbe.h"
+#include "mpstream.h"
const char *sql_type_strs[] = {
NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
* @retval -1 Client or memory error.
*/
static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
int column_count)
{
assert(column_count > 0);
- int size = mp_sizeof_uint(IPROTO_METADATA) +
- mp_sizeof_array(column_count);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_uint(pos, IPROTO_METADATA);
- pos = mp_encode_array(pos, column_count);
+ mpstream_encode_uint(stream, IPROTO_METADATA);
+ mpstream_encode_array(stream, column_count);
for (int i = 0; i < column_count; ++i) {
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_FIELD_NAME) +
- mp_sizeof_uint(IPROTO_FIELD_TYPE);
const char *name = sqlite3_column_name(stmt, i);
const char *type = sqlite3_column_datatype(stmt, i);
/*
@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
* column_name simply returns them.
*/
assert(name != NULL);
- size += mp_sizeof_str(strlen(name));
- size += mp_sizeof_str(strlen(type));
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_map(pos, 2);
- pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
- pos = mp_encode_str(pos, name, strlen(name));
- pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
- pos = mp_encode_str(pos, type, strlen(type));
+ mpstream_encode_map(stream, 2);
+ mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+ mpstream_encode_str(stream, name);
+ mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+ mpstream_encode_str(stream, type);
}
return 0;
}
@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
}
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream)
{
sqlite3 *db = sql_get();
struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
- struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
int rc = 0, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
- if (sql_get_description(stmt, out, column_count) != 0) {
+ if (sql_get_description(stmt, stream, column_count) != 0) {
err:
rc = -1;
goto finish;
}
*keys = 2;
- int size = mp_sizeof_uint(IPROTO_DATA) +
- mp_sizeof_array(port_tuple->size);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_encode_uint(pos, IPROTO_DATA);
- pos = mp_encode_array(pos, port_tuple->size);
- /*
- * Just like SELECT, SQL uses output format compatible
- * with Tarantool 1.6
- */
- if (port_dump_msgpack_16(&response->port, out) < 0) {
+ mpstream_encode_uint(stream, IPROTO_DATA);
+ mpstream_flush(stream);
+ if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
/* Failed port dump destroyes the port. */
goto err;
}
+ mpstream_reset(stream);
} else {
*keys = 1;
- assert(port_tuple->size == 0);
struct stailq *autoinc_id_list =
vdbe_autoinc_id_list((struct Vdbe *)stmt);
uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
- int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
- mp_sizeof_map(map_size);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
- pos = mp_encode_map(pos, map_size);
+ mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+ mpstream_encode_map(stream, map_size);
uint64_t id_count = 0;
- int changes = db->nChange;
- size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
- mp_sizeof_uint(changes);
if (!stailq_empty(autoinc_id_list)) {
struct autoinc_id_entry *id_entry;
- stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- size += id_entry->id >= 0 ?
- mp_sizeof_uint(id_entry->id) :
- mp_sizeof_int(id_entry->id);
+ stailq_foreach_entry(id_entry, autoinc_id_list, link)
id_count++;
- }
- size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
- mp_sizeof_array(id_count);
- }
- char *buf = obuf_alloc(out, size);
- if (buf == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "buf");
- goto err;
}
- buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
- buf = mp_encode_uint(buf, changes);
+
+ mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+ mpstream_encode_uint(stream, db->nChange);
if (!stailq_empty(autoinc_id_list)) {
- buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
- buf = mp_encode_array(buf, id_count);
+ mpstream_encode_uint(stream,
+ SQL_INFO_AUTOINCREMENT_IDS);
+ mpstream_encode_array(stream, id_count);
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- buf = id_entry->id >= 0 ?
- mp_encode_uint(buf, id_entry->id) :
- mp_encode_int(buf, id_entry->id);
+ int64_t value = id_entry->id;
+ if (id_entry->id >= 0)
+ mpstream_encode_uint(stream, value);
+ else
+ mpstream_encode_int(stream, value);
}
}
}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..65ac81c 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,10 +48,10 @@ enum sql_info_key {
extern const char *sql_info_key_strs[];
-struct obuf;
struct region;
struct sql_bind;
struct xrow_header;
+struct mpstream;
/** EXECUTE request. */
struct sql_request {
@@ -105,13 +105,14 @@ struct sql_response {
* +----------------------------------------------+
* @param response EXECUTE response.
* @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream stream to where result is written.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream);
/**
* Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
#include "rmean.h"
#include "execute.h"
#include "errinj.h"
+#include "mpstream.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
tx_reply_error(msg);
}
+/** Callback to forward and error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+ *(bool *)error_ctx = true;
+}
+
static void
tx_process_sql(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct sql_response response;
+ bool is_error = false;
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
/* Prepare memory for the iproto header. */
if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
goto error;
- if (sql_response_dump(&response, &keys, out) != 0) {
+ struct mpstream stream;
+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ set_encode_error, &is_error);
+ if (is_error)
+ goto error;
+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
obuf_rollback_to_svp(out, &header_svp);
goto error;
}
+ mpstream_flush(&stream);
iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
iproto_wpos_create(&msg->wpos, out);
return;
--
2.7.4
^ permalink raw reply [flat|nested] 12+ messages in thread
* [tarantool-patches] [PATCH v4 3/5] sql: create interface vstream
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
2018-11-30 19:01 ` [PATCH v4 1/5] box: move port to src/ imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
@ 2018-11-30 19:01 ` imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 4/5] lua: create vstream implementation for Lua imeevma
` (2 subsequent siblings)
5 siblings, 0 replies; 12+ messages in thread
From: imeevma @ 2018-11-30 19:01 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, kostja
If we want to use functions from execute.h not only in IPROTO we
should create special interface. This interface will allow us to
create different implementations for mpstream and lua_State and
use functions from execute.c without changing them. This patch
creates such interface and its implementation for mpstream and
replaces mpstream functions in execute.c by methods of this
interface.
Needed for #3505
---
src/box/execute.c | 61 ++++++++++++--------
src/box/execute.h | 18 +++++-
src/box/iproto.cc | 11 ++--
src/mpstream.c | 57 ++++++++++++++++++
src/mpstream.h | 7 +++
src/vstream.h | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
6 files changed, 294 insertions(+), 30 deletions(-)
create mode 100644 src/vstream.h
diff --git a/src/box/execute.c b/src/box/execute.c
index 0d266dd..36b861f 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -42,7 +42,7 @@
#include "port.h"
#include "tuple.h"
#include "sql/vdbe.h"
-#include "mpstream.h"
+#include "vstream.h"
const char *sql_type_strs[] = {
NULL,
@@ -530,12 +530,12 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
* @retval -1 Client or memory error.
*/
static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
+sql_get_description(struct sqlite3_stmt *stmt, struct vstream *stream,
int column_count)
{
assert(column_count > 0);
- mpstream_encode_uint(stream, IPROTO_METADATA);
- mpstream_encode_array(stream, column_count);
+ vstream_encode_enum(stream, IPROTO_METADATA, "metadata");
+ vstream_encode_array(stream, column_count);
for (int i = 0; i < column_count; ++i) {
const char *name = sqlite3_column_name(stmt, i);
const char *type = sqlite3_column_datatype(stmt, i);
@@ -545,12 +545,19 @@ sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
* column_name simply returns them.
*/
assert(name != NULL);
- mpstream_encode_map(stream, 2);
- mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
- mpstream_encode_str(stream, name);
- mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
- mpstream_encode_str(stream, type);
+ vstream_encode_map(stream, 2);
+
+ vstream_encode_enum(stream, IPROTO_FIELD_NAME, "name");
+ vstream_encode_str(stream, name);
+ vstream_encode_map_commit(stream);
+
+ vstream_encode_enum(stream, IPROTO_FIELD_TYPE, "type");
+ vstream_encode_str(stream, type);
+ vstream_encode_map_commit(stream);
+
+ vstream_encode_array_commit(stream, i);
}
+ vstream_encode_map_commit(stream);
return 0;
}
@@ -609,7 +616,7 @@ sql_prepare_and_execute(const struct sql_request *request,
int
sql_response_dump(struct sql_response *response, int *keys,
- struct mpstream *stream)
+ struct vstream *stream)
{
sqlite3 *db = sql_get();
struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
@@ -621,42 +628,48 @@ err:
goto finish;
}
*keys = 2;
- mpstream_encode_uint(stream, IPROTO_DATA);
- mpstream_flush(stream);
- if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
+ vstream_encode_enum(stream, IPROTO_DATA, "rows");
+ if (vstream_encode_port(stream, &response->port) < 0) {
/* Failed port dump destroyes the port. */
goto err;
}
- mpstream_reset(stream);
+ vstream_encode_map_commit(stream);
} else {
*keys = 1;
struct stailq *autoinc_id_list =
vdbe_autoinc_id_list((struct Vdbe *)stmt);
uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
- mpstream_encode_uint(stream, IPROTO_SQL_INFO);
- mpstream_encode_map(stream, map_size);
uint64_t id_count = 0;
if (!stailq_empty(autoinc_id_list)) {
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link)
id_count++;
}
-
- mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
- mpstream_encode_uint(stream, db->nChange);
+ if (!response->is_info_flattened) {
+ vstream_encode_enum(stream, IPROTO_SQL_INFO, "info");
+ vstream_encode_map(stream, map_size);
+ }
+ vstream_encode_enum(stream, SQL_INFO_ROW_COUNT, "rowcount");
+ vstream_encode_uint(stream, db->nChange);
+ vstream_encode_map_commit(stream);
if (!stailq_empty(autoinc_id_list)) {
- mpstream_encode_uint(stream,
- SQL_INFO_AUTOINCREMENT_IDS);
- mpstream_encode_array(stream, id_count);
+ vstream_encode_enum(stream, SQL_INFO_AUTOINCREMENT_IDS,
+ "autoincrement_ids");
+ vstream_encode_array(stream, id_count);
struct autoinc_id_entry *id_entry;
+ int i = 0;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
int64_t value = id_entry->id;
if (id_entry->id >= 0)
- mpstream_encode_uint(stream, value);
+ vstream_encode_uint(stream, value);
else
- mpstream_encode_int(stream, value);
+ vstream_encode_int(stream, value);
+ vstream_encode_array_commit(stream, i++);
}
+ vstream_encode_map_commit(stream);
}
+ if (!response->is_info_flattened)
+ vstream_encode_map_commit(stream);
}
finish:
port_destroy(&response->port);
diff --git a/src/box/execute.h b/src/box/execute.h
index 65ac81c..56b7339 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -51,7 +51,7 @@ extern const char *sql_info_key_strs[];
struct region;
struct sql_bind;
struct xrow_header;
-struct mpstream;
+struct vstream;
/** EXECUTE request. */
struct sql_request {
@@ -74,6 +74,20 @@ struct sql_response {
struct port port;
/** Prepared SQL statement with metadata. */
void *prep_stmt;
+ /**
+ * SQL response can be dumped into msgpack to be sent via
+ * iproto or onto Lua stack to be returned into an
+ * application. In the first case response body has
+ * explicit field IPROTO_SQL_INFO: {rowcount = ...,
+ * autoids = ...}. But in case of Lua this field is
+ * flattened. A result never has 'info' field, it has
+ * inlined 'rowcount' and 'autoids'. In iproto
+ * IPROTO_SQL_INFO field is sent mostly to explicitly
+ * distinguish two response types: DML/DDL vs DQL,
+ * IPROTO_SQL_INFO vs IPROTO_METADATA. So this flag is set
+ * by Lua and allows to flatten SQL_INFO fields.
+ */
+ bool is_info_flattened;
};
/**
@@ -112,7 +126,7 @@ struct sql_response {
*/
int
sql_response_dump(struct sql_response *response, int *keys,
- struct mpstream *stream);
+ struct vstream *stream);
/**
* Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b110900..6e284f7 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -62,6 +62,7 @@
#include "execute.h"
#include "errinj.h"
#include "mpstream.h"
+#include "vstream.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -1587,6 +1588,7 @@ tx_process_sql(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct sql_response response;
+ memset(&response, 0, sizeof(response));
bool is_error = false;
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1607,16 +1609,17 @@ tx_process_sql(struct cmsg *m)
/* Prepare memory for the iproto header. */
if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
goto error;
- struct mpstream stream;
- mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
- set_encode_error, &is_error);
+
+ struct vstream stream;
+ mpvstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ set_encode_error, &is_error);
if (is_error)
goto error;
if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
obuf_rollback_to_svp(out, &header_svp);
goto error;
}
- mpstream_flush(&stream);
+ mpstream_flush((struct mpstream *)&stream);
iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
iproto_wpos_create(&msg->wpos, out);
return;
diff --git a/src/mpstream.c b/src/mpstream.c
index e4f7950..6636da1 100644
--- a/src/mpstream.c
+++ b/src/mpstream.c
@@ -33,6 +33,8 @@
#include <assert.h>
#include <stdint.h>
#include "msgpuck.h"
+#include "vstream.h"
+#include "port.h"
void
mpstream_reserve_slow(struct mpstream *stream, size_t size)
@@ -175,3 +177,58 @@ mpstream_encode_bool(struct mpstream *stream, bool val)
char *pos = mp_encode_bool(data, val);
mpstream_advance(stream, pos - data);
}
+
+static int
+mp_vstream_encode_port(struct vstream *stream, struct port *port)
+{
+ struct mpstream *mpstream = (struct mpstream *)stream;
+ mpstream_flush(mpstream);
+ if (port_dump_msgpack(port, mpstream->ctx) < 0) {
+ /* Failed port dump destroyes the port. */
+ return -1;
+ }
+ mpstream_reset(mpstream);
+ return 0;
+}
+
+static void
+mp_vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+ (void)str;
+ if (num < 0)
+ mpstream_encode_int((struct mpstream *)stream, num);
+ else
+ mpstream_encode_uint((struct mpstream *)stream, num);
+}
+
+static void
+mp_vstream_noop(struct vstream *stream, ...)
+{
+ (void) stream;
+}
+
+static const struct vstream_vtab mp_vstream_vtab = {
+ /** encode_array = */ (encode_array_f)mpstream_encode_array,
+ /** encode_map = */ (encode_map_f)mpstream_encode_map,
+ /** encode_uint = */ (encode_uint_f)mpstream_encode_uint,
+ /** encode_int = */ (encode_int_f)mpstream_encode_int,
+ /** encode_float = */ (encode_float_f)mpstream_encode_float,
+ /** encode_double = */ (encode_double_f)mpstream_encode_double,
+ /** encode_strn = */ (encode_strn_f)mpstream_encode_strn,
+ /** encode_nil = */ (encode_nil_f)mpstream_encode_nil,
+ /** encode_bool = */ (encode_bool_f)mpstream_encode_bool,
+ /** encode_enum = */ mp_vstream_encode_enum,
+ /** encode_port = */ mp_vstream_encode_port,
+ /** encode_array_commit = */ (encode_array_commit_f)mp_vstream_noop,
+ /** encode_map_commit = */ (encode_map_commit_f)mp_vstream_noop,
+};
+
+void
+mpvstream_init(struct vstream *stream, void *ctx, mpstream_reserve_f reserve,
+ mpstream_alloc_f alloc, mpstream_error_f error, void *error_ctx)
+{
+ stream->vtab = &mp_vstream_vtab;
+ assert(sizeof(stream->inheritance_padding) >= sizeof(struct mpstream));
+ mpstream_init((struct mpstream *) stream, ctx, reserve, alloc, error,
+ error_ctx);
+}
diff --git a/src/mpstream.h b/src/mpstream.h
index e22d052..ce0e25d 100644
--- a/src/mpstream.h
+++ b/src/mpstream.h
@@ -78,6 +78,13 @@ mpstream_init(struct mpstream *stream, void *ctx,
mpstream_reserve_f reserve, mpstream_alloc_f alloc,
mpstream_error_f error, void *error_ctx);
+struct vstream;
+
+/** Initialize a vstream object as an instance of mpstream. */
+void
+mpvstream_init(struct vstream *stream, void *ctx, mpstream_reserve_f reserve,
+ mpstream_alloc_f alloc, mpstream_error_f error, void *error_ctx);
+
static inline void
mpstream_flush(struct mpstream *stream)
{
diff --git a/src/vstream.h b/src/vstream.h
new file mode 100644
index 0000000..fe7c49a
--- /dev/null
+++ b/src/vstream.h
@@ -0,0 +1,170 @@
+#ifndef TARANTOOL_VSTREAM_H_INCLUDED
+#define TARANTOOL_VSTREAM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct vstream;
+struct port;
+
+typedef void (*encode_array_f)(struct vstream *stream, uint32_t size);
+typedef void (*encode_map_f)(struct vstream *stream, uint32_t size);
+typedef void (*encode_uint_f)(struct vstream *stream, uint64_t num);
+typedef void (*encode_int_f)(struct vstream *stream, int64_t num);
+typedef void (*encode_float_f)(struct vstream *stream, float num);
+typedef void (*encode_double_f)(struct vstream *stream, double num);
+typedef void (*encode_strn_f)(struct vstream *stream, const char *str,
+ uint32_t len);
+typedef void (*encode_nil_f)(struct vstream *stream);
+typedef void (*encode_bool_f)(struct vstream *stream, bool val);
+typedef void (*encode_enum_f)(struct vstream *stream, int64_t num,
+ const char *str);
+typedef int (*encode_port_f)(struct vstream *stream, struct port *port);
+typedef void (*encode_array_commit_f)(struct vstream *stream, uint32_t id);
+typedef void (*encode_map_commit_f)(struct vstream *stream);
+
+struct vstream_vtab {
+ encode_array_f encode_array;
+ encode_map_f encode_map;
+ encode_uint_f encode_uint;
+ encode_int_f encode_int;
+ encode_float_f encode_float;
+ encode_double_f encode_double;
+ encode_strn_f encode_strn;
+ encode_nil_f encode_nil;
+ encode_bool_f encode_bool;
+ encode_enum_f encode_enum;
+ encode_port_f encode_port;
+ encode_array_commit_f encode_array_commit;
+ encode_map_commit_f encode_map_commit;
+};
+
+struct vstream {
+ /** Here struct mpstream lives under the hood. */
+ char inheritance_padding[64];
+ /** Virtual function table. */
+ const struct vstream_vtab *vtab;
+};
+
+void
+mp_vstream_init_vtab(struct vstream *vstream);
+
+static inline void
+vstream_encode_array(struct vstream *stream, uint32_t size)
+{
+ return stream->vtab->encode_array(stream, size);
+}
+
+static inline void
+vstream_encode_map(struct vstream *stream, uint32_t size)
+{
+ return stream->vtab->encode_map(stream, size);
+}
+
+static inline void
+vstream_encode_uint(struct vstream *stream, uint64_t num)
+{
+ return stream->vtab->encode_uint(stream, num);
+}
+
+static inline void
+vstream_encode_int(struct vstream *stream, int64_t num)
+{
+ return stream->vtab->encode_int(stream, num);
+}
+
+static inline void
+vstream_encode_float(struct vstream *stream, float num)
+{
+ return stream->vtab->encode_float(stream, num);
+}
+
+static inline void
+vstream_encode_double(struct vstream *stream, double num)
+{
+ return stream->vtab->encode_double(stream, num);
+}
+
+static inline void
+vstream_encode_strn(struct vstream *stream, const char *str, uint32_t len)
+{
+ return stream->vtab->encode_strn(stream, str, len);
+}
+
+static inline void
+vstream_encode_str(struct vstream *stream, const char *str)
+{
+ return stream->vtab->encode_strn(stream, str, strlen(str));
+}
+
+static inline void
+vstream_encode_nil(struct vstream *stream)
+{
+ return stream->vtab->encode_nil(stream);
+}
+
+static inline void
+vstream_encode_bool(struct vstream *stream, bool val)
+{
+ return stream->vtab->encode_bool(stream, val);
+}
+
+static inline void
+vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+ return stream->vtab->encode_enum(stream, num, str);
+}
+
+static inline int
+vstream_encode_port(struct vstream *stream, struct port *port)
+{
+ return stream->vtab->encode_port(stream, port);
+}
+
+static inline void
+vstream_encode_map_commit(struct vstream *stream)
+{
+ return stream->vtab->encode_map_commit(stream);
+}
+
+static inline void
+vstream_encode_array_commit(struct vstream *stream, uint32_t id)
+{
+ return stream->vtab->encode_array_commit(stream, id);
+}
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_VSTREAM_H_INCLUDED */
--
2.7.4
^ permalink raw reply [flat|nested] 12+ messages in thread
* [tarantool-patches] [PATCH v4 4/5] lua: create vstream implementation for Lua
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
` (2 preceding siblings ...)
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 3/5] sql: create interface vstream imeevma
@ 2018-11-30 19:01 ` imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 5/5] sql: check new box.sql.execute() imeevma
2018-12-02 11:03 ` [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
5 siblings, 0 replies; 12+ messages in thread
From: imeevma @ 2018-11-30 19:01 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, kostja
Thas patch creates vstream implementation for Lua and function
box.sql.new_execute() that uses this implementation. Also it
creates parameters binding for SQL statements executed through
box.
Part of #3505
Closes #3401
---
src/CMakeLists.txt | 1 +
src/box/execute.c | 191 ++++++++++++++++++++++++++++++++++++++++++++++++++++
src/box/execute.h | 15 +++++
src/box/lua/sql.c | 36 ++++++++++
src/box/lua/sql.h | 4 ++
src/lua/luastream.c | 148 ++++++++++++++++++++++++++++++++++++++++
src/lua/luastream.h | 47 +++++++++++++
src/vstream.h | 8 ++-
8 files changed, 449 insertions(+), 1 deletion(-)
create mode 100644 src/lua/luastream.c
create mode 100644 src/lua/luastream.h
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e8554a8..a0a054a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -182,6 +182,7 @@ set (server_sources
lua/crypto.c
lua/httpc.c
lua/utf8.c
+ lua/luastream.c
${lua_sources}
${PROJECT_SOURCE_DIR}/third_party/lua-yaml/lyaml.cc
${PROJECT_SOURCE_DIR}/third_party/lua-yaml/b64.c
diff --git a/src/box/execute.c b/src/box/execute.c
index 36b861f..1384c96 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -43,6 +43,8 @@
#include "tuple.h"
#include "sql/vdbe.h"
#include "vstream.h"
+#include "lua/utils.h"
+#include "lua/msgpack.h"
const char *sql_type_strs[] = {
NULL,
@@ -299,6 +301,195 @@ error:
}
/**
+ * Decode a single bind column from Lua stack.
+ *
+ * @param L Lua stack.
+ * @param[out] bind Bind to decode to.
+ * @param idx Position of table with bind columns on Lua stack.
+ * @param i Ordinal bind number.
+ *
+ * @retval 0 Success.
+ * @retval -1 Memory or client error.
+ */
+static inline int
+lua_sql_bind_decode(struct lua_State *L, struct sql_bind *bind, int idx, int i)
+{
+ struct luaL_field field;
+ char *buf;
+ lua_rawgeti(L, idx, i + 1);
+ luaL_tofield(L, luaL_msgpack_default, -1, &field);
+ bind->pos = i + 1;
+ if (field.type == MP_MAP) {
+ /*
+ * A named parameter is an MP_MAP with
+ * one key - {'name': value}.
+ * Report parse error otherwise.
+ */
+ if (field.size != 1) {
+ diag_set(ClientError, ER_ILLEGAL_PARAMS, "SQL bind "\
+ "parameter should be {'name': value}");
+ return -1;
+ }
+ /*
+ * Get key and value of the only map element to
+ * lua stack.
+ */
+ lua_pushnil(L);
+ lua_next(L, lua_gettop(L) - 1);
+ /* At first we should deal with the value. */
+ luaL_tofield(L, luaL_msgpack_default, -1, &field);
+ lua_pop(L, 1);
+ /* Now key is on the top of Lua stack. */
+ size_t name_len = 0;
+ bind->name = luaL_checklstring(L, -1, &name_len);
+ if (bind->name == NULL) {
+ diag_set(ClientError, ER_ILLEGAL_PARAMS, "SQL bind "\
+ "parameter should be {'name': value}");
+ return -1;
+ }
+ /*
+ * Name should be saved in allocated memory as it
+ * will be poped from Lua stack.
+ */
+ buf = region_alloc(&fiber()->gc, name_len + 1);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, name_len + 1, "region_alloc",
+ "buf");
+ return -1;
+ }
+ memcpy(buf, bind->name, name_len + 1);
+ bind->name = buf;
+ bind->name_len = name_len;
+ lua_pop(L, 1);
+ } else {
+ bind->name = NULL;
+ bind->name_len = 0;
+ }
+ switch (field.type) {
+ case MP_UINT: {
+ bind->i64 = field.ival;
+ bind->type = SQLITE_INTEGER;
+ bind->bytes = sizeof(bind->i64);
+ break;
+ }
+ case MP_INT:
+ bind->i64 = field.ival;
+ bind->type = SQLITE_INTEGER;
+ bind->bytes = sizeof(bind->i64);
+ break;
+ case MP_STR:
+ /*
+ * Data should be saved in allocated memory as it
+ * will be poped from Lua stack.
+ */
+ buf = region_alloc(&fiber()->gc, field.sval.len + 1);
+ if (buf == NULL) {
+ diag_set(OutOfMemory, field.sval.len + 1,
+ "region_alloc", "buf");
+ return -1;
+ }
+ memcpy(buf, field.sval.data, field.sval.len + 1);
+ bind->s = buf;
+ bind->type = SQLITE_TEXT;
+ bind->bytes = field.sval.len;
+ break;
+ case MP_DOUBLE:
+ bind->d = field.dval;
+ bind->type = SQLITE_FLOAT;
+ bind->bytes = sizeof(bind->d);
+ break;
+ case MP_FLOAT:
+ bind->d = field.dval;
+ bind->type = SQLITE_FLOAT;
+ bind->bytes = sizeof(bind->d);
+ break;
+ case MP_NIL:
+ bind->type = SQLITE_NULL;
+ bind->bytes = 1;
+ break;
+ case MP_BOOL:
+ /* SQLite doesn't support boolean. Use int instead. */
+ bind->i64 = field.bval ? 1 : 0;
+ bind->type = SQLITE_INTEGER;
+ bind->bytes = sizeof(bind->i64);
+ break;
+ case MP_BIN:
+ bind->s = mp_decode_bin(&field.sval.data, &bind->bytes);
+ bind->type = SQLITE_BLOB;
+ break;
+ case MP_EXT:
+ /*
+ * Data should be saved in allocated memory as it
+ * will be poped from Lua stack.
+ */
+ buf = region_alloc(&fiber()->gc, sizeof(field));
+ if (buf == NULL) {
+ diag_set(OutOfMemory, sizeof(field), "region_alloc",
+ "buf");
+ return -1;
+ }
+ memcpy(buf, &field, sizeof(field));
+ bind->s = buf;
+ bind->bytes = sizeof(field);
+ bind->type = SQLITE_BLOB;
+ break;
+ case MP_ARRAY:
+ diag_set(ClientError, ER_SQL_BIND_TYPE, "ARRAY",
+ sql_bind_name(bind));
+ return -1;
+ case MP_MAP:
+ diag_set(ClientError, ER_SQL_BIND_TYPE, "MAP",
+ sql_bind_name(bind));
+ return -1;
+ default:
+ unreachable();
+ }
+ lua_pop(L, 1);
+ return 0;
+}
+
+int
+lua_sql_bind_list_decode(struct lua_State *L, struct sql_request *request,
+ int idx)
+{
+ assert(request != NULL);
+ if (! lua_istable(L, idx)) {
+ diag_set(ClientError, ER_INVALID_MSGPACK, "SQL parameter list");
+ return -1;
+ }
+ uint32_t bind_count = lua_objlen(L, idx);
+ if (bind_count == 0)
+ return 0;
+ if (bind_count > SQL_BIND_PARAMETER_MAX) {
+ diag_set(ClientError, ER_SQL_BIND_PARAMETER_MAX,
+ (int) bind_count);
+ return -1;
+ }
+ struct region *region = &fiber()->gc;
+ uint32_t used = region_used(region);
+ size_t size = sizeof(struct sql_bind) * bind_count;
+ /*
+ * Memory allocated here will be freed in
+ * sqlite3_finalize() or in txn_commit()/txn_rollback() if
+ * there is an active transaction.
+ */
+ struct sql_bind *bind = (struct sql_bind *) region_alloc(region, size);
+ if (bind == NULL) {
+ diag_set(OutOfMemory, size, "region_alloc", "bind");
+ return -1;
+ }
+ for (uint32_t i = 0; i < bind_count; ++i) {
+ if (lua_sql_bind_decode(L, &bind[i], idx, i) != 0) {
+ region_truncate(region, used);
+ return -1;
+ }
+ }
+ request->bind_count = bind_count;
+ request->bind = bind;
+ return 0;
+}
+
+/**
* Serialize a single column of a result set row.
* @param stmt Prepared and started statement. At least one
* sqlite3_step must be called.
diff --git a/src/box/execute.h b/src/box/execute.h
index 56b7339..276fa0e 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -52,6 +52,7 @@ struct region;
struct sql_bind;
struct xrow_header;
struct vstream;
+struct lua_State;
/** EXECUTE request. */
struct sql_request {
@@ -142,6 +143,20 @@ xrow_decode_sql(const struct xrow_header *row, struct sql_request *request,
struct region *region);
/**
+ * Parse Lua table of SQL parameters and store a result
+ * into the @request->bind, bind_count.
+ * @param L Lua stack to get data from.
+ * @param request Request to save decoded parameters.
+ * @param idx Position of table with parameters on Lua stack.
+ *
+ * @retval 0 Success.
+ * @retval -1 Client or memory error.
+ */
+int
+lua_sql_bind_list_decode(struct lua_State *L, struct sql_request *request,
+ int idx);
+
+/**
* Prepare and execute an SQL statement.
* @param request IProto request.
* @param[out] response Response to store result.
diff --git a/src/box/lua/sql.c b/src/box/lua/sql.c
index 17e2694..9f616e0 100644
--- a/src/box/lua/sql.c
+++ b/src/box/lua/sql.c
@@ -5,7 +5,10 @@
#include "box/sql/sqliteInt.h"
#include "box/info.h"
#include "lua/utils.h"
+#include "lua/luastream.h"
#include "info.h"
+#include "box/execute.h"
+#include "vstream.h"
static void
lua_push_column_names(struct lua_State *L, struct sqlite3_stmt *stmt)
@@ -111,6 +114,38 @@ sqlerror:
}
static int
+lbox_execute(struct lua_State *L)
+{
+ struct sqlite3 *db = sql_get();
+ if (db == NULL)
+ return luaL_error(L, "not ready");
+
+ size_t length;
+ const char *sql = lua_tolstring(L, 1, &length);
+ if (sql == NULL)
+ return luaL_error(L, "usage: box.execute(sqlstring)");
+
+ struct sql_request request = {};
+ request.sql_text = sql;
+ request.sql_text_len = length;
+ if (lua_gettop(L) == 2 && lua_sql_bind_list_decode(L, &request, 2) != 0)
+ return luaT_error(L);
+ struct sql_response response = {.is_info_flattened = true};
+ if (sql_prepare_and_execute(&request, &response, &fiber()->gc) != 0)
+ return luaT_error(L);
+
+ int keys;
+ struct vstream vstream;
+ luavstream_init(&vstream, L);
+ lua_newtable(L);
+ if (sql_response_dump(&response, &keys, &vstream) != 0) {
+ lua_pop(L, 1);
+ return luaT_error(L);
+ }
+ return 1;
+}
+
+static int
lua_sql_debug(struct lua_State *L)
{
struct info_handler info;
@@ -124,6 +159,7 @@ box_lua_sqlite_init(struct lua_State *L)
{
static const struct luaL_Reg module_funcs [] = {
{"execute", lua_sql_execute},
+ {"new_execute", lbox_execute},
{"debug", lua_sql_debug},
{NULL, NULL}
};
diff --git a/src/box/lua/sql.h b/src/box/lua/sql.h
index 65ff6d5..a83a5b8 100644
--- a/src/box/lua/sql.h
+++ b/src/box/lua/sql.h
@@ -36,9 +36,13 @@ extern "C" {
#endif
struct lua_State;
+struct luastream;
void box_lua_sqlite_init(struct lua_State *L);
+void
+luastream_init(struct luastream *stream, struct lua_State *L);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/lua/luastream.c b/src/lua/luastream.c
new file mode 100644
index 0000000..dfe0493
--- /dev/null
+++ b/src/lua/luastream.c
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "luastream.h"
+#include "lua/utils.h"
+#include "vstream.h"
+#include "port.h"
+
+void
+luastream_init(struct luastream *stream, struct lua_State *L)
+{
+ stream->L = L;
+}
+
+static void
+luastream_encode_array(struct luastream *stream, uint32_t size)
+{
+ lua_createtable(stream->L, size, 0);
+}
+
+static void
+luastream_encode_map(struct luastream *stream, uint32_t size)
+{
+ lua_createtable(stream->L, size, 0);
+}
+
+static void
+luastream_encode_uint(struct luastream *stream, uint64_t num)
+{
+ luaL_pushuint64(stream->L, num);
+}
+
+static void
+luastream_encode_int(struct luastream *stream, int64_t num)
+{
+ luaL_pushint64(stream->L, num);
+}
+
+static void
+luastream_encode_float(struct luastream *stream, float num)
+{
+ lua_pushnumber(stream->L, num);
+}
+
+static void
+luastream_encode_double(struct luastream *stream, double num)
+{
+ lua_pushnumber(stream->L, num);
+}
+
+static void
+luastream_encode_strn(struct luastream *stream, const char *str, uint32_t len)
+{
+ lua_pushlstring(stream->L, str, len);
+}
+
+static void
+luastream_encode_nil(struct luastream *stream)
+{
+ lua_pushnil(stream->L);
+}
+
+static void
+luastream_encode_bool(struct luastream *stream, bool val)
+{
+ lua_pushboolean(stream->L, val);
+}
+
+static int
+lua_vstream_encode_port(struct vstream *stream, struct port *port)
+{
+ port_dump_lua(port, ((struct luastream *)stream)->L);
+ return 0;
+}
+
+static void
+lua_vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+ (void)num;
+ lua_pushlstring(((struct luastream *)stream)->L, str, strlen(str));
+}
+
+static void
+lua_vstream_encode_map_commit(struct vstream *stream)
+{
+ size_t length;
+ const char *key = lua_tolstring(((struct luastream *)stream)->L, -2,
+ &length);
+ lua_setfield(((struct luastream *)stream)->L, -3, key);
+ lua_pop(((struct luastream *)stream)->L, 1);
+}
+
+static void
+lua_vstream_encode_array_commit(struct vstream *stream, uint32_t id)
+{
+ lua_rawseti(((struct luastream *)stream)->L, -2, id + 1);
+}
+
+static const struct vstream_vtab lua_vstream_vtab = {
+ /** encode_array = */ (encode_array_f)luastream_encode_array,
+ /** encode_map = */ (encode_map_f)luastream_encode_map,
+ /** encode_uint = */ (encode_uint_f)luastream_encode_uint,
+ /** encode_int = */ (encode_int_f)luastream_encode_int,
+ /** encode_float = */ (encode_float_f)luastream_encode_float,
+ /** encode_double = */ (encode_double_f)luastream_encode_double,
+ /** encode_strn = */ (encode_strn_f)luastream_encode_strn,
+ /** encode_nil = */ (encode_nil_f)luastream_encode_nil,
+ /** encode_bool = */ (encode_bool_f)luastream_encode_bool,
+ /** encode_enum = */ lua_vstream_encode_enum,
+ /** encode_port = */ lua_vstream_encode_port,
+ /** encode_array_commit = */ lua_vstream_encode_array_commit,
+ /** encode_map_commit = */ lua_vstream_encode_map_commit,
+};
+
+void
+luavstream_init(struct vstream *stream, struct lua_State *L)
+{
+ stream->vtab = &lua_vstream_vtab;
+ assert(sizeof(stream->inheritance_padding) >= sizeof(struct luastream));
+ luastream_init((struct luastream *) stream, L);
+}
diff --git a/src/lua/luastream.h b/src/lua/luastream.h
new file mode 100644
index 0000000..52a11e4
--- /dev/null
+++ b/src/lua/luastream.h
@@ -0,0 +1,47 @@
+#ifndef TARANTOOL_LUA_LUASTREAM_H_INCLUDED
+#define TARANTOOL_LUA_LUASTREAM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the
+ * following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+struct lua_State;
+struct vstream;
+
+struct luastream {
+ struct lua_State *L;
+};
+
+void
+luastream_init(struct luastream *stream, struct lua_State *L);
+
+void
+luavstream_init(struct vstream *stream, struct lua_State *L);
+
+#endif /* TARANTOOL_LUA_LUASTREAM_H_INCLUDED */
diff --git a/src/vstream.h b/src/vstream.h
index fe7c49a..9515fad 100644
--- a/src/vstream.h
+++ b/src/vstream.h
@@ -70,7 +70,10 @@ struct vstream_vtab {
};
struct vstream {
- /** Here struct mpstream lives under the hood. */
+ /**
+ * Here struct mpstream or struct luastream lives under
+ * the hood.
+ */
char inheritance_padding[64];
/** Virtual function table. */
const struct vstream_vtab *vtab;
@@ -79,6 +82,9 @@ struct vstream {
void
mp_vstream_init_vtab(struct vstream *vstream);
+void
+lua_vstream_init_vtab(struct vstream *vstream);
+
static inline void
vstream_encode_array(struct vstream *stream, uint32_t size)
{
--
2.7.4
^ permalink raw reply [flat|nested] 12+ messages in thread
* [tarantool-patches] [PATCH v4 5/5] sql: check new box.sql.execute()
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
` (3 preceding siblings ...)
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 4/5] lua: create vstream implementation for Lua imeevma
@ 2018-11-30 19:01 ` imeevma
2018-12-02 11:03 ` [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
5 siblings, 0 replies; 12+ messages in thread
From: imeevma @ 2018-11-30 19:01 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, kostja
This commit checks that new implementation of box.sql.execute() is
able to pass all tests. This is temporary commit and should be
dropped later. Even though this commit is temporary it shows that
this patch-set should be pushed after patch for issue #3832.
Needed for #3505
---
src/box/execute.c | 13 +++---
src/box/execute.h | 3 +-
src/box/iproto.cc | 3 +-
src/box/lua/schema.lua | 23 ++++++++++
src/box/lua/sql.c | 117 ++++---------------------------------------------
5 files changed, 44 insertions(+), 115 deletions(-)
diff --git a/src/box/execute.c b/src/box/execute.c
index 1384c96..769ab5a 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -736,6 +736,8 @@ sql_get_description(struct sqlite3_stmt *stmt, struct vstream *stream,
* column_name simply returns them.
*/
assert(name != NULL);
+ if (type == NULL)
+ type = "UNKNOWN";
vstream_encode_map(stream, 2);
vstream_encode_enum(stream, IPROTO_FIELD_NAME, "name");
@@ -754,7 +756,7 @@ sql_get_description(struct sqlite3_stmt *stmt, struct vstream *stream,
static inline int
sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, struct port *port,
- struct region *region)
+ struct region *region, int error_id)
{
int rc, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
@@ -771,7 +773,7 @@ sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, struct port *port,
assert(rc != SQLITE_ROW && rc != SQLITE_OK);
}
if (rc != SQLITE_DONE) {
- diag_set(ClientError, ER_SQL_EXECUTE, sqlite3_errmsg(db));
+ diag_set(ClientError, error_id, sqlite3_errmsg(db));
return -1;
}
return 0;
@@ -779,7 +781,8 @@ sql_execute(sqlite3 *db, struct sqlite3_stmt *stmt, struct port *port,
int
sql_prepare_and_execute(const struct sql_request *request,
- struct sql_response *response, struct region *region)
+ struct sql_response *response, struct region *region,
+ int error_id)
{
const char *sql = request->sql_text;
uint32_t len = request->sql_text_len;
@@ -790,7 +793,7 @@ sql_prepare_and_execute(const struct sql_request *request,
return -1;
}
if (sqlite3_prepare_v2(db, sql, len, &stmt, NULL) != SQLITE_OK) {
- diag_set(ClientError, ER_SQL_EXECUTE, sqlite3_errmsg(db));
+ diag_set(ClientError, error_id, sqlite3_errmsg(db));
return -1;
}
assert(stmt != NULL);
@@ -798,7 +801,7 @@ sql_prepare_and_execute(const struct sql_request *request,
response->prep_stmt = stmt;
response->sync = request->sync;
if (sql_bind(request, stmt) == 0 &&
- sql_execute(db, stmt, &response->port, region) == 0)
+ sql_execute(db, stmt, &response->port, region, error_id) == 0)
return 0;
port_destroy(&response->port);
sqlite3_finalize(stmt);
diff --git a/src/box/execute.h b/src/box/execute.h
index 276fa0e..f304e8f 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -168,7 +168,8 @@ lua_sql_bind_list_decode(struct lua_State *L, struct sql_request *request,
*/
int
sql_prepare_and_execute(const struct sql_request *request,
- struct sql_response *response, struct region *region);
+ struct sql_response *response, struct region *region,
+ int error_id);
#if defined(__cplusplus)
} /* extern "C" { */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 6e284f7..03ea220 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1597,7 +1597,8 @@ tx_process_sql(struct cmsg *m)
goto error;
assert(msg->header.type == IPROTO_EXECUTE);
tx_inject_delay();
- if (sql_prepare_and_execute(&msg->sql, &response, &fiber()->gc) != 0)
+ if (sql_prepare_and_execute(&msg->sql, &response, &fiber()->gc,
+ ER_SQL_EXECUTE) != 0)
goto error;
/*
* Take an obuf only after execute(). Else the buffer can
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 8a804f0..02ec2fd 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -2456,3 +2456,26 @@ box.feedback.save = function(file_name)
end
box.NULL = msgpack.NULL
+
+box.sql.execute = function(sql)
+ local result = box.sql.new_execute(sql)
+ if result == nil then return end
+ local ret = nil
+ if result.rows ~= nil then
+ ret = {}
+ for key, row in pairs(result.rows) do
+ if type(row) == 'cdata' then
+ table.insert(ret, row:totable())
+ end
+ end
+ end
+ if result.metadata ~= nil then
+ if ret == nil then ret = {} end
+ ret[0] = {}
+ for key, row in pairs(result.metadata) do
+ table.insert(ret[0], row['name'])
+ end
+ setmetatable(ret, {__serialize = 'sequence'})
+ end
+ if ret ~= nil then return ret end
+end
diff --git a/src/box/lua/sql.c b/src/box/lua/sql.c
index 9f616e0..f524c5a 100644
--- a/src/box/lua/sql.c
+++ b/src/box/lua/sql.c
@@ -10,109 +10,6 @@
#include "box/execute.h"
#include "vstream.h"
-static void
-lua_push_column_names(struct lua_State *L, struct sqlite3_stmt *stmt)
-{
- int column_count = sqlite3_column_count(stmt);
- lua_createtable(L, column_count, 0);
- for (int i = 0; i < column_count; i++) {
- const char *name = sqlite3_column_name(stmt, i);
- lua_pushstring(L, name == NULL ? "" : name);
- lua_rawseti(L, -2, i+1);
- }
-}
-
-static void
-lua_push_row(struct lua_State *L, struct sqlite3_stmt *stmt)
-{
- int column_count = sqlite3_column_count(stmt);
-
- lua_createtable(L, column_count, 0);
- lua_rawgeti(L, LUA_REGISTRYINDEX, luaL_array_metatable_ref);
- lua_setmetatable(L, -2);
-
- for (int i = 0; i < column_count; i++) {
- int type = sqlite3_column_type(stmt, i);
- switch (type) {
- case SQLITE_INTEGER:
- luaL_pushint64(L, sqlite3_column_int64(stmt, i));
- break;
- case SQLITE_FLOAT:
- lua_pushnumber(L, sqlite3_column_double(stmt, i));
- break;
- case SQLITE_TEXT: {
- const void *text = sqlite3_column_text(stmt, i);
- lua_pushlstring(L, text,
- sqlite3_column_bytes(stmt, i));
- break;
- }
- case SQLITE_BLOB: {
- const void *blob = sqlite3_column_blob(stmt, i);
- if (sql_column_subtype(stmt,i) == SQL_SUBTYPE_MSGPACK) {
- luamp_decode(L, luaL_msgpack_default,
- (const char **)&blob);
- } else {
- lua_pushlstring(L, blob,
- sqlite3_column_bytes(stmt, i));
- }
- break;
- }
- case SQLITE_NULL:
- lua_rawgeti(L, LUA_REGISTRYINDEX, luaL_nil_ref);
- break;
- default:
- assert(0);
- }
- lua_rawseti(L, -2, i+1);
- }
-}
-
-static int
-lua_sql_execute(struct lua_State *L)
-{
- sqlite3 *db = sql_get();
- if (db == NULL)
- return luaL_error(L, "not ready");
-
- size_t length;
- const char *sql = lua_tolstring(L, 1, &length);
- if (sql == NULL)
- return luaL_error(L, "usage: box.sql.execute(sqlstring)");
-
- struct sqlite3_stmt *stmt;
- if (sqlite3_prepare_v2(db, sql, length, &stmt, &sql) != SQLITE_OK)
- goto sqlerror;
- assert(stmt != NULL);
-
- int rc;
- int retval_count;
- if (sqlite3_column_count(stmt) == 0) {
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW);
- retval_count = 0;
- } else {
- lua_newtable(L);
- lua_pushvalue(L, lua_upvalueindex(1));
- lua_setmetatable(L, -2);
- lua_push_column_names(L, stmt);
- lua_rawseti(L, -2, 0);
-
- int row_count = 0;
- while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
- lua_push_row(L, stmt);
- lua_rawseti(L, -2, ++row_count);
- }
- retval_count = 1;
- }
- if (rc != SQLITE_OK && rc != SQLITE_DONE)
- goto sqlerror;
- sqlite3_finalize(stmt);
- return retval_count;
-sqlerror:
- lua_pushstring(L, sqlite3_errmsg(db));
- sqlite3_finalize(stmt);
- return lua_error(L);
-}
-
static int
lbox_execute(struct lua_State *L)
{
@@ -129,10 +26,12 @@ lbox_execute(struct lua_State *L)
request.sql_text = sql;
request.sql_text_len = length;
if (lua_gettop(L) == 2 && lua_sql_bind_list_decode(L, &request, 2) != 0)
- return luaT_error(L);
+ goto sqlerror;
+
struct sql_response response = {.is_info_flattened = true};
- if (sql_prepare_and_execute(&request, &response, &fiber()->gc) != 0)
- return luaT_error(L);
+ if (sql_prepare_and_execute(&request, &response, &fiber()->gc,
+ ER_SYSTEM) != 0)
+ goto sqlerror;
int keys;
struct vstream vstream;
@@ -140,9 +39,12 @@ lbox_execute(struct lua_State *L)
lua_newtable(L);
if (sql_response_dump(&response, &keys, &vstream) != 0) {
lua_pop(L, 1);
- return luaT_error(L);
+ goto sqlerror;
}
return 1;
+sqlerror:
+ lua_pushstring(L, sqlite3_errmsg(db));
+ return lua_error(L);
}
static int
@@ -158,7 +60,6 @@ void
box_lua_sqlite_init(struct lua_State *L)
{
static const struct luaL_Reg module_funcs [] = {
- {"execute", lua_sql_execute},
{"new_execute", lbox_execute},
{"debug", lua_sql_debug},
{NULL, NULL}
--
2.7.4
^ permalink raw reply [flat|nested] 12+ messages in thread
* [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
` (4 preceding siblings ...)
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 5/5] sql: check new box.sql.execute() imeevma
@ 2018-12-02 11:03 ` imeevma
2018-12-03 15:21 ` Vladimir Davydov
5 siblings, 1 reply; 12+ messages in thread
From: imeevma @ 2018-12-02 11:03 UTC (permalink / raw)
To: v.shpilevoy, tarantool-patches, vdavydov.dev, kostja
This patch is the most dubious patch due to the implicit use of
mpstream as a stream for obuf. Discussion and patch below.
It is worth noting that in this version of the patch nothing
changes. At this point there is no approved solution for this
patch.
On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>
>>
>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>> }
>>>>>>
>>>>>> int
>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>> + struct mpstream *stream)
>>>>>> {
>>>>>> sqlite3 *db = sql_get();
>>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>> if (column_count > 0) {
>>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>> err:
>>>>>> rc = -1;
>>>>>> goto finish;
>>>>>> }
>>>>>> *keys = 2;
>>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>> - mp_sizeof_array(port_tuple->size);
>>>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>>>> - if (pos == NULL) {
>>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>> - goto err;
>>>>>> - }
>>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>>>> - /*
>>>>>> - * Just like SELECT, SQL uses output format compatible
>>>>>> - * with Tarantool 1.6
>>>>>> - */
>>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>> + mpstream_flush(stream);
>>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>
>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>
>>>>> And when you introduce vstream later, you simply move this code to
>>>>> another file. This is confusing. May be we should pass alloc/reserve
>>>>> used in mpstream to port_dump instead of obuf?
>>>>
>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
>>>> and is used for box.select.
>>>>
>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
>>>> dump port to not obuf, then we will just add a new method to port_vtab.
>>>>
>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>> in both cases we not just dump in a specific format, but to a concrete
>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>> by obuf destination.
>>>
>>> There's port_dump_plain, which dumps port contents in a specific format.
>>> So port_dump_obuf would look ambiguous.
>>>
>>>>
>>>> If you worry about how to call sql_response_dump() to not obuf, then there
>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
>>>> but use the full power of virtual functions in execute.c, which definitely is
>>>> not hot.
>>>
>>> That would interconnect port and mpstream, make them dependent on each
>>> other. I don't think that would be good.
>>>
>>>>
>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>> box.select and iproto_call will use port_dump_obuf.
>>>>
>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
>>>
>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>> because it's very limited in its applicability. People already prefer to
>>> use box.call over box.insert/select/etc over iproto, and with the
>>> appearance of box.execute they are likely to stop using plain box.select
>>> at all.
>>>
>>> That said, personally I would try to pass reserve/alloc methods to port,
>>> see how it goes.
>>>
>>
>> I do not see a reason to slow down box.select if we can don't do it.
>> Yeas, people use IPROTO_CALL, but in stored functions they use box
>> functions including select.
>
> box.select called from Lua code doesn't use port_dump_msgpack.
>
>>
>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>> port_dump_obuf and add port_dump_msgpack which does not depend on
>> mpstream and takes alloc/reserve/ctx directly.
>
> Better call the optimized version (the one without callbacks)
> port_dump_msgpack_obuf to avoid confusion IMO.
>
> Anyway, I'd try to run cbench to see if it really perfomrs better
> than the one using callbacks.
Patch:
commit a857e1129f9e38bde8c5c6037149887f695ec6dc
Author: Mergen Imeev <imeevma@gmail.com>
Date: Fri Nov 9 22:31:07 2018 +0300
iproto: replace obuf by mpstream in execute.c
It is useful to replace obuf by mpstream. It allows us to design
vstream. Interface vstream will be used as universal interface to
dump result of SQL queries.
Needed for #3505
diff --git a/src/box/execute.c b/src/box/execute.c
index 3a6cadf..0d266dd 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -35,7 +35,6 @@
#include "sql/sqliteLimit.h"
#include "errcode.h"
#include "small/region.h"
-#include "small/obuf.h"
#include "diag.h"
#include "sql.h"
#include "xrow.h"
@@ -43,6 +42,7 @@
#include "port.h"
#include "tuple.h"
#include "sql/vdbe.h"
+#include "mpstream.h"
const char *sql_type_strs[] = {
NULL,
@@ -530,23 +530,13 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
* @retval -1 Client or memory error.
*/
static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
+sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
int column_count)
{
assert(column_count > 0);
- int size = mp_sizeof_uint(IPROTO_METADATA) +
- mp_sizeof_array(column_count);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_uint(pos, IPROTO_METADATA);
- pos = mp_encode_array(pos, column_count);
+ mpstream_encode_uint(stream, IPROTO_METADATA);
+ mpstream_encode_array(stream, column_count);
for (int i = 0; i < column_count; ++i) {
- size_t size = mp_sizeof_map(2) +
- mp_sizeof_uint(IPROTO_FIELD_NAME) +
- mp_sizeof_uint(IPROTO_FIELD_TYPE);
const char *name = sqlite3_column_name(stmt, i);
const char *type = sqlite3_column_datatype(stmt, i);
/*
@@ -555,18 +545,11 @@ sql_get_description(struct sqlite3_stmt *stmt, struct obuf *out,
* column_name simply returns them.
*/
assert(name != NULL);
- size += mp_sizeof_str(strlen(name));
- size += mp_sizeof_str(strlen(type));
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- return -1;
- }
- pos = mp_encode_map(pos, 2);
- pos = mp_encode_uint(pos, IPROTO_FIELD_NAME);
- pos = mp_encode_str(pos, name, strlen(name));
- pos = mp_encode_uint(pos, IPROTO_FIELD_TYPE);
- pos = mp_encode_str(pos, type, strlen(type));
+ mpstream_encode_map(stream, 2);
+ mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
+ mpstream_encode_str(stream, name);
+ mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
+ mpstream_encode_str(stream, type);
}
return 0;
}
@@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
}
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream)
{
sqlite3 *db = sql_get();
struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
- struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
int rc = 0, column_count = sqlite3_column_count(stmt);
if (column_count > 0) {
- if (sql_get_description(stmt, out, column_count) != 0) {
+ if (sql_get_description(stmt, stream, column_count) != 0) {
err:
rc = -1;
goto finish;
}
*keys = 2;
- int size = mp_sizeof_uint(IPROTO_DATA) +
- mp_sizeof_array(port_tuple->size);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_encode_uint(pos, IPROTO_DATA);
- pos = mp_encode_array(pos, port_tuple->size);
- /*
- * Just like SELECT, SQL uses output format compatible
- * with Tarantool 1.6
- */
- if (port_dump_msgpack_16(&response->port, out) < 0) {
+ mpstream_encode_uint(stream, IPROTO_DATA);
+ mpstream_flush(stream);
+ if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
/* Failed port dump destroyes the port. */
goto err;
}
+ mpstream_reset(stream);
} else {
*keys = 1;
- assert(port_tuple->size == 0);
struct stailq *autoinc_id_list =
vdbe_autoinc_id_list((struct Vdbe *)stmt);
uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
- int size = mp_sizeof_uint(IPROTO_SQL_INFO) +
- mp_sizeof_map(map_size);
- char *pos = (char *) obuf_alloc(out, size);
- if (pos == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "pos");
- goto err;
- }
- pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
- pos = mp_encode_map(pos, map_size);
+ mpstream_encode_uint(stream, IPROTO_SQL_INFO);
+ mpstream_encode_map(stream, map_size);
uint64_t id_count = 0;
- int changes = db->nChange;
- size = mp_sizeof_uint(SQL_INFO_ROW_COUNT) +
- mp_sizeof_uint(changes);
if (!stailq_empty(autoinc_id_list)) {
struct autoinc_id_entry *id_entry;
- stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- size += id_entry->id >= 0 ?
- mp_sizeof_uint(id_entry->id) :
- mp_sizeof_int(id_entry->id);
+ stailq_foreach_entry(id_entry, autoinc_id_list, link)
id_count++;
- }
- size += mp_sizeof_uint(SQL_INFO_AUTOINCREMENT_IDS) +
- mp_sizeof_array(id_count);
- }
- char *buf = obuf_alloc(out, size);
- if (buf == NULL) {
- diag_set(OutOfMemory, size, "obuf_alloc", "buf");
- goto err;
}
- buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
- buf = mp_encode_uint(buf, changes);
+
+ mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
+ mpstream_encode_uint(stream, db->nChange);
if (!stailq_empty(autoinc_id_list)) {
- buf = mp_encode_uint(buf, SQL_INFO_AUTOINCREMENT_IDS);
- buf = mp_encode_array(buf, id_count);
+ mpstream_encode_uint(stream,
+ SQL_INFO_AUTOINCREMENT_IDS);
+ mpstream_encode_array(stream, id_count);
struct autoinc_id_entry *id_entry;
stailq_foreach_entry(id_entry, autoinc_id_list, link) {
- buf = id_entry->id >= 0 ?
- mp_encode_uint(buf, id_entry->id) :
- mp_encode_int(buf, id_entry->id);
+ int64_t value = id_entry->id;
+ if (id_entry->id >= 0)
+ mpstream_encode_uint(stream, value);
+ else
+ mpstream_encode_int(stream, value);
}
}
}
diff --git a/src/box/execute.h b/src/box/execute.h
index 5f3d5eb..65ac81c 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,10 +48,10 @@ enum sql_info_key {
extern const char *sql_info_key_strs[];
-struct obuf;
struct region;
struct sql_bind;
struct xrow_header;
+struct mpstream;
/** EXECUTE request. */
struct sql_request {
@@ -105,13 +105,14 @@ struct sql_response {
* +----------------------------------------------+
* @param response EXECUTE response.
* @param[out] keys number of keys in dumped map.
- * @param out Output buffer.
+ * @param stream stream to where result is written.
*
* @retval 0 Success.
* @retval -1 Memory error.
*/
int
-sql_response_dump(struct sql_response *response, int *keys, struct obuf *out);
+sql_response_dump(struct sql_response *response, int *keys,
+ struct mpstream *stream);
/**
* Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 7c11d05..b110900 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -61,6 +61,7 @@
#include "rmean.h"
#include "execute.h"
#include "errinj.h"
+#include "mpstream.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -1573,12 +1574,20 @@ error:
tx_reply_error(msg);
}
+/** Callback to forward and error from mpstream methods. */
+static void
+set_encode_error(void *error_ctx)
+{
+ *(bool *)error_ctx = true;
+}
+
static void
tx_process_sql(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out;
struct sql_response response;
+ bool is_error = false;
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1598,10 +1607,16 @@ tx_process_sql(struct cmsg *m)
/* Prepare memory for the iproto header. */
if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
goto error;
- if (sql_response_dump(&response, &keys, out) != 0) {
+ struct mpstream stream;
+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ set_encode_error, &is_error);
+ if (is_error)
+ goto error;
+ if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
obuf_rollback_to_svp(out, &header_svp);
goto error;
}
+ mpstream_flush(&stream);
iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
iproto_wpos_create(&msg->wpos, out);
return;
--
2.7.4
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-12-02 11:03 ` [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
@ 2018-12-03 15:21 ` Vladimir Davydov
2018-12-03 20:48 ` [tarantool-patches] " Vladislav Shpilevoy
0 siblings, 1 reply; 12+ messages in thread
From: Vladimir Davydov @ 2018-12-03 15:21 UTC (permalink / raw)
To: imeevma; +Cc: v.shpilevoy, tarantool-patches, kostja
On Sun, Dec 02, 2018 at 02:03:21PM +0300, imeevma@tarantool.org wrote:
> This patch is the most dubious patch due to the implicit use of
> mpstream as a stream for obuf. Discussion and patch below.
>
> It is worth noting that in this version of the patch nothing
> changes. At this point there is no approved solution for this
> patch.
>
>
> On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> > On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
> >>
> >>
> >> On 30/11/2018 13:19, Vladimir Davydov wrote:
> >>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
> >>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
> >>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
> >>>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> >>>>>> }
> >>>>>>
> >>>>>> int
> >>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
> >>>>>> +sql_response_dump(struct sql_response *response, int *keys,
> >>>>>> + struct mpstream *stream)
> >>>>>> {
> >>>>>> sqlite3 *db = sql_get();
> >>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
> >>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
> >>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
> >>>>>> if (column_count > 0) {
> >>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
> >>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
> >>>>>> err:
> >>>>>> rc = -1;
> >>>>>> goto finish;
> >>>>>> }
> >>>>>> *keys = 2;
> >>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
> >>>>>> - mp_sizeof_array(port_tuple->size);
> >>>>>> - char *pos = (char *) obuf_alloc(out, size);
> >>>>>> - if (pos == NULL) {
> >>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> >>>>>> - goto err;
> >>>>>> - }
> >>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
> >>>>>> - pos = mp_encode_array(pos, port_tuple->size);
> >>>>>> - /*
> >>>>>> - * Just like SELECT, SQL uses output format compatible
> >>>>>> - * with Tarantool 1.6
> >>>>>> - */
> >>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
> >>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
> >>>>>> + mpstream_flush(stream);
> >>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
> >>>>>
> >>>>> stream->ctx isn't guaranteed to be an obuf
> >>>>>
> >>>>> And when you introduce vstream later, you simply move this code to
> >>>>> another file. This is confusing. May be we should pass alloc/reserve
> >>>>> used in mpstream to port_dump instead of obuf?
> >>>>
> >>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
> >>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
> >>>> and is used for box.select.
> >>>>
> >>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
> >>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
> >>>> dump port to not obuf, then we will just add a new method to port_vtab.
> >>>>
> >>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
> >>>> in both cases we not just dump in a specific format, but to a concrete
> >>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
> >>>> by obuf destination.
> >>>
> >>> There's port_dump_plain, which dumps port contents in a specific format.
> >>> So port_dump_obuf would look ambiguous.
> >>>
> >>>>
> >>>> If you worry about how to call sql_response_dump() to not obuf, then there
> >>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
> >>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
> >>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
> >>>> but use the full power of virtual functions in execute.c, which definitely is
> >>>> not hot.
> >>>
> >>> That would interconnect port and mpstream, make them dependent on each
> >>> other. I don't think that would be good.
> >>>
> >>>>
> >>>> mpstream implementation of vstream will call port_dump_mpstream, and
> >>>> luastream implementation of vstream will call port_dump_lua as it does now.
> >>>> box.select and iproto_call will use port_dump_obuf.
> >>>>
> >>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
> >>>
> >>> I may be wrong, but IMO there isn't much point in optimizing box.select,
> >>> because it's very limited in its applicability. People already prefer to
> >>> use box.call over box.insert/select/etc over iproto, and with the
> >>> appearance of box.execute they are likely to stop using plain box.select
> >>> at all.
> >>>
> >>> That said, personally I would try to pass reserve/alloc methods to port,
> >>> see how it goes.
> >>>
> >>
> >> I do not see a reason to slow down box.select if we can don't do it.
> >> Yeas, people use IPROTO_CALL, but in stored functions they use box
> >> functions including select.
> >
> > box.select called from Lua code doesn't use port_dump_msgpack.
> >
> >>
> >> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
> >> port_dump_obuf and add port_dump_msgpack which does not depend on
> >> mpstream and takes alloc/reserve/ctx directly.
> >
> > Better call the optimized version (the one without callbacks)
> > port_dump_msgpack_obuf to avoid confusion IMO.
> >
> > Anyway, I'd try to run cbench to see if it really perfomrs better
> > than the one using callbacks.
>
> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> }
>
> int
> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
> +sql_response_dump(struct sql_response *response, int *keys,
> + struct mpstream *stream)
> {
> sqlite3 *db = sql_get();
> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
> int rc = 0, column_count = sqlite3_column_count(stmt);
> if (column_count > 0) {
> - if (sql_get_description(stmt, out, column_count) != 0) {
> + if (sql_get_description(stmt, stream, column_count) != 0) {
> err:
> rc = -1;
> goto finish;
> }
> *keys = 2;
> - int size = mp_sizeof_uint(IPROTO_DATA) +
> - mp_sizeof_array(port_tuple->size);
> - char *pos = (char *) obuf_alloc(out, size);
> - if (pos == NULL) {
> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> - goto err;
> - }
> - pos = mp_encode_uint(pos, IPROTO_DATA);
> - pos = mp_encode_array(pos, port_tuple->size);
> - /*
> - * Just like SELECT, SQL uses output format compatible
> - * with Tarantool 1.6
> - */
> - if (port_dump_msgpack_16(&response->port, out) < 0) {
> + mpstream_encode_uint(stream, IPROTO_DATA);
> + mpstream_flush(stream);
> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
Still, I'm quite convinced that we need to pass alloc/reserve methods
along with ctx to port_dump_msgpack(), because implicitly assumping that
mpstream->ctx is, in fact, an obuf looks very fragile. However, Vlad is
right that it may indeed affect performance in a negative way. So let's
perhaps do the following:
1. Run cbench to see how badly indirect obuf_alloc/reserve slows
things down.
2. Consider the possibility of using templates or macro definitions
instead of function pointers.
What do you think?
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-12-03 15:21 ` Vladimir Davydov
@ 2018-12-03 20:48 ` Vladislav Shpilevoy
2018-12-04 8:26 ` Vladimir Davydov
0 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-03 20:48 UTC (permalink / raw)
To: Vladimir Davydov, imeevma; +Cc: tarantool-patches, kostja
On 03/12/2018 18:21, Vladimir Davydov wrote:
> On Sun, Dec 02, 2018 at 02:03:21PM +0300, imeevma@tarantool.org wrote:
>> This patch is the most dubious patch due to the implicit use of
>> mpstream as a stream for obuf. Discussion and patch below.
>>
>> It is worth noting that in this version of the patch nothing
>> changes. At this point there is no approved solution for this
>> patch.
>>
>>
>> On 11/30/18 1:55 PM, Vladimir Davydov wrote:
>>> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>>>
>>>>
>>>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>>>> }
>>>>>>>>
>>>>>>>> int
>>>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>>>> + struct mpstream *stream)
>>>>>>>> {
>>>>>>>> sqlite3 *db = sql_get();
>>>>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>>>> if (column_count > 0) {
>>>>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>>>> err:
>>>>>>>> rc = -1;
>>>>>>>> goto finish;
>>>>>>>> }
>>>>>>>> *keys = 2;
>>>>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>>>> - mp_sizeof_array(port_tuple->size);
>>>>>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>>>>>> - if (pos == NULL) {
>>>>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>>>> - goto err;
>>>>>>>> - }
>>>>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>>>>>> - /*
>>>>>>>> - * Just like SELECT, SQL uses output format compatible
>>>>>>>> - * with Tarantool 1.6
>>>>>>>> - */
>>>>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>>>> + mpstream_flush(stream);
>>>>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>>>
>>>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>>>
>>>>>>> And when you introduce vstream later, you simply move this code to
>>>>>>> another file. This is confusing. May be we should pass alloc/reserve
>>>>>>> used in mpstream to port_dump instead of obuf?
>>>>>>
>>>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
>>>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
>>>>>> and is used for box.select.
>>>>>>
>>>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
>>>>>> dump port to not obuf, then we will just add a new method to port_vtab.
>>>>>>
>>>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>>>> in both cases we not just dump in a specific format, but to a concrete
>>>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>>>> by obuf destination.
>>>>>
>>>>> There's port_dump_plain, which dumps port contents in a specific format.
>>>>> So port_dump_obuf would look ambiguous.
>>>>>
>>>>>>
>>>>>> If you worry about how to call sql_response_dump() to not obuf, then there
>>>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
>>>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
>>>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
>>>>>> but use the full power of virtual functions in execute.c, which definitely is
>>>>>> not hot.
>>>>>
>>>>> That would interconnect port and mpstream, make them dependent on each
>>>>> other. I don't think that would be good.
>>>>>
>>>>>>
>>>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>>>> box.select and iproto_call will use port_dump_obuf.
>>>>>>
>>>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
>>>>>
>>>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>>>> because it's very limited in its applicability. People already prefer to
>>>>> use box.call over box.insert/select/etc over iproto, and with the
>>>>> appearance of box.execute they are likely to stop using plain box.select
>>>>> at all.
>>>>>
>>>>> That said, personally I would try to pass reserve/alloc methods to port,
>>>>> see how it goes.
>>>>>
>>>>
>>>> I do not see a reason to slow down box.select if we can don't do it.
>>>> Yeas, people use IPROTO_CALL, but in stored functions they use box
>>>> functions including select.
>>>
>>> box.select called from Lua code doesn't use port_dump_msgpack.
>>>
>>>>
>>>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>>>> port_dump_obuf and add port_dump_msgpack which does not depend on
>>>> mpstream and takes alloc/reserve/ctx directly.
>>>
>>> Better call the optimized version (the one without callbacks)
>>> port_dump_msgpack_obuf to avoid confusion IMO.
>>>
>>> Anyway, I'd try to run cbench to see if it really perfomrs better
>>> than the one using callbacks.
>>
>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>> }
>>
>> int
>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>> +sql_response_dump(struct sql_response *response, int *keys,
>> + struct mpstream *stream)
>> {
>> sqlite3 *db = sql_get();
>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>> int rc = 0, column_count = sqlite3_column_count(stmt);
>> if (column_count > 0) {
>> - if (sql_get_description(stmt, out, column_count) != 0) {
>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>> err:
>> rc = -1;
>> goto finish;
>> }
>> *keys = 2;
>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>> - mp_sizeof_array(port_tuple->size);
>> - char *pos = (char *) obuf_alloc(out, size);
>> - if (pos == NULL) {
>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>> - goto err;
>> - }
>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>> - pos = mp_encode_array(pos, port_tuple->size);
>> - /*
>> - * Just like SELECT, SQL uses output format compatible
>> - * with Tarantool 1.6
>> - */
>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>> + mpstream_encode_uint(stream, IPROTO_DATA);
>> + mpstream_flush(stream);
>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>
> Still, I'm quite convinced that we need to pass alloc/reserve methods
> along with ctx to port_dump_msgpack(), because implicitly assumping that
> mpstream->ctx is, in fact, an obuf looks very fragile. However, Vlad is
> right that it may indeed affect performance in a negative way. So let's
> perhaps do the following:
>
> 1. Run cbench to see how badly indirect obuf_alloc/reserve slows
> things down.
>
> 2. Consider the possibility of using templates or macro definitions
> instead of function pointers.
>
> What do you think?
>
Good plan except one thing in its second point: port still must feature
double-virtualized method taking alloc/reserve to be "dumpable" via
mpstream. Yes, we can leave obuf method, even add region dump method in
future, but for mpstream it requires virtual alloc/reserve anyway
(until mpstream is templated). My point is in saving every single
percent of perf for calls and selects. For SQL alloc/reserve is enough.
What about bench - yes, maybe it is worth benching double-virtualized
port vs specialized. It should test calls and selects. But one problem -
as I know, cbench does not use iproto but port_dump_msgpack is reachable
from iproto only.
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-12-03 20:48 ` [tarantool-patches] " Vladislav Shpilevoy
@ 2018-12-04 8:26 ` Vladimir Davydov
2018-12-04 11:28 ` Vladislav Shpilevoy
0 siblings, 1 reply; 12+ messages in thread
From: Vladimir Davydov @ 2018-12-04 8:26 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: imeevma, tarantool-patches, kostja
On Mon, Dec 03, 2018 at 11:48:26PM +0300, Vladislav Shpilevoy wrote:
>
>
> On 03/12/2018 18:21, Vladimir Davydov wrote:
> > On Sun, Dec 02, 2018 at 02:03:21PM +0300, imeevma@tarantool.org wrote:
> > > This patch is the most dubious patch due to the implicit use of
> > > mpstream as a stream for obuf. Discussion and patch below.
> > >
> > > It is worth noting that in this version of the patch nothing
> > > changes. At this point there is no approved solution for this
> > > patch.
> > >
> > >
> > > On 11/30/18 1:55 PM, Vladimir Davydov wrote:
> > > > On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
> > > > >
> > > > >
> > > > > On 30/11/2018 13:19, Vladimir Davydov wrote:
> > > > > > On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
> > > > > > > On 29/11/2018 13:53, Vladimir Davydov wrote:
> > > > > > > > On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
> > > > > > > > > @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> > > > > > > > > }
> > > > > > > > > int
> > > > > > > > > -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
> > > > > > > > > +sql_response_dump(struct sql_response *response, int *keys,
> > > > > > > > > + struct mpstream *stream)
> > > > > > > > > {
> > > > > > > > > sqlite3 *db = sql_get();
> > > > > > > > > struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
> > > > > > > > > - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
> > > > > > > > > int rc = 0, column_count = sqlite3_column_count(stmt);
> > > > > > > > > if (column_count > 0) {
> > > > > > > > > - if (sql_get_description(stmt, out, column_count) != 0) {
> > > > > > > > > + if (sql_get_description(stmt, stream, column_count) != 0) {
> > > > > > > > > err:
> > > > > > > > > rc = -1;
> > > > > > > > > goto finish;
> > > > > > > > > }
> > > > > > > > > *keys = 2;
> > > > > > > > > - int size = mp_sizeof_uint(IPROTO_DATA) +
> > > > > > > > > - mp_sizeof_array(port_tuple->size);
> > > > > > > > > - char *pos = (char *) obuf_alloc(out, size);
> > > > > > > > > - if (pos == NULL) {
> > > > > > > > > - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> > > > > > > > > - goto err;
> > > > > > > > > - }
> > > > > > > > > - pos = mp_encode_uint(pos, IPROTO_DATA);
> > > > > > > > > - pos = mp_encode_array(pos, port_tuple->size);
> > > > > > > > > - /*
> > > > > > > > > - * Just like SELECT, SQL uses output format compatible
> > > > > > > > > - * with Tarantool 1.6
> > > > > > > > > - */
> > > > > > > > > - if (port_dump_msgpack_16(&response->port, out) < 0) {
> > > > > > > > > + mpstream_encode_uint(stream, IPROTO_DATA);
> > > > > > > > > + mpstream_flush(stream);
> > > > > > > > > + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
> > > > > > > >
> > > > > > > > stream->ctx isn't guaranteed to be an obuf
> > > > > > > >
> > > > > > > > And when you introduce vstream later, you simply move this code to
> > > > > > > > another file. This is confusing. May be we should pass alloc/reserve
> > > > > > > > used in mpstream to port_dump instead of obuf?
> > > > > > >
> > > > > > > Good idea, though not sure, if it is worth slowing down port_dump_msgpack
> > > > > > > adding a new level of indirection. Since port_dump_msgpack is a hot path
> > > > > > > and is used for box.select.
> > > > > > >
> > > > > > > Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
> > > > > > > and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
> > > > > > > dump port to not obuf, then we will just add a new method to port_vtab.
> > > > > > >
> > > > > > > Also, it would make port_dump_obuf name consistent with port_dump_lua -
> > > > > > > in both cases we not just dump in a specific format, but to a concrete
> > > > > > > destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
> > > > > > > by obuf destination.
> > > > > >
> > > > > > There's port_dump_plain, which dumps port contents in a specific format.
> > > > > > So port_dump_obuf would look ambiguous.
> > > > > >
> > > > > > >
> > > > > > > If you worry about how to call sql_response_dump() to not obuf, then there
> > > > > > > is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
> > > > > > > introduce a new method: port_dump_mpstream. It will take mpstream and use
> > > > > > > its reserve/alloc/error functions. It allows us to do not slow down box.select,
> > > > > > > but use the full power of virtual functions in execute.c, which definitely is
> > > > > > > not hot.
> > > > > >
> > > > > > That would interconnect port and mpstream, make them dependent on each
> > > > > > other. I don't think that would be good.
> > > > > >
> > > > > > >
> > > > > > > mpstream implementation of vstream will call port_dump_mpstream, and
> > > > > > > luastream implementation of vstream will call port_dump_lua as it does now.
> > > > > > > box.select and iproto_call will use port_dump_obuf.
> > > > > > >
> > > > > > > I prefer the second option: introduce port_dump_mpstream. It is ok for you?
> > > > > >
> > > > > > I may be wrong, but IMO there isn't much point in optimizing box.select,
> > > > > > because it's very limited in its applicability. People already prefer to
> > > > > > use box.call over box.insert/select/etc over iproto, and with the
> > > > > > appearance of box.execute they are likely to stop using plain box.select
> > > > > > at all.
> > > > > >
> > > > > > That said, personally I would try to pass reserve/alloc methods to port,
> > > > > > see how it goes.
> > > > > >
> > > > >
> > > > > I do not see a reason to slow down box.select if we can don't do it.
> > > > > Yeas, people use IPROTO_CALL, but in stored functions they use box
> > > > > functions including select.
> > > >
> > > > box.select called from Lua code doesn't use port_dump_msgpack.
> > > >
> > > > >
> > > > > Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
> > > > > port_dump_obuf and add port_dump_msgpack which does not depend on
> > > > > mpstream and takes alloc/reserve/ctx directly.
> > > >
> > > > Better call the optimized version (the one without callbacks)
> > > > port_dump_msgpack_obuf to avoid confusion IMO.
> > > >
> > > > Anyway, I'd try to run cbench to see if it really perfomrs better
> > > > than the one using callbacks.
> > >
> > > @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
> > > }
> > > int
> > > -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
> > > +sql_response_dump(struct sql_response *response, int *keys,
> > > + struct mpstream *stream)
> > > {
> > > sqlite3 *db = sql_get();
> > > struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
> > > - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
> > > int rc = 0, column_count = sqlite3_column_count(stmt);
> > > if (column_count > 0) {
> > > - if (sql_get_description(stmt, out, column_count) != 0) {
> > > + if (sql_get_description(stmt, stream, column_count) != 0) {
> > > err:
> > > rc = -1;
> > > goto finish;
> > > }
> > > *keys = 2;
> > > - int size = mp_sizeof_uint(IPROTO_DATA) +
> > > - mp_sizeof_array(port_tuple->size);
> > > - char *pos = (char *) obuf_alloc(out, size);
> > > - if (pos == NULL) {
> > > - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
> > > - goto err;
> > > - }
> > > - pos = mp_encode_uint(pos, IPROTO_DATA);
> > > - pos = mp_encode_array(pos, port_tuple->size);
> > > - /*
> > > - * Just like SELECT, SQL uses output format compatible
> > > - * with Tarantool 1.6
> > > - */
> > > - if (port_dump_msgpack_16(&response->port, out) < 0) {
> > > + mpstream_encode_uint(stream, IPROTO_DATA);
> > > + mpstream_flush(stream);
> > > + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
> >
> > Still, I'm quite convinced that we need to pass alloc/reserve methods
> > along with ctx to port_dump_msgpack(), because implicitly assumping that
> > mpstream->ctx is, in fact, an obuf looks very fragile. However, Vlad is
> > right that it may indeed affect performance in a negative way. So let's
> > perhaps do the following:
> >
> > 1. Run cbench to see how badly indirect obuf_alloc/reserve slows
> > things down.
> >
> > 2. Consider the possibility of using templates or macro definitions
> > instead of function pointers.
> >
> > What do you think?
> >
>
> Good plan except one thing in its second point: port still must feature
> double-virtualized method taking alloc/reserve to be "dumpable" via
> mpstream. Yes, we can leave obuf method, even add region dump method in
> future, but for mpstream it requires virtual alloc/reserve anyway
> (until mpstream is templated). My point is in saving every single
> percent of perf for calls and selects. For SQL alloc/reserve is enough.
Anyway, it'd be nice to see how much it's going to save, exactly.
>
> What about bench - yes, maybe it is worth benching double-virtualized
> port vs specialized. It should test calls and selects. But one problem -
> as I know, cbench does not use iproto but port_dump_msgpack is reachable
> from iproto only.
Let's try nosqlbench then.
^ permalink raw reply [flat|nested] 12+ messages in thread
* Re: [tarantool-patches] Re: [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c
2018-12-04 8:26 ` Vladimir Davydov
@ 2018-12-04 11:28 ` Vladislav Shpilevoy
0 siblings, 0 replies; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-12-04 11:28 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: imeevma, tarantool-patches, kostja
On 04/12/2018 11:26, Vladimir Davydov wrote:
> On Mon, Dec 03, 2018 at 11:48:26PM +0300, Vladislav Shpilevoy wrote:
>>
>>
>> On 03/12/2018 18:21, Vladimir Davydov wrote:
>>> On Sun, Dec 02, 2018 at 02:03:21PM +0300, imeevma@tarantool.org wrote:
>>>> This patch is the most dubious patch due to the implicit use of
>>>> mpstream as a stream for obuf. Discussion and patch below.
>>>>
>>>> It is worth noting that in this version of the patch nothing
>>>> changes. At this point there is no approved solution for this
>>>> patch.
>>>>
>>>>
>>>> On 11/30/18 1:55 PM, Vladimir Davydov wrote:
>>>>> On Fri, Nov 30, 2018 at 01:45:48PM +0300, Vladislav Shpilevoy wrote:
>>>>>>
>>>>>>
>>>>>> On 30/11/2018 13:19, Vladimir Davydov wrote:
>>>>>>> On Thu, Nov 29, 2018 at 05:04:06PM +0300, Vladislav Shpilevoy wrote:
>>>>>>>> On 29/11/2018 13:53, Vladimir Davydov wrote:
>>>>>>>>> On Tue, Nov 27, 2018 at 10:25:43PM +0300, imeevma@tarantool.org wrote:
>>>>>>>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>>>>>>>> }
>>>>>>>>>> int
>>>>>>>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>>>>>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>>>>>>>> + struct mpstream *stream)
>>>>>>>>>> {
>>>>>>>>>> sqlite3 *db = sql_get();
>>>>>>>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>>>>>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>>>>>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>>>>>>>> if (column_count > 0) {
>>>>>>>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>>>>>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>>>>>>>> err:
>>>>>>>>>> rc = -1;
>>>>>>>>>> goto finish;
>>>>>>>>>> }
>>>>>>>>>> *keys = 2;
>>>>>>>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>>>>>>>> - mp_sizeof_array(port_tuple->size);
>>>>>>>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>>>>>>>> - if (pos == NULL) {
>>>>>>>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>>>>>>>> - goto err;
>>>>>>>>>> - }
>>>>>>>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>>>>>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>>>>>>>> - /*
>>>>>>>>>> - * Just like SELECT, SQL uses output format compatible
>>>>>>>>>> - * with Tarantool 1.6
>>>>>>>>>> - */
>>>>>>>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>>>>>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>>>>>>>> + mpstream_flush(stream);
>>>>>>>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>>>>>>>
>>>>>>>>> stream->ctx isn't guaranteed to be an obuf
>>>>>>>>>
>>>>>>>>> And when you introduce vstream later, you simply move this code to
>>>>>>>>> another file. This is confusing. May be we should pass alloc/reserve
>>>>>>>>> used in mpstream to port_dump instead of obuf?
>>>>>>>>
>>>>>>>> Good idea, though not sure, if it is worth slowing down port_dump_msgpack
>>>>>>>> adding a new level of indirection. Since port_dump_msgpack is a hot path
>>>>>>>> and is used for box.select.
>>>>>>>>
>>>>>>>> Maybe it is better to just rename port_dump_msgpack to port_dump_obuf
>>>>>>>> and rename vstream_port_dump to vstream_port_dump_obuf? If we ever will
>>>>>>>> dump port to not obuf, then we will just add a new method to port_vtab.
>>>>>>>>
>>>>>>>> Also, it would make port_dump_obuf name consistent with port_dump_lua -
>>>>>>>> in both cases we not just dump in a specific format, but to a concrete
>>>>>>>> destination: obuf and lua stack. Now port_dump_msgpack anyway is restricted
>>>>>>>> by obuf destination.
>>>>>>>
>>>>>>> There's port_dump_plain, which dumps port contents in a specific format.
>>>>>>> So port_dump_obuf would look ambiguous.
>>>>>>>
>>>>>>>>
>>>>>>>> If you worry about how to call sql_response_dump() to not obuf, then there
>>>>>>>> is another option. Anyway rename port_dump_msgpack to port_dump_obuf and
>>>>>>>> introduce a new method: port_dump_mpstream. It will take mpstream and use
>>>>>>>> its reserve/alloc/error functions. It allows us to do not slow down box.select,
>>>>>>>> but use the full power of virtual functions in execute.c, which definitely is
>>>>>>>> not hot.
>>>>>>>
>>>>>>> That would interconnect port and mpstream, make them dependent on each
>>>>>>> other. I don't think that would be good.
>>>>>>>
>>>>>>>>
>>>>>>>> mpstream implementation of vstream will call port_dump_mpstream, and
>>>>>>>> luastream implementation of vstream will call port_dump_lua as it does now.
>>>>>>>> box.select and iproto_call will use port_dump_obuf.
>>>>>>>>
>>>>>>>> I prefer the second option: introduce port_dump_mpstream. It is ok for you?
>>>>>>>
>>>>>>> I may be wrong, but IMO there isn't much point in optimizing box.select,
>>>>>>> because it's very limited in its applicability. People already prefer to
>>>>>>> use box.call over box.insert/select/etc over iproto, and with the
>>>>>>> appearance of box.execute they are likely to stop using plain box.select
>>>>>>> at all.
>>>>>>>
>>>>>>> That said, personally I would try to pass reserve/alloc methods to port,
>>>>>>> see how it goes.
>>>>>>>
>>>>>>
>>>>>> I do not see a reason to slow down box.select if we can don't do it.
>>>>>> Yeas, people use IPROTO_CALL, but in stored functions they use box
>>>>>> functions including select.
>>>>>
>>>>> box.select called from Lua code doesn't use port_dump_msgpack.
>>>>>
>>>>>>
>>>>>> Ok, instead of port_dump_mpstream we can rename port_dump_msgpack to
>>>>>> port_dump_obuf and add port_dump_msgpack which does not depend on
>>>>>> mpstream and takes alloc/reserve/ctx directly.
>>>>>
>>>>> Better call the optimized version (the one without callbacks)
>>>>> port_dump_msgpack_obuf to avoid confusion IMO.
>>>>>
>>>>> Anyway, I'd try to run cbench to see if it really perfomrs better
>>>>> than the one using callbacks.
>>>>
>>>> @@ -625,81 +608,53 @@ sql_prepare_and_execute(const struct sql_request *request,
>>>> }
>>>> int
>>>> -sql_response_dump(struct sql_response *response, int *keys, struct obuf *out)
>>>> +sql_response_dump(struct sql_response *response, int *keys,
>>>> + struct mpstream *stream)
>>>> {
>>>> sqlite3 *db = sql_get();
>>>> struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
>>>> - struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
>>>> int rc = 0, column_count = sqlite3_column_count(stmt);
>>>> if (column_count > 0) {
>>>> - if (sql_get_description(stmt, out, column_count) != 0) {
>>>> + if (sql_get_description(stmt, stream, column_count) != 0) {
>>>> err:
>>>> rc = -1;
>>>> goto finish;
>>>> }
>>>> *keys = 2;
>>>> - int size = mp_sizeof_uint(IPROTO_DATA) +
>>>> - mp_sizeof_array(port_tuple->size);
>>>> - char *pos = (char *) obuf_alloc(out, size);
>>>> - if (pos == NULL) {
>>>> - diag_set(OutOfMemory, size, "obuf_alloc", "pos");
>>>> - goto err;
>>>> - }
>>>> - pos = mp_encode_uint(pos, IPROTO_DATA);
>>>> - pos = mp_encode_array(pos, port_tuple->size);
>>>> - /*
>>>> - * Just like SELECT, SQL uses output format compatible
>>>> - * with Tarantool 1.6
>>>> - */
>>>> - if (port_dump_msgpack_16(&response->port, out) < 0) {
>>>> + mpstream_encode_uint(stream, IPROTO_DATA);
>>>> + mpstream_flush(stream);
>>>> + if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
>>>
>>> Still, I'm quite convinced that we need to pass alloc/reserve methods
>>> along with ctx to port_dump_msgpack(), because implicitly assumping that
>>> mpstream->ctx is, in fact, an obuf looks very fragile. However, Vlad is
>>> right that it may indeed affect performance in a negative way. So let's
>>> perhaps do the following:
>>>
>>> 1. Run cbench to see how badly indirect obuf_alloc/reserve slows
>>> things down.
>>>
>>> 2. Consider the possibility of using templates or macro definitions
>>> instead of function pointers.
>>>
>>> What do you think?
>>>
>>
>> Good plan except one thing in its second point: port still must feature
>> double-virtualized method taking alloc/reserve to be "dumpable" via
>> mpstream. Yes, we can leave obuf method, even add region dump method in
>> future, but for mpstream it requires virtual alloc/reserve anyway
>> (until mpstream is templated). My point is in saving every single
>> percent of perf for calls and selects. For SQL alloc/reserve is enough.
>
> Anyway, it'd be nice to see how much it's going to save, exactly.
Ok. Kostja yesterday verbally said that it is not worth benching such minor
thing and we should not introduce second virtualization level, but lets wait
for a written approval.
^ permalink raw reply [flat|nested] 12+ messages in thread