[Tarantool-patches] [PATCH 1/1] box: export box_session_push to the public C API

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Wed Jan 22 03:06:40 MSK 2020


Thanks for the review!

> I see that box_return_tuple() receives (box_function_ctx_t *) as the
> first argument and wonder why box_session_push() does not. If we able to
> determine where to and how to send a push w/o the context, then the
> context is not necessary in box_return_tuple() too? I don't very
> familiar with this part of codebase, so it is just question for now.
> I'll look around soon if time permits.

Context of session push is a session. It is available in fiber, which
is a global variable.

Context of a function is its port - a storage for returned values. Port
is not available in the fiber, and therefore is not stored anywhere
globally. It needs to be passed explicitly. We pass it as an opaque
pointer box_function_ctx_t*.

We can add a context to the push, but then we need to create a public
version of struct session. Something like 'box_session_t', which would
be an alias for 'struct session'. And have a function like
'box_session_current()', which in turn won't have a context.

I think we need to introduce box_session_t only if we are going to
allow users to access foreign sessions. Otherwise it is always the
user's own session and is available in the fiber anyway.


I added YAML formatting. But it looks crutchy. Mainly because we don't
have a MessagePack -> YAML converter. I used Lua representation as an
intermediate translator. I guess we can comb plain text formatting
(YAML format, Lua output format) in scope of
https://github.com/tarantool/tarantool/issues/4686.

================================================================================

commit 63cdf88fbb3ed9cf7e77cd04efcee381da64cd53
Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Date:   Mon Jan 20 23:08:58 2020 +0100

    box: export box_session_push to the public C API
    
    API is different from box.session.push() - sync argument was
    removed. It will disappear from Lua API as well, according to
    a part of public API, and because it just is not needed here.
    Session is omitted as well. Indeed, a user can't push to a foreign
    session, and the current session can be obtained inside
    box_session_push(). And anyway session is not in the public C API.
    
    Internally dump into iproto is done using obuf_dup(), just like
    tuple_to_obuf() does. obuf_alloc() would be a bad call here,
    because it wouldn't be able to split the pushed data into several
    obuf chunks, and would cause obuf fragmentation.
    
    Dump into plain text behaves just like a Lua push - it returns a
    YAML formatted text. But to turn MessagePack into YAML an
    intermediate Lua representation is used, because there is no a
    MessagePack -> YAML translator yet.
    
    Closes #4734
    
    @TarantoolBot document
    Title: box_session_push() C API
    
    There is a new function in the public C API:
    
        int
        box_session_push(const char *data, const char *data_end);
    
    It takes raw MessagePack, and behaves just like Lua
    box.session.push().

diff --git a/extra/exports b/extra/exports
index 7b84a1452..bc3759244 100644
--- a/extra/exports
+++ b/extra/exports
@@ -216,6 +216,7 @@ box_truncate
 box_sequence_next
 box_sequence_set
 box_sequence_reset
+box_session_push
 box_index_iterator
 box_iterator_next
 box_iterator_free
diff --git a/src/box/box.cc b/src/box/box.cc
index 1b2b27d61..a32c8cdbd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1433,6 +1433,20 @@ box_sequence_reset(uint32_t seq_id)
 	return sequence_data_delete(seq_id);
 }
 
+int
+box_session_push(const char *data, const char *data_end)
+{
+	struct session *session = current_session();
+	if (session == NULL)
+		return -1;
+	struct port_msgpack port;
+	struct port *base = (struct port *) &port;
+	port_msgpack_create(base, data, data_end - data);
+	int rc = session_push(session, session_sync(session), base);
+	port_msgpack_destroy(base);
+	return rc;
+}
+
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
diff --git a/src/box/box.h b/src/box/box.h
index a212e6510..93773cedd 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -442,6 +442,20 @@ box_sequence_set(uint32_t seq_id, int64_t value);
 API_EXPORT int
 box_sequence_reset(uint32_t seq_id);
 
