[Tarantool-patches] [PATCH 2/3] box: introduce port_c

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Sun Mar 8 20:47:34 MSK 2020


Port_c is a new descendant of struct port. It is used now for
public C functions to store their result. Currently they can
return only a tuple, but it will change soon, they will be able to
return arbitrary MessagePack.

Port_tuple is not removed, because still is used for box_select(),
for functional indexes, and in SQL as a base for port_sql.
Although that may be changed later. Functional indexes really need
only a single MessagePack object from their function. While
box_select() working via port_tuple or port_c didn't show any
significant difference during micro benchmarks.

Part of #4641
---

As you can see, port_tuple is used in a very few places now. And
by performance it is not really different from the new port_c,
according to micro benchmarks I did. Moreover, it may be actually
faster for functional indexes - users won't need to create a tuple
just to return a functional index key, which is usually small, I
believe.

If you (reviewers) want me to drop port_tuple in scope of this
patchset, I can do that. At the moment of sending the patchset I
decided to postpone it.

 src/box/box.cc      |   2 +-
 src/box/func.c      |   2 +-
 src/box/lua/misc.cc |  20 +++++
 src/box/port.c      | 191 +++++++++++++++++++++++++++++++++++++++++---
 src/box/port.h      |  50 ++++++++++++
 src/box/sql/func.c  |  22 ++---
 6 files changed, 266 insertions(+), 21 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 09dd67ab4..276430913 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1070,7 +1070,7 @@ boxk(int type, uint32_t space_id, const char *format, ...)
 int
 box_return_tuple(box_function_ctx_t *ctx, box_tuple_t *tuple)
 {
-	return port_tuple_add(ctx->port, tuple);
+	return port_c_add_tuple(ctx->port, tuple);
 }
 
 /* schema_find_id()-like method using only public API */
diff --git a/src/box/func.c b/src/box/func.c
index 431577127..04b04b6ad 100644
--- a/src/box/func.c
+++ b/src/box/func.c
@@ -523,7 +523,7 @@ func_c_call(struct func *base, struct port *args, struct port *ret)
 	if (data == NULL)
 		return -1;
 
-	port_tuple_create(ret);
+	port_c_create(ret);
 	box_function_ctx_t ctx = { ret };
 
 	/* Module can be changed after function reload. */
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 79b6cfe3a..a6ba024ea 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -78,6 +78,26 @@ port_tuple_dump_lua(struct port *base, struct lua_State *L, bool is_flat)
 	}
 }
 
+extern "C" void
+port_c_dump_lua(struct port *base, struct lua_State *L, bool is_flat)
+{
+	struct port_c *port = (struct port_c *)base;
+	if (!is_flat)
+		lua_createtable(L, port->size, 0);
+	struct port_c_entry *pe = port->first;
+	const char *mp;
+	for (int i = 0; pe != NULL; pe = pe->next) {
+		if (pe->mp_size == 0) {
+			luaT_pushtuple(L, pe->tuple);
+		} else {
+			mp = pe->mp;
+			luamp_decode(L, luaL_msgpack_default, &mp);
+		}
+		if (!is_flat)
+			lua_rawseti(L, -2, ++i);
+	}
+}
+
 extern "C" void
 port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat)
 {
diff --git a/src/box/port.c b/src/box/port.c
index 6e2fe3a6e..925a4b2d5 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -37,7 +37,12 @@
 #include <fiber.h>
 #include "errinj.h"
 
-static struct mempool port_tuple_entry_pool;
+static struct mempool port_entry_pool;
+
+enum {
+	PORT_ENTRY_SIZE = MAX(sizeof(struct port_c_entry),
+			      sizeof(struct port_tuple_entry)),
+};
 
 int
 port_tuple_add(struct port *base, struct tuple *tuple)
@@ -49,7 +54,7 @@ port_tuple_add(struct port *base, struct tuple *tuple)
 		e = &port->first_entry;
 		port->first = port->last = e;
 	} else {
-		e = mempool_alloc(&port_tuple_entry_pool);
+		e = mempool_alloc(&port_entry_pool);
 		if (e == NULL) {
 			diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
 			return -1;
@@ -87,7 +92,7 @@ port_tuple_destroy(struct port *base)
 		struct port_tuple_entry *cur = e;
 		e = e->next;
 		tuple_unref(cur->tuple);
-		mempool_free(&port_tuple_entry_pool, cur);
+		mempool_free(&port_entry_pool, cur);
 	}
 }
 
@@ -127,28 +132,194 @@ port_tuple_dump_msgpack(struct port *base, struct obuf *out)
 extern void
 port_tuple_dump_lua(struct port *base, struct lua_State *L, bool is_flat);
 
