[tarantool-patches] [PATCH v2 5/7] sql: create interface vstream

imeevma at tarantool.org imeevma at tarantool.org
Thu Nov 22 22:11:00 MSK 2018


If we want to use functions from execute.h not only in IPROTO, we
should create a special interface. This interface will allow us to
create different implementations for mpstream and lua_State and
use functions from execute.c without changing them. This patch
creates such an interface and its implementation for mpstream and
replaces mpstream functions in execute.c with methods of this
interface.

Needed for #3505
---
 src/box/execute.c |  69 ++++++++++++-----------
 src/box/execute.h |   6 +-
 src/box/iproto.cc |  11 ++--
 src/box/vstream.h | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/mpstream.c    |  78 ++++++++++++++++++++++++++
 5 files changed, 291 insertions(+), 37 deletions(-)
 create mode 100644 src/box/vstream.h

diff --git a/src/box/execute.c b/src/box/execute.c
index 1f0e5ab..8093d9c 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -42,7 +42,7 @@
 #include "port.h"
 #include "tuple.h"
 #include "sql/vdbe.h"
-#include "mpstream.h"
+#include "vstream.h"
 
 const char *sql_type_strs[] = {
 	NULL,
@@ -530,12 +530,12 @@ sql_bind(const struct sql_request *request, struct sqlite3_stmt *stmt)
  * @retval -1 Client or memory error.
  */
 static inline int
-sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
+sql_get_description(struct sqlite3_stmt *stmt, struct vstream *stream,
 		    int column_count)
 {
 	assert(column_count > 0);
-	mpstream_encode_uint(stream, IPROTO_METADATA);
-	mpstream_encode_array(stream, column_count);
+	vstream_encode_enum(stream, IPROTO_METADATA, "metadata");
+	vstream_encode_array(stream, column_count);
 	for (int i = 0; i < column_count; ++i) {
 		const char *name = sqlite3_column_name(stmt, i);
 		const char *type = sqlite3_column_datatype(stmt, i);
@@ -547,12 +547,19 @@ sql_get_description(struct sqlite3_stmt *stmt, struct mpstream *stream,
 		assert(name != NULL);
 		if (type == NULL)
 			type = "UNKNOWN";
-		mpstream_encode_map(stream, 2);
-		mpstream_encode_uint(stream, IPROTO_FIELD_NAME);
-		mpstream_encode_str(stream, name);
-		mpstream_encode_uint(stream, IPROTO_FIELD_TYPE);
-		mpstream_encode_str(stream, type);
+		vstream_encode_map(stream, 2);
+
+		vstream_encode_enum(stream, IPROTO_FIELD_NAME, "name");
+		vstream_encode_str(stream, name);
+		vstream_encode_map_commit(stream);
+
+		vstream_encode_enum(stream, IPROTO_FIELD_TYPE, "type");
+		vstream_encode_str(stream, type);
+		vstream_encode_map_commit(stream);
+
+		vstream_encode_array_commit(stream, i);
 	}
+	vstream_encode_map_commit(stream);
 	return 0;
 }
 
@@ -611,7 +618,7 @@ sql_prepare_and_execute(const struct sql_request *request,
 
 int
 sql_response_dump(struct sql_response *response, int *keys,
-		  struct mpstream *stream)
+		  struct vstream *stream)
 {
 	sqlite3 *db = sql_get();
 	struct sqlite3_stmt *stmt = (struct sqlite3_stmt *) response->prep_stmt;
@@ -624,48 +631,48 @@ err:
 			goto finish;
 		}
 		*keys = 2;
-		mpstream_encode_uint(stream, IPROTO_DATA);
-		mpstream_encode_array(stream, port_tuple->size);
-		mpstream_flush(stream);
-		/*
-		 * Just like SELECT, SQL uses output format compatible
-		 * with Tarantool 1.6
-		 */
-		if (port_dump_msgpack_16(&response->port, stream->ctx) < 0) {
-			/* Failed port dump destroyes the port. */
+		vstream_encode_enum(stream, IPROTO_DATA, "rows");
+		vstream_encode_array(stream, port_tuple->size);
+		if (vstream_encode_port(stream, &response->port) < 0)
 			goto err;
-		}
-		mpstream_reset(stream);
+		vstream_encode_map_commit(stream);
 	} else {
 		*keys = 1;
 		assert(port_tuple->size == 0);
 		struct stailq *autoinc_id_list =
 			vdbe_autoinc_id_list((struct Vdbe *)stmt);
 		uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
-		mpstream_encode_uint(stream, IPROTO_SQL_INFO);
-		mpstream_encode_map(stream, map_size);
 		uint64_t id_count = 0;
 		if (!stailq_empty(autoinc_id_list)) {
 			struct autoinc_id_entry *id_entry;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link)
 				id_count++;
 		}
-
-		mpstream_encode_uint(stream, SQL_INFO_ROW_COUNT);
-		mpstream_encode_uint(stream, db->nChange);
+		if (!response->is_flatten) {
+			vstream_encode_enum(stream, IPROTO_SQL_INFO, "info");
+			vstream_encode_map(stream, map_size);
+		}
+		vstream_encode_enum(stream, SQL_INFO_ROW_COUNT, "rowcount");
+		vstream_encode_uint(stream, db->nChange);
+		vstream_encode_map_commit(stream);
 		if (!stailq_empty(autoinc_id_list)) {
-			mpstream_encode_uint(stream,
-					     SQL_INFO_AUTOINCREMENT_IDS);
-			mpstream_encode_array(stream, id_count);
+			vstream_encode_enum(stream, SQL_INFO_AUTOINCREMENT_IDS,
+					    "autoincrement_ids");
+			vstream_encode_array(stream, id_count);
 			struct autoinc_id_entry *id_entry;
+			int i = 0;
 			stailq_foreach_entry(id_entry, autoinc_id_list, link) {
 				int64_t value = id_entry->id;
 				if (id_entry->id >= 0)
-					mpstream_encode_uint(stream, value);
+					vstream_encode_uint(stream, value);
 				else
-					mpstream_encode_int(stream, value);
+					vstream_encode_int(stream, value);
+				vstream_encode_array_commit(stream, i++);
 			}
+			vstream_encode_map_commit(stream);
 		}
+		if (!response->is_flatten)
+			vstream_encode_map_commit(stream);
 	}
 finish:
 	port_destroy(&response->port);