+/**
+ * Push MessagePack data into a session data channel - socket,
+ * console or whatever is behind the session. Note, that
+ * successful push does not guarantee delivery in case it was sent
+ * into the network. Just like with write()/send() system calls.
+ *
+ * \param data begin of MessagePack to push
+ * \param data_end end of MessagePack to push
+ * \retval -1 on error (check box_error_last())
+ * \retval 0 on success
+ */
+API_EXPORT int
+box_session_push(const char *data, const char *data_end);
+
 /** \endcond public */
 
 /**
diff --git a/src/box/call.c b/src/box/call.c
index bcaa453ea..00aa3845b 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -64,17 +64,40 @@ port_msgpack_get_msgpack(struct port *base, uint32_t *size)
 	return port->data;
 }
 
+static int
+port_msgpack_dump_msgpack(struct port *base, struct obuf *out)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->vtab == &port_msgpack_vtab);
+	size_t size = port->data_sz;
+	if (obuf_dup(out, port->data, size) == size)
+		return 0;
+	diag_set(OutOfMemory, size, "obuf_dup", "port->data");
+	return -1;
+}
+
 extern void
 port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat);
 
+extern const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size);
+
+void
+port_msgpack_destroy(struct port *base)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->vtab == &port_msgpack_vtab);
+	free(port->buffer);
+}
+
 static const struct port_vtab port_msgpack_vtab = {
-	.dump_msgpack = NULL,
+	.dump_msgpack = port_msgpack_dump_msgpack,
 	.dump_msgpack_16 = NULL,
 	.dump_lua = port_msgpack_dump_lua,
-	.dump_plain = NULL,
+	.dump_plain = port_msgpack_dump_plain,
 	.get_msgpack = port_msgpack_get_msgpack,
 	.get_vdbemem = NULL,
-	.destroy = NULL,
+	.destroy = port_msgpack_destroy,
 };
 
 int
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 57e7e7f4f..fbf1a76cc 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -37,6 +37,7 @@
 #include "lua/fiber.h"
 #include "fiber.h"
 #include "coio.h"
+#include "lua/msgpack.h"
 #include "lua-yaml/lyaml.h"
 #include <lua.h>
 #include <lauxlib.h>
@@ -410,6 +411,70 @@ port_lua_dump_plain(struct port *port, uint32_t *size)
 	return result;
 }
 
+/** The same as port_lua plain dump, but from raw MessagePack. */
+const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	struct lua_State *L = luaT_newthread(tarantool_L);
+	if (L == NULL)
+		return NULL;
+	/*
+	 * Create a new thread and pop it immediately from the
+	 * main stack. So as it would not stay there in case
+	 * something would throw in this function.
+	 */
+	int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
+	/*
+	 * MessagePack -> Lua -> YAML. The middle is not really
+	 * needed here, but there is no MessagePack -> YAML
+	 * encoder yet.
+	 */
+	const char *data = port->data;
+	luamp_decode(L, luaL_msgpack_default, &data);
+	int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
+				 "tag:tarantool.io/push,2018");
================================================================================

luamp_decode and lua_yaml_encode can throw a Lua error. Don't know how to
fix that except introduction of a MessagePack -> YAML converter, which seems
a pretty big and independent task.

================================================================================
+	if (rc == 2) {
+		/*
+		 * Nil and error object are pushed onto the stack.
+		 */
+		assert(lua_isnil(L, -2));
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		goto error;
+	}
+	assert(rc == 1);
+	assert(lua_isstring(L, -1));
+	size_t len;
+	const char *result = lua_tolstring(L, -1, &len);
+	*size = (uint32_t) len;
+	/*
+	 * The result string should be copied, because the stack,
+	 * keeping the string, is going to be destroyed in the
+	 * next lines.
+	 * Can't use region, because somebody should free it, and
+	 * its purpose is to free many objects at once. In this
+	 * case only one string should be allocated and freed
+	 * right after usage. Heap is fine for that.
+	 * Can't use the global tarantool_lua_ibuf, because the
+	 * plain dumper is used by session push, which yields in
+	 * coio. And during that yield the ibuf can be reset by
+	 * another fiber.
+	 */
+	char *buffer = (char *) strdup(result);
+	if (buffer == NULL) {
+		diag_set(OutOfMemory, len + 1, "strdup", "buffer");
+		goto error;
+	}
+	assert(port->buffer == NULL);
+	port->buffer = buffer;
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
+	return buffer;
+error:
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
+	return NULL;
+}
+
 /**
  * Push a tagged YAML document into a console socket.
  * @param session Console session.
diff --git a/src/box/port.h b/src/box/port.h
index 9d3d02b3c..8a1650424 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -86,6 +86,12 @@ struct port_msgpack {
 	const struct port_vtab *vtab;
 	const char *data;
 	uint32_t data_sz;
+	/**
+	 * Buffer for dump_*() functions. In particular, it is
+	 * used by the plain dumper to save a result string and
+	 * free it when the port is destroyed.
+	 */
+	char *buffer;
 };
 
 static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