+static inline void
+port_c_destroy_entry(struct port_c_entry *pe)
+{
+	if (pe->mp_size == 0)
+		tuple_unref(pe->tuple);
+	else if (pe->mp_size <= PORT_ENTRY_SIZE)
+		mempool_free(&port_entry_pool, pe->mp);
+	else
+		free(pe->mp);
+}
+
+static void
+port_c_destroy(struct port *base)
+{
+	struct port_c *port = (struct port_c *)base;
+	struct port_c_entry *pe = port->first;
+	if (pe == NULL)
+		return;
+	port_c_destroy_entry(pe);
+	pe = pe->next;
+	while (pe != NULL) {
+		struct port_c_entry *cur = pe;
+		pe = pe->next;
+		port_c_destroy_entry(cur);
+		mempool_free(&port_entry_pool, cur);
+	}
+}
+
+static inline struct port_c_entry *
+port_c_new_entry(struct port_c *port)
+{
+	struct port_c_entry *e;
+	if (port->size == 0) {
+		e = &port->first_entry;
+		port->first = e;
+		port->last = e;
+	} else {
+		e = mempool_alloc(&port_entry_pool);
+		if (e == NULL) {
+			diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
+			return NULL;
+		}
+		port->last->next = e;
+		port->last = e;
+	}
+	e->next = NULL;
+	++port->size;
+	return e;
+}
+
+int
+port_c_add_tuple(struct port *base, struct tuple *tuple)
+{
+	struct port_c *port = (struct port_c *)base;
+	struct port_c_entry *pe = port_c_new_entry(port);
+	if (pe != NULL) {
+		/* 0 mp_size means the entry stores a tuple. */
+		pe->mp_size = 0;
+		pe->tuple = tuple;
+		tuple_ref(tuple);
+		return 0;
+	}
+	return -1;
+}
+
+int
+port_c_add_mp(struct port *base, const char *mp, const char *mp_end)
+{
+	struct port_c *port = (struct port_c *)base;
+	struct port_c_entry *pe;
+	uint32_t size = mp_end - mp;
+	char *dst;
+	if (size <= PORT_ENTRY_SIZE) {
+		/*
+		 * Alloc on a mempool is several times faster than
+		 * on the heap. And it perfectly fits any
+		 * MessagePack number, a short string, a boolean.
+		 */
+		dst = mempool_alloc(&port_entry_pool);
+		if (dst == NULL) {
+			diag_set(OutOfMemory, size, "mempool_alloc", "dst");
+			return -1;
+		}
+	} else {
+		dst = malloc(size);
+		if (dst == NULL) {
+			diag_set(OutOfMemory, size, "malloc", "dst");
+			return -1;
+		}
+	}
+	pe = port_c_new_entry(port);
+	if (pe != NULL) {
+		memcpy(dst, mp, size);
+		pe->mp = dst;
+		pe->mp_size = size;
+		return 0;
+	}
+	if (size <= PORT_ENTRY_SIZE)
+		mempool_free(&port_entry_pool, dst);
+	else
+		free(dst);
+	return -1;
+}
+
+static int
+port_c_dump_msgpack_16(struct port *base, struct obuf *out)
+{
+	struct port_c *port = (struct port_c *)base;
+	struct port_c_entry *pe;
+	for (pe = port->first; pe != NULL; pe = pe->next) {
+		uint32_t size = pe->mp_size;
+		if (size == 0) {
+			if (tuple_to_obuf(pe->tuple, out) != 0)
+				return -1;
+		} else if (obuf_dup(out, pe->mp, size) != size) {
+			diag_set(OutOfMemory, size, "obuf_dup", "data");
+			return -1;
+		}
+		ERROR_INJECT(ERRINJ_PORT_DUMP, {
+			diag_set(OutOfMemory,
+				 size == 0 ? tuple_size(pe->tuple) : size,
+				 "obuf_dup", "data");
+			return -1;
+		});
+	}
+	return port->size;
+}
+
+static int
+port_c_dump_msgpack(struct port *base, struct obuf *out)
+{
+	struct port_c *port = (struct port_c *)base;
+	char *size_buf = obuf_alloc(out, mp_sizeof_array(port->size));
+	if (size_buf == NULL) {
+		diag_set(OutOfMemory, mp_sizeof_array(port->size), "obuf_alloc",
+			 "size_buf");
+		return -1;
+	}
+	mp_encode_array(size_buf, port->size);
+	if (port_c_dump_msgpack_16(base, out) < 0)
+		return -1;
+	return 1;
+}
+
+extern void
+port_c_dump_lua(struct port *port, struct lua_State *L, bool is_flat);
+
+extern struct sql_value *
+port_c_get_vdbemem(struct port *base, uint32_t *size);
+
+static const struct port_vtab port_c_vtab = {
+	.dump_msgpack = port_c_dump_msgpack,
+	.dump_msgpack_16 = port_c_dump_msgpack_16,
+	.dump_lua = port_c_dump_lua,
+	.dump_plain = NULL,
+	.get_msgpack = NULL,
+	.get_vdbemem = port_c_get_vdbemem,
+	.destroy = port_c_destroy,
+};
+
+void
+port_c_create(struct port *base)
+{
+	struct port_c *port = (struct port_c *)base;
+	port->vtab = &port_c_vtab;
+	port->first = NULL;
+	port->last = NULL;
+	port->size = 0;
+}
+
 void
 port_init(void)
 {
-	mempool_create(&port_tuple_entry_pool, &cord()->slabc,
-		       sizeof(struct port_tuple_entry));
+	mempool_create(&port_entry_pool, &cord()->slabc, PORT_ENTRY_SIZE);
 }
 
 void
 port_free(void)
 {
-	mempool_destroy(&port_tuple_entry_pool);
+	mempool_destroy(&port_entry_pool);
 }
 
