Tarantool development patches archive
 help / color / mirror / Atom feed
From: imeevma@tarantool.org
To: v.shpilevoy@tarantool.org, tarantool-patches@freelists.org,
	kostja@tarantool.org
Subject: [tarantool-patches] [PATCH v4 3/5] sql: create interface vstream
Date: Fri, 30 Nov 2018 22:01:21 +0300	[thread overview]
Message-ID: <2c3de27a56d37ec8f5d630c42326389381fd1745.1543604148.git.imeevma@gmail.com> (raw)
In-Reply-To: <cover.1543604148.git.imeevma@gmail.com>

To use the functions from execute.h outside of IPROTO we need a
special interface. It allows us to create different
implementations for mpstream and lua_State and to use the
functions from execute.c without changing them. This patch
creates such an interface along with its mpstream implementation,
and replaces the mpstream calls in execute.c with methods of
this interface.

Needed for #3505
---
 src/box/execute.c |  61 ++++++++++++--------
 src/box/execute.h |  18 +++++-
 src/box/iproto.cc |  11 ++--
 src/mpstream.c    |  57 ++++++++++++++++++
 src/mpstream.h    |   7 +++
 src/vstream.h     | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 6 files changed, 294 insertions(+), 30 deletions(-)
 create mode 100644 src/vstream.h

diff --git a/src/box/execute.c b/src/box/execute.c
index 0d266dd..36b861f 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -42,7 +42,7 @@
 #include "port.h"
 #include "tuple.h"
 #include "sql/vdbe.h"
-#include "mpstream.h"
+#include "vstream.h"
 
 const char *sql_type_strs[] = {
 	NULL,
@@ -530,12 +530,12 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
+sql_get_description(struct sqlite3_stmt *stmt, struct vstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	mpstream_encode_uint(stream, IPROTO_METADATA);
-	mpstream_encode_array(stream, column_count);
+	vstream_encode_enum(stream, IPROTO_METADATA, "metadata");
+	vstream_encode_array(stream, column_count);
 	for (int i = 0; i < column_count; ++i) {
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
@@ -545,12 +545,19 @@ sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		 * column_name simply returns them.
 		 */
 		assert(name != NULL);
-		mpstream_encode_map(stream, 2);
-		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
-		mpstream_encode_str(stream, name);
-		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
-		mpstream_encode_str(stream, type);
+		vstream_encode_map(stream, 2);
+
+		vstream_encode_enum(stream, IPROTO_FIELD_NAME, "name");
+		vstream_encode_str(stream, name);
+		vstream_encode_map_commit(stream);
+
+		vstream_encode_enum(stream, IPROTO_FIELD_TYPE, "type");
+		vstream_encode_str(stream, type);
+		vstream_encode_map_commit(stream);
+
+		vstream_encode_array_commit(stream, i);
 	}
+	vstream_encode_map_commit(stream);
 	return 0;
 }
 
@@ -609,7 +616,7 @@ sql_prepare_and_execute(const struct sql_request *request,
 
 int
 sql_response_dump(struct sql_response *response, int *keys,
-		  struct mpstream *stream)
+		  struct vstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
@@ -621,42 +628,48 @@ err:
 			goto finish;
 		}
 		*keys = 2;
-		mpstream_encode_uint(stream, IPROTO_DATA);
-		mpstream_flush(stream);
-		if (port_dump_msgpack(&response->port, stream->ctx) < 0) {
+		vstream_encode_enum(stream, IPROTO_DATA, "rows");
+		if (vstream_encode_port(stream, &response->port) < 0) {
 			/* Failed port dump destroyes the port. */
 			goto err;
 		}
-		mpstream_reset(stream);
+		vstream_encode_map_commit(stream);
 	} else {
 		*keys = 1;
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
-		mpstream_encode_map(stream, map_size);
 		uint64_t id_count = 0;
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
 		}
-
-		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
-		mpstream_encode_uint(stream, db->nChange);
+		if (!response->is_info_flattened) {
+			vstream_encode_enum(stream, IPROTO_SQL_INFO, "info");
+			vstream_encode_map(stream, map_size);
+		}
+		vstream_encode_enum(stream, SQL_INFO_ROW_COUNT, "rowcount");
+		vstream_encode_uint(stream, db->nChange);
+		vstream_encode_map_commit(stream);
 		if (!stailq_empty(autoinc_id_list)) {
-			mpstream_encode_uint(stream,
-					     SQL_INFO_AUTOINCREMENT_IDS);
-			mpstream_encode_array(stream, id_count);
+			vstream_encode_enum(stream, SQL_INFO_AUTOINCREMENT_IDS,
+					    "autoincrement_ids");
+			vstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
+			int i = 0;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
 				int64_t value = id_entry->id;
 				if (id_entry->id >= 0)
-					mpstream_encode_uint(stream, value);
+					vstream_encode_uint(stream, value);
 				else
-					mpstream_encode_int(stream, value);
+					vstream_encode_int(stream, value);
+				vstream_encode_array_commit(stream, i++);
 			}
+			vstream_encode_map_commit(stream);
 		}