@@ -95,6 +101,10 @@ static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
 void
 port_msgpack_create(struct port *port, const char *data, uint32_t data_sz);
 
+/** Destroy a MessagePack port. */
+void
+port_msgpack_destroy(struct port *base);
+
 /** Port for storing the result of a Lua CALL/EVAL. */
 struct port_lua {
 	const struct port_vtab *vtab;
diff --git a/test/box/function1.c b/test/box/function1.c
index 87062d6a8..b2ce752a9 100644
--- a/test/box/function1.c
+++ b/test/box/function1.c
@@ -245,3 +245,10 @@ test_sleep(box_function_ctx_t *ctx, const char *args, const char *args_end)
 		fiber_sleep(0);
 	return 0;
 }
+
+int
+test_push(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void) ctx;
+	return box_session_push(args, args_end);
+}
diff --git a/test/box/push.result b/test/box/push.result
index aebcb7501..6d85a484b 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -563,3 +563,95 @@ box.schema.func.drop('do_long_and_push')
 box.session.on_disconnect(nil, on_disconnect)
 ---
 ...
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+---
+...
+old_cpath = package.cpath
+---
+...
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+---
+...
+box.schema.func.create('function1.test_push', {language = 'C'})
+---
+...
+box.schema.user.grant('guest', 'super')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+messages = {}
+---
+...
+c:call('function1.test_push',   \
+       {1, 2, 3},               \
+       {on_push = table.insert, \
+        on_push_ctx = messages})
+---
+- []
+...
+messages
+---
+- - [1, 2, 3]
+...
+c:close()
+---
+...
+--
+-- C can push to the console.
+--
+console = require('console')
+---
+...
+fio = require('fio')
+---
+...
+socket = require('socket')
+---
+...
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+---
+...
+_ = fio.unlink(sock_path)
+---
+...
+server = console.listen(sock_path)
+---
+...
+client = socket.tcp_connect('unix/', sock_path)
+---
+...
+_ = client:read({chunk = 128})
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3})\n")
+---
+...
+client:read("\n...\n")
+---
+- '%TAG !push! tag:tarantool.io/push,2018
+
+  --- [1, 2, 3]
+
+  ...
+
+  '
+...
+client:close()
+---
+- true
+...
+server:close()
+---
+- true
+...
+box.schema.user.revoke('guest', 'super')
+---
+...
+box.schema.func.drop('function1.test_push')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index 7ae6f4a86..d47422cef 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -264,3 +264,39 @@ chan_push:put(true)
 chan_push:get()
 box.schema.func.drop('do_long_and_push')
 box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+old_cpath = package.cpath
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+
+box.schema.func.create('function1.test_push', {language = 'C'})
+box.schema.user.grant('guest', 'super')
+c = netbox.connect(box.cfg.listen)
+messages = {}
+c:call('function1.test_push',   \
+       {1, 2, 3},               \
+       {on_push = table.insert, \
+        on_push_ctx = messages})
+messages
+c:close()
+--
+-- C can push to the console.
+--
+console = require('console')
+fio = require('fio')
+socket = require('socket')
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+_ = fio.unlink(sock_path)
+server = console.listen(sock_path)
+client = socket.tcp_connect('unix/', sock_path)
+_ = client:read({chunk = 128})
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3})\n")
+client:read("\n...\n")
+client:close()
+server:close()
+
+box.schema.user.revoke('guest', 'super')
+box.schema.func.drop('function1.test_push')


More information about the Tarantool-patches mailing list