diff --git a/src/box/execute.h b/src/box/execute.h
index 940f3a3..5a11a8a 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -52,7 +52,7 @@ struct obuf;
 struct region;
 struct sql_bind;
 struct xrow_header;
-struct mpstream;
+struct vstream;
 
 /** EXECUTE request. */
 struct sql_request {
@@ -75,6 +75,8 @@ struct sql_response {
 	struct port port;
 	/** Prepared SQL statement with metadata. */
 	void *prep_stmt;
+	/** If true, the result should be flattened. */
+	bool is_flatten;
 };
 
 /**
@@ -113,7 +115,7 @@ struct sql_response {
  */
 int
 sql_response_dump(struct sql_response *response, int *keys,
-		  struct mpstream *stream);
+		  struct vstream *stream);
 
 /**
  * Parse the EXECUTE request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b110900..380a6ee 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -62,6 +62,7 @@
 #include "execute.h"
 #include "errinj.h"
 #include "mpstream.h"
+#include "vstream.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -1607,16 +1608,18 @@ tx_process_sql(struct cmsg *m)
 	/* Prepare memory for the iproto header. */
 	if (iproto_prepare_header(out, &header_svp, IPROTO_SQL_HEADER_LEN) != 0)
 		goto error;
-	struct mpstream stream;
-	mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
-		      set_encode_error, &is_error);
+
+	struct vstream stream;
+	mpstream_init((struct mpstream *)&stream, out, obuf_reserve_cb,
+		      obuf_alloc_cb, set_encode_error, &is_error);
 	if (is_error)
 		goto error;
+	mp_vstream_init_vtab(&stream);
 	if (sql_response_dump(&response, &keys, &stream) != 0 || is_error) {
 		obuf_rollback_to_svp(out, &header_svp);
 		goto error;
 	}