+		if (!response->is_info_flattened)
+			vstream_encode_map_commit(stream);
 	}
 finish:
 	port_destroy(&response->port);
diff --git a/src/box/execute.h b/src/box/execute.h
index 65ac81c..56b7339 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -51,7 +51,7 @@ extern const char *sql_info_key_strs[];
 struct region;
 struct sql_bind;
 struct xrow_header;
-struct mpstream;
+struct vstream;
 
 /** EXECUTE request. */
 struct sql_request {
@@ -74,6 +74,20 @@ struct sql_response {
 	struct port port;
 	/** Prepared SQL statement with metadata. */
 	void *prep_stmt;
+	/**
+	 * An SQL response can be dumped either into msgpack to
+	 * be sent via iproto, or onto the Lua stack to be
+	 * returned to an application. In the first case the body
+	 * has an explicit field IPROTO_SQL_INFO: {rowcount = ...,
+	 * autoincrement_ids = ...}. In the Lua case that field is
+	 * flattened: the result has no 'info' field, only inlined
+	 * 'rowcount' and 'autoincrement_ids'. In iproto the
+	 * IPROTO_SQL_INFO field mostly serves to distinguish the
+	 * two response types: DML/DDL vs DQL, i.e.
+	 * IPROTO_SQL_INFO vs IPROTO_METADATA. So this flag is set
+	 * by Lua and makes the dump flatten the SQL_INFO fields.
+	 */
+	bool is_info_flattened;
 };
 
 /**
@@ -112,7 +126,7 @@ struct sql_response {
  */
 int
 sql_response_dump(struct sql_response *response, int *keys,
-		  struct mpstream *stream);
+		  struct vstream *stream);
 
 /**
  * Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b110900..6e284f7 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -62,6 +62,7 @@
 #include "execute.h"
 #include "errinj.h"
 #include "mpstream.h"
+#include "vstream.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -1587,6 +1588,7 @@ tx_process_sql(struct cmsg *m)
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct obuf *out;
 	struct sql_response response;
+	memset(&response, 0, sizeof(response));
 	bool is_error = false;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1607,16 +1609,17 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	struct mpstream stream;
-	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
-		      set_encode_error, &is_error);
+
+	struct vstream stream;
+	mpvstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+		       set_encode_error, &is_error);
 	if (is_error)
 		goto error;
 	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
-	mpstream_flush(&stream);
+	mpstream_flush((struct mpstream *)&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
diff --git a/src/mpstream.c b/src/mpstream.c
index e4f7950..6636da1 100644
--- a/src/mpstream.c
+++ b/src/mpstream.c
@@ -33,6 +33,8 @@
 #include <assert.h>
 #include <stdint.h>
 #include "msgpuck.h"
+#include "vstream.h"
+#include "port.h"
 
 void
 mpstream_reserve_slow(struct mpstream *stream, size_t size)
@@ -175,3 +177,58 @@ mpstream_encode_bool(struct mpstream *stream, bool val)
     char *pos = mp_encode_bool(data, val);
     mpstream_advance(stream, pos - data);
 }
+
+static int
+mp_vstream_encode_port(struct vstream *stream, struct port *port)
+{
+	struct mpstream *mp = (struct mpstream *)stream;
+	mpstream_flush(mp);
+	if (port_dump_msgpack(port, mp->ctx) >= 0) {
+		mpstream_reset(mp);
+		return 0;
+	}
+	/* A failed port dump destroys the port. */
+	return -1;
+}
+
+static void
+mp_vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+	(void)str; /* Only the numeric code is encoded here. */
+	if (num >= 0)
+		mpstream_encode_uint((struct mpstream *)stream, num);
+	else
+		mpstream_encode_int((struct mpstream *)stream, num);
+}
+
+static void
+mp_vstream_noop(struct vstream *stream, ...)
+{
+	(void) stream; /* NOTE(review): invoked through non-variadic casts. */
+}
+
+static const struct vstream_vtab mp_vstream_vtab = {
+	.encode_array = (encode_array_f)mpstream_encode_array,
+	.encode_map = (encode_map_f)mpstream_encode_map,
+	.encode_uint = (encode_uint_f)mpstream_encode_uint,
+	.encode_int = (encode_int_f)mpstream_encode_int,
+	.encode_float = (encode_float_f)mpstream_encode_float,
+	.encode_double = (encode_double_f)mpstream_encode_double,
+	.encode_strn = (encode_strn_f)mpstream_encode_strn,
+	.encode_nil = (encode_nil_f)mpstream_encode_nil,
+	.encode_bool = (encode_bool_f)mpstream_encode_bool,
+	.encode_enum = mp_vstream_encode_enum,
+	.encode_port = mp_vstream_encode_port,
+	.encode_array_commit = (encode_array_commit_f)mp_vstream_noop,
+	.encode_map_commit = (encode_map_commit_f)mp_vstream_noop,
+};
+
+void
+mpvstream_init(struct vstream *stream, void *ctx, mpstream_reserve_f reserve,
+	       mpstream_alloc_f alloc, mpstream_error_f error, void *error_ctx)
+{
+	struct mpstream *base = (struct mpstream *)stream;
+	assert(sizeof(*base) <= sizeof(stream->inheritance_padding));
+	stream->vtab = &mp_vstream_vtab;
+	mpstream_init(base, ctx, reserve, alloc, error, error_ctx);
+}
diff --git a/src/mpstream.h b/src/mpstream.h
index e22d052..ce0e25d 100644
--- a/src/mpstream.h
+++ b/src/mpstream.h
@@ -78,6 +78,13 @@ mpstream_init(struct mpstream *stream, void *ctx,
 	      mpstream_reserve_f reserve, mpstream_alloc_f alloc,
 	      mpstream_error_f error, void *error_ctx);
 
