[Tarantool-patches] [PATCH v2 1/3] box: introduce port_c
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Thu Apr 23 03:12:39 MSK 2020
Port_c is a new descendant of struct port. It is used now for
public C functions to store their result. Currently they can
return only a tuple, but it will change soon, they will be able to
return arbitrary MessagePack.
Port_tuple is not removed, because still is used for box_select(),
for functional indexes, and in SQL as a base for port_sql.
Although that may be changed later. Functional indexes really need
only a single MessagePack object from their function. While
box_select() working via port_tuple or port_c didn't show any
significant difference during micro benchmarks.
Part of #4641
---
src/box/box.cc | 2 +-
src/box/func.c | 2 +-
src/box/lua/misc.cc | 20 +++++
src/box/port.c | 206 +++++++++++++++++++++++++++++++++++++++++---
src/box/port.h | 50 +++++++++++
src/box/sql/func.c | 22 +++--
6 files changed, 281 insertions(+), 21 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index f4541b577..12f3a50a7 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1072,7 +1072,7 @@ boxk(int type, uint32_t space_id, const char *format, ...)
int
box_return_tuple(box_function_ctx_t *ctx, box_tuple_t *tuple)
{
- return port_tuple_add(ctx->port, tuple);
+ return port_c_add_tuple(ctx->port, tuple);
}
/* schema_find_id()-like method using only public API */
diff --git a/src/box/func.c b/src/box/func.c
index 431577127..04b04b6ad 100644
--- a/src/box/func.c
+++ b/src/box/func.c
@@ -523,7 +523,7 @@ func_c_call(struct func *base, struct port *args, struct port *ret)
if (data == NULL)
return -1;
- port_tuple_create(ret);
+ port_c_create(ret);
box_function_ctx_t ctx = { ret };
/* Module can be changed after function reload. */
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index 4348dcbb2..55677ed61 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -78,6 +78,26 @@ port_tuple_dump_lua(struct port *base, struct lua_State *L, bool is_flat)
}
}
+extern "C" void
+port_c_dump_lua(struct port *base, struct lua_State *L, bool is_flat)
+{
+ struct port_c *port = (struct port_c *)base;
+ if (!is_flat)
+ lua_createtable(L, port->size, 0);
+ struct port_c_entry *pe = port->first;
+ const char *mp;
+ for (int i = 0; pe != NULL; pe = pe->next) {
+ if (pe->mp_size == 0) {
+ luaT_pushtuple(L, pe->tuple);
+ } else {
+ mp = pe->mp;
+ luamp_decode(L, luaL_msgpack_default, &mp);
+ }
+ if (!is_flat)
+ lua_rawseti(L, -2, ++i);
+ }
+}
+
extern "C" void
port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat)
{
diff --git a/src/box/port.c b/src/box/port.c
index 6e2fe3a6e..2c1fadb5c 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -37,7 +37,18 @@
#include <fiber.h>
#include "errinj.h"
-static struct mempool port_tuple_entry_pool;
+/**
+ * The pools is used both by port_c and port_tuple, since their
+ * entires are almost of the same size. Also port_c can use
+ * objects from the pool to store result data in their memory,
+ * when it fits.
+ */
+static struct mempool port_entry_pool;
+
+enum {
+ PORT_ENTRY_SIZE = MAX(sizeof(struct port_c_entry),
+ sizeof(struct port_tuple_entry)),
+};
int
port_tuple_add(struct port *base, struct tuple *tuple)
@@ -49,7 +60,7 @@ port_tuple_add(struct port *base, struct tuple *tuple)
e = &port->first_entry;
port->first = port->last = e;
} else {
- e = mempool_alloc(&port_tuple_entry_pool);
+ e = mempool_alloc(&port_entry_pool);
if (e == NULL) {
diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
return -1;
@@ -87,7 +98,7 @@ port_tuple_destroy(struct port *base)
struct port_tuple_entry *cur = e;
e = e->next;
tuple_unref(cur->tuple);
- mempool_free(&port_tuple_entry_pool, cur);
+ mempool_free(&port_entry_pool, cur);
}
}
@@ -127,28 +138,203 @@ port_tuple_dump_msgpack(struct port *base, struct obuf *out)
extern void
port_tuple_dump_lua(struct port *base, struct lua_State *L, bool is_flat);
+static inline void
+port_c_destroy_entry(struct port_c_entry *pe)
+{
+ /*
+ * See port_c_add_*() for algorithm of how and where to
+ * store data, to understand why it is freed differently.
+ */
+ if (pe->mp_size == 0)
+ tuple_unref(pe->tuple);
+ else if (pe->mp_size <= PORT_ENTRY_SIZE)
+ mempool_free(&port_entry_pool, pe->mp);
+ else
+ free(pe->mp);
+}
+
+static void
+port_c_destroy(struct port *base)
+{
+ struct port_c *port = (struct port_c *)base;
+ struct port_c_entry *pe = port->first;
+ if (pe == NULL)
+ return;
+ port_c_destroy_entry(pe);
+ /*
+ * Port->first is skipped, it is pointing at
+ * port_c.first_entry, and is freed together with the
+ * port.
+ */
+ pe = pe->next;
+ while (pe != NULL) {
+ struct port_c_entry *cur = pe;
+ pe = pe->next;
+ port_c_destroy_entry(cur);
+ mempool_free(&port_entry_pool, cur);
+ }
+}
+
+static inline struct port_c_entry *
+port_c_new_entry(struct port_c *port)
+{
+ struct port_c_entry *e;
+ if (port->size == 0) {
+ e = &port->first_entry;
+ port->first = e;
+ port->last = e;
+ } else {
+ e = mempool_alloc(&port_entry_pool);
+ if (e == NULL) {
+ diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
+ return NULL;
+ }
+ port->last->next = e;
+ port->last = e;
+ }
+ e->next = NULL;
+ ++port->size;
+ return e;
+}
+
+int
+port_c_add_tuple(struct port *base, struct tuple *tuple)
+{
+ struct port_c *port = (struct port_c *)base;
+ struct port_c_entry *pe = port_c_new_entry(port);
+ if (pe == NULL)
+ return -1;
+ /* 0 mp_size means the entry stores a tuple. */
+ pe->mp_size = 0;
+ pe->tuple = tuple;
+ tuple_ref(tuple);
+ return 0;
+}
+
+int
+port_c_add_mp(struct port *base, const char *mp, const char *mp_end)
+{
+ struct port_c *port = (struct port_c *)base;
+ struct port_c_entry *pe;
+ assert(mp_end > mp);
+ uint32_t size = mp_end - mp;
+ char *dst;
+ if (size <= PORT_ENTRY_SIZE) {
+ /*
+ * Alloc on a mempool is several times faster than
+ * on the heap. And it perfectly fits any
+ * MessagePack number, a short string, a boolean.
+ */
+ dst = mempool_alloc(&port_entry_pool);
+ if (dst == NULL) {
+ diag_set(OutOfMemory, size, "mempool_alloc", "dst");
+ return -1;
+ }
+ } else {
+ dst = malloc(size);
+ if (dst == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "dst");
+ return -1;
+ }
+ }
+ pe = port_c_new_entry(port);
+ if (pe != NULL) {
+ memcpy(dst, mp, size);
+ pe->mp = dst;
+ pe->mp_size = size;
+ return 0;
+ }
+ if (size <= PORT_ENTRY_SIZE)
+ mempool_free(&port_entry_pool, dst);
+ else
+ free(dst);
+ return -1;
+}
+
+static int
+port_c_dump_msgpack_16(struct port *base, struct obuf *out)
+{
+ struct port_c *port = (struct port_c *)base;
+ struct port_c_entry *pe;
+ for (pe = port->first; pe != NULL; pe = pe->next) {
+ uint32_t size = pe->mp_size;
+ if (size == 0) {
+ if (tuple_to_obuf(pe->tuple, out) != 0)
+ return -1;
+ } else if (obuf_dup(out, pe->mp, size) != size) {
+ diag_set(OutOfMemory, size, "obuf_dup", "data");
+ return -1;
+ }
+ ERROR_INJECT(ERRINJ_PORT_DUMP, {
+ diag_set(OutOfMemory,
+ size == 0 ? tuple_size(pe->tuple) : size,
+ "obuf_dup", "data");
+ return -1;
+ });
+ }
+ return port->size;
+}
+
+static int
+port_c_dump_msgpack(struct port *base, struct obuf *out)
+{
+ struct port_c *port = (struct port_c *)base;
+ char *size_buf = obuf_alloc(out, mp_sizeof_array(port->size));
+ if (size_buf == NULL) {
+ diag_set(OutOfMemory, mp_sizeof_array(port->size), "obuf_alloc",
+ "size_buf");
+ return -1;
+ }
+ mp_encode_array(size_buf, port->size);
+ if (port_c_dump_msgpack_16(base, out) < 0)
+ return -1;
+ return 1;
+}
+
+extern void
+port_c_dump_lua(struct port *port, struct lua_State *L, bool is_flat);
+
+extern struct sql_value *
+port_c_get_vdbemem(struct port *base, uint32_t *size);
+
+static const struct port_vtab port_c_vtab = {
+ .dump_msgpack = port_c_dump_msgpack,
+ .dump_msgpack_16 = port_c_dump_msgpack_16,
+ .dump_lua = port_c_dump_lua,
+ .dump_plain = NULL,
+ .get_msgpack = NULL,
+ .get_vdbemem = port_c_get_vdbemem,
+ .destroy = port_c_destroy,
+};
+
+void
+port_c_create(struct port *base)
+{
+ struct port_c *port = (struct port_c *)base;
+ port->vtab = &port_c_vtab;
+ port->first = NULL;
+ port->last = NULL;
+ port->size = 0;
+}
+
void
port_init(void)
{
- mempool_create(&port_tuple_entry_pool, &cord()->slabc,
- sizeof(struct port_tuple_entry));
+ mempool_create(&port_entry_pool, &cord()->slabc, PORT_ENTRY_SIZE);
}
void
port_free(void)
{
- mempool_destroy(&port_tuple_entry_pool);
+ mempool_destroy(&port_entry_pool);
}
-extern struct sql_value *
-port_tuple_get_vdbemem(struct port *base, uint32_t *size);
-
const struct port_vtab port_tuple_vtab = {
.dump_msgpack = port_tuple_dump_msgpack,
.dump_msgpack_16 = port_tuple_dump_msgpack_16,
.dump_lua = port_tuple_dump_lua,
.dump_plain = NULL,
.get_msgpack = NULL,
- .get_vdbemem = port_tuple_get_vdbemem,
+ .get_vdbemem = NULL,
.destroy = port_tuple_destroy,
};
diff --git a/src/box/port.h b/src/box/port.h
index fa6c1374d..4d6a5b399 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -145,6 +145,56 @@ void
port_vdbemem_create(struct port *base, struct sql_value *mem,
uint32_t mem_count);
+/**
+ * The struct is PACKED to avoid unnecessary 4 byte padding
+ * after mp_size. These objects are never stored on stack, neither
+ * allocated as an array, so the padding is not needed.
+ */
+struct PACKED port_c_entry {
+ struct port_c_entry *next;
+ union {
+ /** Valid if mp_size is 0. */
+ struct tuple *tuple;
+ /**
+ * Valid if mp_size is > 0. MessagePack is
+ * allocated either on heap or on the port entry
+ * mempool, if it fits into a pool object.
+ */
+ char *mp;
+ };
+ uint32_t mp_size;
+};
+
+static_assert(sizeof(struct port_c_entry) == 20,
+ "port_c_entry is expected to be perfectly aligned and packed");
+
+/**
+ * C port is used by C functions from the public API. They can
+ * return tuples and arbitrary MessagePack.
+ */
+struct port_c {
+ const struct port_vtab *vtab;
+ struct port_c_entry *first;
+ struct port_c_entry *last;
+ struct port_c_entry first_entry;
+ int size;
+};
+
+static_assert(sizeof(struct port_c) <= sizeof(struct port),
+ "sizeof(struct port_c) must be <= sizeof(struct port)");
+
+/** Create a C port object. */
+void
+port_c_create(struct port *base);
+
+/** Append a tuple to the port. Tuple is referenced. */
+int
+port_c_add_tuple(struct port *port, struct tuple *tuple);
+
+/** Append raw MessagePack to the port. It is copied. */
+int
+port_c_add_mp(struct port *port, const char *mp, const char *mp_end);
+
void
port_init(void);
diff --git a/src/box/sql/func.c b/src/box/sql/func.c
index 376837217..2c510940b 100644
--- a/src/box/sql/func.c
+++ b/src/box/sql/func.c
@@ -301,9 +301,9 @@ error:
}
struct sql_value *
-port_tuple_get_vdbemem(struct port *base, uint32_t *size)
+port_c_get_vdbemem(struct port *base, uint32_t *size)
{
- struct port_tuple *port = (struct port_tuple *)base;
+ struct port_c *port = (struct port_c *)base;
*size = port->size;
if (*size == 0 || *size > 1) {
diag_set(ClientError, ER_SQL_FUNC_WRONG_RET_COUNT, "C", *size);
@@ -317,14 +317,18 @@ port_tuple_get_vdbemem(struct port *base, uint32_t *size)
if (val == NULL)
return NULL;
int i = 0;
- struct port_tuple_entry *pe;
+ const char *data;
+ struct port_c_entry *pe;
for (pe = port->first; pe != NULL; pe = pe->next) {
- const char *data = tuple_data(pe->tuple);
- if (mp_typeof(*data) != MP_ARRAY ||
- mp_decode_array(&data) != 1) {
- diag_set(ClientError, ER_SQL_EXECUTE,
- "Unsupported type passed from C");
- goto error;
+ if (pe->mp_size == 0) {
+ data = tuple_data(pe->tuple);
+ if (mp_decode_array(&data) != 1) {
+ diag_set(ClientError, ER_SQL_EXECUTE,
+ "Unsupported type passed from C");
+ goto error;
+ }
+ } else {
+ data = pe->mp;
}
uint32_t len;
const char *str;
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list