-	mpstream_flush(&stream);
+	mpstream_flush((struct mpstream *)&stream);
 	iproto_reply_sql(out, &header_svp, response.sync, schema_version, keys);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
diff --git a/src/box/vstream.h b/src/box/vstream.h
new file mode 100644
index 0000000..a8dcfc2
--- /dev/null
+++ b/src/box/vstream.h
@@ -0,0 +1,164 @@
+#ifndef TARANTOOL_VSTREAM_H_INCLUDED
+#define TARANTOOL_VSTREAM_H_INCLUDED
+/*
+ * Copyright 2010-2018, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "diag.h"
+#include "mpstream.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct vstream;
+struct lua_State;
+struct port;
+
+struct vstream_vtab {
+	void (*encode_array)(struct vstream *stream, uint32_t size);
+	void (*encode_map)(struct vstream *stream, uint32_t size);
+	void (*encode_uint)(struct vstream *stream, uint64_t num);
+	void (*encode_int)(struct vstream *stream, int64_t num);
+	void (*encode_float)(struct vstream *stream, float num);
+	void (*encode_double)(struct vstream *stream, double num);
+	void (*encode_strn)(struct vstream *stream, const char *str,
+			    uint32_t len);
+	void (*encode_nil)(struct vstream *stream);
+	void (*encode_bool)(struct vstream *stream, bool val);
+	void (*encode_enum)(struct vstream *stream, int64_t num,
+			    const char *str);
+	int (*encode_port)(struct vstream *stream, struct port *port);
+	void (*encode_array_commit)(struct vstream *stream, uint32_t id);
+	void (*encode_map_commit)(struct vstream *stream);
+};
+
+struct vstream {
+	/** Backend-specific state: an mpstream or a Lua state. */
+	union {
+		struct mpstream mpstream;
+		struct lua_State *L;
+	};
+	/** Virtual function table. */
+	const struct vstream_vtab *vtab;
+};
+
+void
+mp_vstream_init_vtab(struct vstream *vstream);
+
+static inline void
+vstream_encode_array(struct vstream *stream, uint32_t size)
+{
+	return stream->vtab->encode_array(stream, size);
+}
+
+static inline void
+vstream_encode_map(struct vstream *stream, uint32_t size)
+{
+	return stream->vtab->encode_map(stream, size);
+}
+
+static inline void
+vstream_encode_uint(struct vstream *stream, uint64_t num)
+{
+	return stream->vtab->encode_uint(stream, num);
+}
+
+static inline void
+vstream_encode_int(struct vstream *stream, int64_t num)
+{
+	return stream->vtab->encode_int(stream, num);
+}
+
+static inline void
+vstream_encode_float(struct vstream *stream, float num)
+{
+	return stream->vtab->encode_float(stream, num);
+}
+
+static inline void
+vstream_encode_double(struct vstream *stream, double num)
+{
+	return stream->vtab->encode_double(stream, num);
+}
+
+static inline void
+vstream_encode_strn(struct vstream *stream, const char *str, uint32_t len)
+{
+	return stream->vtab->encode_strn(stream, str, len);
+}
+
+static inline void
+vstream_encode_str(struct vstream *stream, const char *str)
+{
+	return stream->vtab->encode_strn(stream, str, strlen(str));
+}
+
+static inline void
+vstream_encode_nil(struct vstream *stream)
+{
+	return stream->vtab->encode_nil(stream);
+}
+
+static inline void
+vstream_encode_bool(struct vstream *stream, bool val)
+{
+	return stream->vtab->encode_bool(stream, val);
+}
+
+static inline void
+vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+	return stream->vtab->encode_enum(stream, num, str);
+}
+
+static inline int
+vstream_encode_port(struct vstream *stream, struct port *port)
+{
+	return stream->vtab->encode_port(stream, port);
+}
+
+static inline void
+vstream_encode_map_commit(struct vstream *stream)
+{
+	return stream->vtab->encode_map_commit(stream);
+}
+
+static inline void
+vstream_encode_array_commit(struct vstream *stream, uint32_t id)
+{
+	return stream->vtab->encode_array_commit(stream, id);
+}
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* TARANTOOL_VSTREAM_H_INCLUDED */
diff --git a/src/mpstream.c b/src/mpstream.c
index e4f7950..f9943e4 100644
--- a/src/mpstream.c
+++ b/src/mpstream.c
@@ -34,6 +34,11 @@
 #include <stdint.h>
 #include "msgpuck.h"
 
+#include "box/vstream.h"
+#include "box/iproto_constants.h"
+#include "box/port.h"
+#include "box/xrow.h"
+
 void
 mpstream_reserve_slow(struct mpstream *stream, size_t size)
 {
@@ -175,3 +180,76 @@ mpstream_encode_bool(struct mpstream *stream, bool val)
     char *pos = mp_encode_bool(data, val);
     mpstream_advance(stream, pos - data);
 }
+
+typedef void (*encode_array_f)(struct vstream *stream, uint32_t size);
+typedef void (*encode_map_f)(struct vstream *stream, uint32_t size);
+typedef void (*encode_uint_f)(struct vstream *stream, uint64_t num);
+typedef void (*encode_int_f)(struct vstream *stream, int64_t num);
+typedef void (*encode_float_f)(struct vstream *stream, float num);
+typedef void (*encode_double_f)(struct vstream *stream, double num);
+typedef void (*encode_strn_f)(struct vstream *stream, const char *str,
+			      uint32_t len);
+typedef void (*encode_nil_f)(struct vstream *stream);
+typedef void (*encode_bool_f)(struct vstream *stream, bool val);
+
+int
+mp_vstream_encode_port(struct vstream *stream, struct port *port)
+{
+	struct mpstream *mpstream = (struct mpstream *)stream;
+	mpstream_flush(mpstream);
+	/*
+	 * Just like SELECT, SQL uses output format compatible
+	 * with Tarantool 1.6
+	 */
+	if (port_dump_msgpack_16(port, mpstream->ctx) < 0) {
+		/* Failed port dump destroys the port. */
+		return -1;
+	}
+	mpstream_reset(mpstream);
+	return 0;
+}
+
+void
+mp_vstream_encode_enum(struct vstream *stream, int64_t num, const char *str)
+{
+	(void)str;
+	if (num < 0)
+		mpstream_encode_int((struct mpstream *)stream, num);
+	else
+		mpstream_encode_uint((struct mpstream *)stream, num);
+}
+
+void
+mp_vstream_encode_map_commit(struct vstream *stream)
+{
+	(void)stream;
+}
+
+void
+mp_vstream_encode_array_commit(struct vstream *stream, uint32_t id)
+{
+	(void)stream;
+	(void)id;
+}
+
+const struct vstream_vtab mp_vstream_vtab = {
+	/** encode_array = */ (encode_array_f)mpstream_encode_array,
+	/** encode_map = */ (encode_map_f)mpstream_encode_map,
+	/** encode_uint = */ (encode_uint_f)mpstream_encode_uint,
+	/** encode_int = */ (encode_int_f)mpstream_encode_int,
+	/** encode_float = */ (encode_float_f)mpstream_encode_float,
+	/** encode_double = */ (encode_double_f)mpstream_encode_double,
+	/** encode_strn = */ (encode_strn_f)mpstream_encode_strn,
+	/** encode_nil = */ (encode_nil_f)mpstream_encode_nil,
+	/** encode_bool = */ (encode_bool_f)mpstream_encode_bool,
+	/** encode_enum = */ mp_vstream_encode_enum,
+	/** encode_port = */ mp_vstream_encode_port,
+	/** encode_array_commit = */ mp_vstream_encode_array_commit,
+	/** encode_map_commit = */ mp_vstream_encode_map_commit,
+};
+
+void
+mp_vstream_init_vtab(struct vstream *vstream)
+{
+	vstream->vtab = &mp_vstream_vtab;
+}
-- 
2.7.4





More information about the Tarantool-patches mailing list