-extern struct sql_value *
-port_tuple_get_vdbemem(struct port *base, uint32_t *size);
-
 const struct port_vtab port_tuple_vtab = {
 	.dump_msgpack = port_tuple_dump_msgpack,
 	.dump_msgpack_16 = port_tuple_dump_msgpack_16,
 	.dump_lua = port_tuple_dump_lua,
 	.dump_plain = NULL,
 	.get_msgpack = NULL,
-	.get_vdbemem = port_tuple_get_vdbemem,
+	.get_vdbemem = NULL,
 	.destroy = port_tuple_destroy,
 };
diff --git a/src/box/port.h b/src/box/port.h
index 9d3d02b3c..cd40d8a15 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -130,6 +130,56 @@ void
 port_vdbemem_create(struct port *base, struct sql_value *mem,
 		    uint32_t mem_count);
 
+/**
+ * The struct is PACKED to avoid unnecessary 4 byte padding
+ * after mp_size. These objects are never stored on stack, neither
+ * allocated as an array, so the padding is not needed.
+ */
+struct PACKED port_c_entry {
+	struct port_c_entry *next;
+	union {
+		/** Valid if mp_size is 0. */
+		struct tuple *tuple;
+		/**
+		 * Valid if mp_size is > 0. MessagePack is
+		 * allocated either on heap or on the port entry
+		 * mempool, if it fits into a pool object.
+		 */
+		char *mp;
+	};
+	uint32_t mp_size;
+};
+
+static_assert(sizeof(struct port_c_entry) == 20,
+	      "port_c_entry is expected to be perfectly aligned and packed");
+
+/**
+ * C port is used by C functions from the public API. They can
+ * return tuples and arbitrary MessagePack.
+ */
+struct port_c {
+	const struct port_vtab *vtab;
+	struct port_c_entry *first;
+	struct port_c_entry *last;
+	struct port_c_entry first_entry;
+	int size;
+};
+
+static_assert(sizeof(struct port_c) <= sizeof(struct port),
+	      "sizeof(struct port_c) must be <= sizeof(struct port)");
+
+/** Create a C port object. */
+void
+port_c_create(struct port *base);
+
+/** Append a tuple to the port. Tuple is referenced. */
+int
+port_c_add_tuple(struct port *port, struct tuple *tuple);
+
+/** Append raw MessagePack to the port. It is copied. */
+int
+port_c_add_mp(struct port *port, const char *mp, const char *mp_end);
+
 void
 port_init(void);
 
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index 6e724c824..70147da1e 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -299,9 +299,9 @@ error:
 }
 
 struct sql_value *
-port_tuple_get_vdbemem(struct port *base, uint32_t *size)
+port_c_get_vdbemem(struct port *base, uint32_t *size)
 {
-	struct port_tuple *port = (struct port_tuple *)base;
+	struct port_c *port = (struct port_c *)base;
 	*size = port->size;
 	if (*size == 0 || *size > 1) {
 		diag_set(ClientError, ER_SQL_FUNC_WRONG_RET_COUNT, "C", *size);
@@ -315,14 +315,18 @@ port_tuple_get_vdbemem(struct port *base, uint32_t *size)
 	if (val == NULL)
 		return NULL;
 	int i = 0;
-	struct port_tuple_entry *pe;
+	const char *data;
+	struct port_c_entry *pe;
 	for (pe = port->first; pe != NULL; pe = pe->next) {
-		const char *data = tuple_data(pe->tuple);
-		if (mp_typeof(*data) != MP_ARRAY ||
-			mp_decode_array(&data) != 1) {
-			diag_set(ClientError, ER_SQL_EXECUTE,
-				 "Unsupported type passed from C");
-			goto error;
+		if (pe->mp_size == 0) {
+			data = tuple_data(pe->tuple);
+			if (mp_decode_array(&data) != 1) {
+				diag_set(ClientError, ER_SQL_EXECUTE,
+					 "Unsupported type passed from C");
+				goto error;
+			}
+		} else {
+			data = pe->mp;
 		}
 		uint32_t len;
 		const char *str;
-- 
2.21.1 (Apple Git-122.3)



More information about the Tarantool-patches mailing list