[Tarantool-patches] [PATCH v2 1/1] box: export box_session_push to the public C API
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Wed Apr 8 02:20:47 MSK 2020
API is different from box.session.push() - sync argument was
removed. It will disappear from Lua API as well, because it just
is not needed here. Session is omitted as well. Indeed, a user
can't push to a foreign session, and the current session can be
obtained inside box_session_push(). And anyway session is not in
the public C API.
Internally dump into iproto is done using obuf_dup(), just like
tuple_to_obuf() does. obuf_alloc() would be a bad call here,
because it wouldn't be able to split the pushed data into several
obuf chunks, and would cause obuf fragmentation.
Dump into plain text behaves just like a Lua push - it produces a
YAML formatted text or Lua text depending on output format. But to
turn MessagePack into YAML or Lua text an intermediate Lua
representation is used, because there are no a MessagePack -> YAML
and MessagePack -> Lua text translators yet.
Closes #4734
@TarantoolBot document
Title: box_session_push() C API
There is a new function in the public C API:
```C
int
box_session_push(const char *data, const char *data_end);
```
It takes raw MessagePack, and behaves just like Lua
`box.session.push()`.
---
Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-4734-export-box-session-push
Issue: https://github.com/tarantool/tarantool/issues/4734
Changes in v2:
- Rebased on Chris' patch for box.session.push() Lua text format;
- Fixed Igor's comments.
@ChangeLog
- box_session_push() new public C API function. It takes a
const char * MessagePack, and returns it to the client
out of order, like Lua analogue (box.session.push()) (gh-4734).
extra/exports | 1 +
src/box/box.cc | 14 +++++
src/box/box.h | 14 +++++
src/box/call.c | 44 +++++++++++++-
src/box/lua/console.c | 75 ++++++++++++++++++++---
src/box/port.h | 15 +++++
test/box/function1.c | 7 +++
test/box/push.result | 133 +++++++++++++++++++++++++++++++++++++++++
test/box/push.test.lua | 51 ++++++++++++++++
9 files changed, 344 insertions(+), 10 deletions(-)
diff --git a/extra/exports b/extra/exports
index f71cb7d93..2c19b5445 100644
--- a/extra/exports
+++ b/extra/exports
@@ -220,6 +220,7 @@ box_sequence_next
box_sequence_current
box_sequence_set
box_sequence_reset
+box_session_push
box_index_iterator
box_iterator_next
box_iterator_free
diff --git a/src/box/box.cc b/src/box/box.cc
index 765d64678..15e79df19 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1455,6 +1455,20 @@ box_sequence_reset(uint32_t seq_id)
return sequence_data_delete(seq_id);
}
+int
+box_session_push(const char *data, const char *data_end)
+{
+ struct session *session = current_session();
+ if (session == NULL)
+ return -1;
+ struct port_msgpack port;
+ struct port *base = (struct port *) &port;
+ port_msgpack_create(base, data, data_end - data);
+ int rc = session_push(session, session_sync(session), base);
+ port_msgpack_destroy(base);
+ return rc;
+}
+
static inline void
box_register_replica(uint32_t id, const struct tt_uuid *uuid)
{
diff --git a/src/box/box.h b/src/box/box.h
index 044d929d4..c94e500ab 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -457,6 +457,20 @@ box_sequence_set(uint32_t seq_id, int64_t value);
API_EXPORT int
box_sequence_reset(uint32_t seq_id);
+/**
+ * Push MessagePack data into a session data channel - socket,
+ * console or whatever is behind the session. Note, that
+ * successful push does not guarantee delivery in case it was sent
+ * into the network. Just like with write()/send() system calls.
+ *
+ * \param data begin of MessagePack to push
+ * \param data_end end of MessagePack to push
+ * \retval -1 on error (check box_error_last())
+ * \retval 0 on success
+ */
+API_EXPORT int
+box_session_push(const char *data, const char *data_end);
+
/** \endcond public */
/**
diff --git a/src/box/call.c b/src/box/call.c
index a46a61c3c..be4f71f59 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -64,17 +64,55 @@ port_msgpack_get_msgpack(struct port *base, uint32_t *size)
return port->data;
}
+static int
+port_msgpack_dump_msgpack(struct port *base, struct obuf *out)
+{
+ struct port_msgpack *port = (struct port_msgpack *) base;
+ assert(port->vtab == &port_msgpack_vtab);
+ size_t size = port->data_sz;
+ if (obuf_dup(out, port->data, size) == size)
+ return 0;
+ diag_set(OutOfMemory, size, "obuf_dup", "port->data");
+ return -1;
+}
+
extern void
port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat);
+extern const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size);
+
+void
+port_msgpack_destroy(struct port *base)
+{
+ struct port_msgpack *port = (struct port_msgpack *) base;
+ assert(port->vtab == &port_msgpack_vtab);
+ free(port->plain);
+}
+
+char *
+port_mspack_set_plain(struct port *base, const char *plain, uint32_t len)
+{
+ struct port_msgpack *port = (struct port_msgpack *) base;
+ assert(port->plain == NULL);
+ port->plain = (char *)malloc(len + 1);
+ if (port->plain == NULL) {
+ diag_set(OutOfMemory, len + 1, "malloc", "port->plain");
+ return NULL;
+ }
+ memcpy(port->plain, plain, len);
+ port->plain[len] = 0;
+ return port->plain;
+}
+
static const struct port_vtab port_msgpack_vtab = {
- .dump_msgpack = NULL,
+ .dump_msgpack = port_msgpack_dump_msgpack,
.dump_msgpack_16 = NULL,
.dump_lua = port_msgpack_dump_lua,
- .dump_plain = NULL,
+ .dump_plain = port_msgpack_dump_plain,
.get_msgpack = port_msgpack_get_msgpack,
.get_vdbemem = NULL,
- .destroy = NULL,
+ .destroy = port_msgpack_destroy,
};
int
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index bd454c269..396cb87f2 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -37,6 +37,7 @@
#include "lua/fiber.h"
#include "fiber.h"
#include "coio.h"
+#include "lua/msgpack.h"
#include "lua-yaml/lyaml.h"
#include <lua.h>
#include <lauxlib.h>
@@ -390,19 +391,17 @@ console_set_output_format(enum output_format output_format)
}
/**
- * Dump port lua data with respect to output format:
+ * Dump Lua data to text with respect to output format:
* YAML document tagged with !push! global tag or Lua string.
- * @param port Port lua.
+ * @param L Lua state.
* @param[out] size Size of the result.
*
- * @retval not NULL Tagged YAML document.
+ * @retval not NULL Tagged YAML document or Lua text.
* @retval NULL Error.
*/
-const char *
-port_lua_dump_plain(struct port *port, uint32_t *size)
+static const char *
+lua_dump_plain(struct lua_State *L, uint32_t *size)
{
- struct port_lua *port_lua = (struct port_lua *) port;
- struct lua_State *L = port_lua->L;
enum output_format fmt = console_get_output_format();
if (fmt == OUTPUT_FORMAT_YAML) {
int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
@@ -435,6 +434,68 @@ port_lua_dump_plain(struct port *port, uint32_t *size)
return result;
}
+/** Plain text converter for port Lua data. */
+const char *
+port_lua_dump_plain(struct port *base, uint32_t *size)
+{
+ return lua_dump_plain(((struct port_lua *)base)->L, size);
+}
+
+/**
+ * A helper for port_msgpack_dump_plain() to execute it safely
+ * regarding Lua errors.
+ */
+static int
+lua_port_msgpack_dump_plain(struct lua_State *L)
+{
+ char **result = (char **)lua_touserdata(L, lua_upvalueindex(1));
+ struct port_msgpack *port =
+ (struct port_msgpack *)lua_touserdata(L, lua_upvalueindex(2));
+ uint32_t *size = (uint32_t *)lua_touserdata(L, lua_upvalueindex(3));
+ const char *data = port->data;
+ /*
+ * MessagePack -> Lua object -> YAML/Lua text. The middle
+ * is not really needed here, but there is no
+ * MessagePack -> YAML encoder yet. Neither
+ * MessagePack -> Lua text.
+ */
+ luamp_decode(L, luaL_msgpack_default, &data);
+ data = lua_dump_plain(L, size);
+ if (data == NULL) {
+ *result = NULL;
+ return 0;
+ }
+ *result = port_mspack_set_plain((struct port *)port, data, *size);
+ return 0;
+ }
+
+/** Plain text converter for raw MessagePack. */
+const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size)
+{
+ struct lua_State *L = tarantool_L;
+ char *result = NULL;
+ int top = lua_gettop(L);
+ (void) top;
+ lua_pushlightuserdata(L, &result);
+ lua_pushlightuserdata(L, base);
+ lua_pushlightuserdata(L, size);
+ lua_pushcclosure(L, lua_port_msgpack_dump_plain, 3);
+ if (lua_pcall(L, 0, 0, 0) != 0) {
+ /*
+ * Error string is pushed in case it was a Lua
+ * error.
+ */
+ assert(lua_isstring(L, -1));
+ diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+ lua_pop(L, 1);
+ assert(lua_gettop(L) == top);
+ return NULL;
+ }
+ assert(lua_gettop(L) == top);
+ return result;
+}
+
/**
* Push a tagged YAML document or a Lua string into a console
* socket.
diff --git a/src/box/port.h b/src/box/port.h
index 9d3d02b3c..7ef5a2c63 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -86,6 +86,11 @@ struct port_msgpack {
const struct port_vtab *vtab;
const char *data;
uint32_t data_sz;
+ /**
+ * Buffer for dump_plain() function. It is created during
+ * dump on demand and is deleted together with the port.
+ */
+ char *plain;
};
static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
@@ -95,6 +100,16 @@ static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
void
port_msgpack_create(struct port *port, const char *data, uint32_t data_sz);
+/** Destroy a MessagePack port. */
+void
+port_msgpack_destroy(struct port *base);
+
+/**
+ * Set plain text version of data in the given port. It is copied.
+ */
+char *
+port_mspack_set_plain(struct port *base, const char *plain, uint32_t len);
+
/** Port for storing the result of a Lua CALL/EVAL. */
struct port_lua {
const struct port_vtab *vtab;
diff --git a/test/box/function1.c b/test/box/function1.c
index 87062d6a8..b2ce752a9 100644
--- a/test/box/function1.c
+++ b/test/box/function1.c
@@ -245,3 +245,10 @@ test_sleep(box_function_ctx_t *ctx, const char *args, const char *args_end)
fiber_sleep(0);
return 0;
}
+
+int
+test_push(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+ (void) ctx;
+ return box_session_push(args, args_end);
+}
diff --git a/test/box/push.result b/test/box/push.result
index aebcb7501..f5d8fe563 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -563,3 +563,136 @@ box.schema.func.drop('do_long_and_push')
box.session.on_disconnect(nil, on_disconnect)
---
...
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+---
+...
+old_cpath = package.cpath
+---
+...
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+---
+...
+box.schema.func.create('function1.test_push', {language = 'C'})
+---
+...
+box.schema.user.grant('guest', 'super')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+messages = {}
+---
+...
+c:call('function1.test_push', \
+ {1, 2, 3}, \
+ {on_push = table.insert, \
+ on_push_ctx = messages})
+---
+- []
+...
+messages
+---
+- - [1, 2, 3]
+...
+c:close()
+---
+...
+--
+-- C can push to the console.
+--
+ffi = require('ffi')
+---
+...
+cd = ffi.new('char[3]')
+---
+...
+cd[0] = 65
+---
+...
+cd[1] = 0
+---
+...
+cd[2] = 67
+---
+...
+-- A string having 0 byte inside. Check that it is handled fine.
+s = ffi.string(cd, 3)
+---
+...
+console = require('console')
+---
+...
+fio = require('fio')
+---
+...
+socket = require('socket')
+---
+...
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+---
+...
+_ = fio.unlink(sock_path)
+---
+...
+server = console.listen(sock_path)
+---
+...
+client = socket.tcp_connect('unix/', sock_path)
+---
+...
+_ = client:read({chunk = 128})
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+---
+...
+client:read("\n...\n")
+---
+- '%TAG !push! tag:tarantool.io/push,2018
+
+ --- [1, 2, 3, "A\0C"]
+
+ ...
+
+ '
+...
+_ = client:read("\n...\n")
+---
+...
+-- Lua output format is supported too.
+_ = client:write("\\set output lua\n")
+---
+...
+_ = client:read(";")
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+---
+...
+client:read(";")
+---
+- '-- Push
+
+ {1, 2, 3, "A\0C"};'
+...
+_ = client:read(";")
+---
+...
+client:close()
+---
+- true
+...
+server:close()
+---
+- true
+...
+box.schema.user.revoke('guest', 'super')
+---
+...
+box.schema.func.drop('function1.test_push')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index 7ae6f4a86..d2dc31db6 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -264,3 +264,54 @@ chan_push:put(true)
chan_push:get()
box.schema.func.drop('do_long_and_push')
box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+old_cpath = package.cpath
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+
+box.schema.func.create('function1.test_push', {language = 'C'})
+box.schema.user.grant('guest', 'super')
+c = netbox.connect(box.cfg.listen)
+messages = {}
+c:call('function1.test_push', \
+ {1, 2, 3}, \
+ {on_push = table.insert, \
+ on_push_ctx = messages})
+messages
+c:close()
+--
+-- C can push to the console.
+--
+ffi = require('ffi')
+cd = ffi.new('char[3]')
+cd[0] = 65
+cd[1] = 0
+cd[2] = 67
+-- A string having 0 byte inside. Check that it is handled fine.
+s = ffi.string(cd, 3)
+
+console = require('console')
+fio = require('fio')
+socket = require('socket')
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+_ = fio.unlink(sock_path)
+server = console.listen(sock_path)
+client = socket.tcp_connect('unix/', sock_path)
+_ = client:read({chunk = 128})
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+client:read("\n...\n")
+_ = client:read("\n...\n")
+-- Lua output format is supported too.
+_ = client:write("\\set output lua\n")
+_ = client:read(";")
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+client:read(";")
+_ = client:read(";")
+client:close()
+server:close()
+
+box.schema.user.revoke('guest', 'super')
+box.schema.func.drop('function1.test_push')
--
2.21.1 (Apple Git-122.3)
More information about the Tarantool-patches
mailing list