+struct vstream;
+
+/** Initialize a vstream object as an instance of mpstream. */
+void
+mpvstream_init(struct vstream *stream, void *ctx, mpstream_reserve_f reserve,
+	       mpstream_alloc_f alloc, mpstream_error_f error, void *error_ctx);
+
 static inline void
 mpstream_flush(struct mpstream *stream)
 {
diff --git a/src/vstream.h b/src/vstream.h
new file mode 100644
index 0000000..fe7c49a
--- /dev/null
+++ b/src/vstream.h
@@ -0,0 +1,170 @@
+#ifndef TARANTOOL_VSTREAM_H_INCLUDED
+#define TARANTOOL_VSTREAM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct vstream;
+struct port;
+
+typedef void (*encode_array_f)(struct vstream *stream, uint32_t size);
+typedef void (*encode_map_f)(struct vstream *stream, uint32_t size);
+typedef void (*encode_uint_f)(struct vstream *stream, uint64_t num);
+typedef void (*encode_int_f)(struct vstream *stream, int64_t num);
+typedef void (*encode_float_f)(struct vstream *stream, float num);
+typedef void (*encode_double_f)(struct vstream *stream, double num);
+typedef void (*encode_strn_f)(struct vstream *stream, const char *str,
+			      uint32_t len);
+typedef void (*encode_nil_f)(struct vstream *stream);
+typedef void (*encode_bool_f)(struct vstream *stream, bool val);
+typedef void (*encode_enum_f)(struct vstream *stream, int64_t num,
+			      const char *str);
+typedef int (*encode_port_f)(struct vstream *stream, struct port *port);
+typedef void (*encode_array_commit_f)(struct vstream *stream, uint32_t id);
+typedef void (*encode_map_commit_f)(struct vstream *stream);
+
+struct vstream_vtab {
+	encode_array_f encode_array;
+	encode_map_f encode_map;
+	encode_uint_f encode_uint;
+	encode_int_f encode_int;
+	encode_float_f encode_float;
+	encode_double_f encode_double;
+	encode_strn_f encode_strn;
+	encode_nil_f encode_nil;
+	encode_bool_f encode_bool;
+	encode_enum_f encode_enum; /* key given as number and name */
+	encode_port_f encode_port; /* the only slot returning int */
+	encode_array_commit_f encode_array_commit; /* after each element */
+	encode_map_commit_f encode_map_commit; /* after each key/value pair */
+};
+
+struct vstream {
+	/** Raw storage for the concrete stream, e.g. struct mpstream. */
+	char inheritance_padding[64];
+	/** Method table of the concrete stream implementation. */
+	const struct vstream_vtab *vtab;
+};
+
+void
+mp_vstream_init_vtab(struct vstream *vstream);
+
+static inline void
+vstream_encode_array(struct vstream *stream, uint32_t size)
+{
+	stream->vtab->encode_array(stream, size);
+}
+
+static inline void
+vstream_encode_map(struct vstream *stream, uint32_t size)
+{
+	stream->vtab->encode_map(stream, size);
+}
+
+static inline void
+vstream_encode_uint(struct vstream *stream, uint64_t num)
+{
+	stream->vtab->encode_uint(stream, num);
+}
+
+static inline void
+vstream_encode_int(struct vstream *stream, int64_t num)
+{
+	stream->vtab->encode_int(stream, num);
+}
+
+static inline void
+vstream_encode_float(struct vstream *stream, float num)
+{
+	stream->vtab->encode_float(stream, num);
+}
+
+static inline void
+vstream_encode_double(struct vstream *stream, double num)
+{
+	stream->vtab->encode_double(stream, num);
+}
+
+static inline void
+vstream_encode_strn(struct vstream *stream, const char *str, uint32_t len)
+{
+	stream->vtab->encode_strn(stream, str, len);
+}
+
+static inline void
+vstream_encode_str(struct vstream *stream, const char *str)
+{
+	stream->vtab->encode_strn(stream, str, (uint32_t)strlen(str));
+}
+
+static inline void
+vstream_encode_nil(struct vstream *stream)
+{
+	stream->vtab->encode_nil(stream);
+}
+
+static inline void
+vstream_encode_bool(struct vstream *stream, bool val)
+{
+	stream->vtab->encode_bool(stream, val);
+}
+
+static inline void
+vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+	stream->vtab->encode_enum(stream, num, str);
+}
+
+static inline int
+vstream_encode_port(struct vstream *stream, struct port *port)
+{
+	return stream->vtab->encode_port(stream, port); /* impl's int result */
+}
+
+static inline void
+vstream_encode_map_commit(struct vstream *stream)
+{
+	stream->vtab->encode_map_commit(stream);
+}
+
+static inline void
+vstream_encode_array_commit(struct vstream *stream, uint32_t id)
+{
+	stream->vtab->encode_array_commit(stream, id);
+}
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_VSTREAM_H_INCLUDED */
-- 
2.7.4

  parent reply	other threads:[~2018-11-30 19:01 UTC|newest]

Thread overview: 12+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-11-30 19:00 [PATCH v4 0/5] Remove box.sql.execute() imeevma
2018-11-30 19:01 ` [PATCH v4 1/5] box: move port to src/ imeevma
2018-12-03  9:22   ` Vladimir Davydov
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
2018-11-30 19:01 ` imeevma [this message]
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 4/5] lua: create vstream implementation for Lua imeevma
2018-11-30 19:01 ` [tarantool-patches] [PATCH v4 5/5] sql: check new box.sql.execute() imeevma
2018-12-02 11:03 ` [PATCH v4 2/5] iproto: replace obuf by mpstream in execute.c imeevma
2018-12-03 15:21   ` Vladimir Davydov
2018-12-03 20:48     ` [tarantool-patches] " Vladislav Shpilevoy
2018-12-04  8:26       ` Vladimir Davydov
2018-12-04 11:28         ` Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=2c3de27a56d37ec8f5d630c42326389381fd1745.1543604148.git.imeevma@gmail.com \
    --to=imeevma@tarantool.org \
    --cc=kostja@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [tarantool-patches] [PATCH v4 3/5] sql: create interface vstream' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox