Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@dev.tarantool.org, imun@tarantool.org
Subject: [Tarantool-patches] [PATCH v2 1/1] box: export box_session_push to the public C API
Date: Wed,  8 Apr 2020 01:20:47 +0200	[thread overview]
Message-ID: <e2bc28149fb628dded1a9b1bd42180fa59b64e82.1586301544.git.v.shpilevoy@tarantool.org> (raw)

API is different from box.session.push() - sync argument was
removed. It will disappear from Lua API as well, because it just
is not needed here. Session is omitted as well. Indeed, a user
can't push to a foreign session, and the current session can be
obtained inside box_session_push(). And anyway session is not in
the public C API.

Internally dump into iproto is done using obuf_dup(), just like
tuple_to_obuf() does. obuf_alloc() would be a bad call here,
because it wouldn't be able to split the pushed data into several
obuf chunks, and would cause obuf fragmentation.

Dump into plain text behaves just like a Lua push - it produces a
YAML formatted text or Lua text depending on output format. But to
turn MessagePack into YAML or Lua text an intermediate Lua
representation is used, because there are no a MessagePack -> YAML
and MessagePack -> Lua text translators yet.

Closes #4734

@TarantoolBot document
Title: box_session_push() C API

There is a new function in the public C API:
```C
    int
    box_session_push(const char *data, const char *data_end);
```

It takes raw MessagePack, and behaves just like Lua
`box.session.push()`.
---
Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-4734-export-box-session-push
Issue: https://github.com/tarantool/tarantool/issues/4734

Changes in v2:
- Rebased on Chris' patch for box.session.push() Lua text format;
- Fixed Igor's comments.

@ChangeLog
- box_session_push() new public C API function. It takes a
  const char * MessagePack, and returns it to the client
  out of order, like Lua analogue (box.session.push()) (gh-4734).

 extra/exports          |   1 +
 src/box/box.cc         |  14 +++++
 src/box/box.h          |  14 +++++
 src/box/call.c         |  44 +++++++++++++-
 src/box/lua/console.c  |  75 ++++++++++++++++++++---
 src/box/port.h         |  15 +++++
 test/box/function1.c   |   7 +++
 test/box/push.result   | 133 +++++++++++++++++++++++++++++++++++++++++
 test/box/push.test.lua |  51 ++++++++++++++++
 9 files changed, 344 insertions(+), 10 deletions(-)

diff --git a/extra/exports b/extra/exports
index f71cb7d93..2c19b5445 100644
--- a/extra/exports
+++ b/extra/exports
@@ -220,6 +220,7 @@ box_sequence_next
 box_sequence_current
 box_sequence_set
 box_sequence_reset
+box_session_push
 box_index_iterator
 box_iterator_next
 box_iterator_free
diff --git a/src/box/box.cc b/src/box/box.cc
index 765d64678..15e79df19 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1455,6 +1455,20 @@ box_sequence_reset(uint32_t seq_id)
 	return sequence_data_delete(seq_id);
 }
 
+int
+box_session_push(const char *data, const char *data_end)
+{
+	struct session *session = current_session();
+	if (session == NULL)
+		return -1;
+	struct port_msgpack port;
+	struct port *base = (struct port *) &port;
+	port_msgpack_create(base, data, data_end - data);
+	int rc = session_push(session, session_sync(session), base);
+	port_msgpack_destroy(base);
+	return rc;
+}
+
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
diff --git a/src/box/box.h b/src/box/box.h
index 044d929d4..c94e500ab 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -457,6 +457,20 @@ box_sequence_set(uint32_t seq_id, int64_t value);
 API_EXPORT int
 box_sequence_reset(uint32_t seq_id);
 
+/**
+ * Push MessagePack data into a session data channel - socket,
+ * console or whatever is behind the session. Note, that
+ * successful push does not guarantee delivery in case it was sent
+ * into the network. Just like with write()/send() system calls.
+ *
+ * \param data begin of MessagePack to push
+ * \param data_end end of MessagePack to push
+ * \retval -1 on error (check box_error_last())
+ * \retval 0 on success
+ */
+API_EXPORT int
+box_session_push(const char *data, const char *data_end);
+
 /** \endcond public */
 
 /**
diff --git a/src/box/call.c b/src/box/call.c
index a46a61c3c..be4f71f59 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -64,17 +64,55 @@ port_msgpack_get_msgpack(struct port *base, uint32_t *size)
 	return port->data;
 }
 
+static int
+port_msgpack_dump_msgpack(struct port *base, struct obuf *out)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->vtab == &port_msgpack_vtab);
+	size_t size = port->data_sz;
+	if (obuf_dup(out, port->data, size) == size)
+		return 0;
+	diag_set(OutOfMemory, size, "obuf_dup", "port->data");
+	return -1;
+}
+
 extern void
 port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat);
 
+extern const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size);
+
+void
+port_msgpack_destroy(struct port *base)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->vtab == &port_msgpack_vtab);
+	free(port->plain);
+}
+
+char *
+port_mspack_set_plain(struct port *base, const char *plain, uint32_t len)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->plain == NULL);
+	port->plain = (char *)malloc(len + 1);
+	if (port->plain == NULL) {
+		diag_set(OutOfMemory, len + 1, "malloc", "port->plain");
+		return NULL;
+	}
+	memcpy(port->plain, plain, len);
+	port->plain[len] = 0;
+	return port->plain;
+}
+
 static const struct port_vtab port_msgpack_vtab = {
-	.dump_msgpack = NULL,
+	.dump_msgpack = port_msgpack_dump_msgpack,
 	.dump_msgpack_16 = NULL,
 	.dump_lua = port_msgpack_dump_lua,
-	.dump_plain = NULL,
+	.dump_plain = port_msgpack_dump_plain,
 	.get_msgpack = port_msgpack_get_msgpack,
 	.get_vdbemem = NULL,
-	.destroy = NULL,
+	.destroy = port_msgpack_destroy,
 };
 
 int
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index bd454c269..396cb87f2 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -37,6 +37,7 @@
 #include "lua/fiber.h"
 #include "fiber.h"
 #include "coio.h"
+#include "lua/msgpack.h"
 #include "lua-yaml/lyaml.h"
 #include <lua.h>
 #include <lauxlib.h>
@@ -390,19 +391,17 @@ console_set_output_format(enum output_format output_format)
 }
 
 /**
- * Dump port lua data with respect to output format:
+ * Dump Lua data to text with respect to output format:
  * YAML document tagged with !push! global tag or Lua string.
- * @param port Port lua.
+ * @param L Lua state.
  * @param[out] size Size of the result.
  *
- * @retval not NULL Tagged YAML document.
+ * @retval not NULL Tagged YAML document or Lua text.
  * @retval NULL Error.
  */
-const char *
-port_lua_dump_plain(struct port *port, uint32_t *size)
+static const char *
+lua_dump_plain(struct lua_State *L, uint32_t *size)
 {
-	struct port_lua *port_lua = (struct port_lua *) port;
-	struct lua_State *L = port_lua->L;
 	enum output_format fmt = console_get_output_format();
 	if (fmt == OUTPUT_FORMAT_YAML) {
 		int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
@@ -435,6 +434,68 @@ port_lua_dump_plain(struct port *port, uint32_t *size)
 	return result;
 }
 
+/** Plain text converter for port Lua data. */
+const char *
+port_lua_dump_plain(struct port *base, uint32_t *size)
+{
+	return lua_dump_plain(((struct port_lua *)base)->L, size);
+}
+
+/**
+ * A helper for port_msgpack_dump_plain() to execute it safely
+ * regarding Lua errors.
+ */
+static int
+lua_port_msgpack_dump_plain(struct lua_State *L)
+{
+	char **result = (char **)lua_touserdata(L, lua_upvalueindex(1));
+	struct port_msgpack *port =
+		(struct port_msgpack *)lua_touserdata(L, lua_upvalueindex(2));
+	uint32_t *size = (uint32_t *)lua_touserdata(L, lua_upvalueindex(3));
+	const char *data = port->data;
+	/*
+	 * MessagePack -> Lua object -> YAML/Lua text. The middle
+	 * is not really needed here, but there is no
+	 * MessagePack -> YAML encoder yet. Neither
+	 * MessagePack -> Lua text.
+	 */
+	luamp_decode(L, luaL_msgpack_default, &data);
+	data = lua_dump_plain(L, size);
+	if (data == NULL) {
+		*result = NULL;
+		return 0;
+	}
+	*result = port_mspack_set_plain((struct port *)port, data, *size);
+	return 0;
+ }
+
+/** Plain text converter for raw MessagePack. */
+const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size)
+{
+	struct lua_State *L = tarantool_L;
+	char *result = NULL;
+	int top = lua_gettop(L);
+	(void) top;
+	lua_pushlightuserdata(L, &result);
+	lua_pushlightuserdata(L, base);
+	lua_pushlightuserdata(L, size);
+	lua_pushcclosure(L, lua_port_msgpack_dump_plain, 3);
+	if (lua_pcall(L, 0, 0, 0) != 0) {
+		/*
+		 * Error string is pushed in case it was a Lua
+		 * error.
+		 */
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		lua_pop(L, 1);
+		assert(lua_gettop(L) == top);
+		return NULL;
+	}
+	assert(lua_gettop(L) == top);
+	return result;
+}
+
 /**
  * Push a tagged YAML document or a Lua string into a console
  * socket.
diff --git a/src/box/port.h b/src/box/port.h
index 9d3d02b3c..7ef5a2c63 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -86,6 +86,11 @@ struct port_msgpack {
 	const struct port_vtab *vtab;
 	const char *data;
 	uint32_t data_sz;
+	/**
+	 * Buffer for dump_plain() function. It is created during
+	 * dump on demand and is deleted together with the port.
+	 */
+	char *plain;
 };
 
 static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
@@ -95,6 +100,16 @@ static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
 void
 port_msgpack_create(struct port *port, const char *data, uint32_t data_sz);
 
+/** Destroy a MessagePack port. */
+void
+port_msgpack_destroy(struct port *base);
+
+/**
+ * Set plain text version of data in the given port. It is copied.
+ */
+char *
+port_mspack_set_plain(struct port *base, const char *plain, uint32_t len);
+
 /** Port for storing the result of a Lua CALL/EVAL. */
 struct port_lua {
 	const struct port_vtab *vtab;
diff --git a/test/box/function1.c b/test/box/function1.c
index 87062d6a8..b2ce752a9 100644
--- a/test/box/function1.c
+++ b/test/box/function1.c
@@ -245,3 +245,10 @@ test_sleep(box_function_ctx_t *ctx, const char *args, const char *args_end)
 		fiber_sleep(0);
 	return 0;
 }
+
+int
+test_push(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void) ctx;
+	return box_session_push(args, args_end);
+}
diff --git a/test/box/push.result b/test/box/push.result
index aebcb7501..f5d8fe563 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -563,3 +563,136 @@ box.schema.func.drop('do_long_and_push')
 box.session.on_disconnect(nil, on_disconnect)
 ---
 ...
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+---
+...
+old_cpath = package.cpath
+---
+...
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+---
+...
+box.schema.func.create('function1.test_push', {language = 'C'})
+---
+...
+box.schema.user.grant('guest', 'super')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+messages = {}
+---
+...
+c:call('function1.test_push',   \
+       {1, 2, 3},               \
+       {on_push = table.insert, \
+        on_push_ctx = messages})
+---
+- []
+...
+messages
+---
+- - [1, 2, 3]
+...
+c:close()
+---
+...
+--
+-- C can push to the console.
+--
+ffi = require('ffi')
+---
+...
+cd = ffi.new('char[3]')
+---
+...
+cd[0] = 65
+---
+...
+cd[1] = 0
+---
+...
+cd[2] = 67
+---
+...
+-- A string having 0 byte inside. Check that it is handled fine.
+s = ffi.string(cd, 3)
+---
+...
+console = require('console')
+---
+...
+fio = require('fio')
+---
+...
+socket = require('socket')
+---
+...
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+---
+...
+_ = fio.unlink(sock_path)
+---
+...
+server = console.listen(sock_path)
+---
+...
+client = socket.tcp_connect('unix/', sock_path)
+---
+...
+_ = client:read({chunk = 128})
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+---
+...
+client:read("\n...\n")
+---
+- '%TAG !push! tag:tarantool.io/push,2018
+
+  --- [1, 2, 3, "A\0C"]
+
+  ...
+
+  '
+...
+_ = client:read("\n...\n")
+---
+...
+-- Lua output format is supported too.
+_ = client:write("\\set output lua\n")
+---
+...
+_ = client:read(";")
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+---
+...
+client:read(";")
+---
+- '-- Push
+
+  {1, 2, 3, "A\0C"};'
+...
+_ = client:read(";")
+---
+...
+client:close()
+---
+- true
+...
+server:close()
+---
+- true
+...
+box.schema.user.revoke('guest', 'super')
+---
+...
+box.schema.func.drop('function1.test_push')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index 7ae6f4a86..d2dc31db6 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -264,3 +264,54 @@ chan_push:put(true)
 chan_push:get()
 box.schema.func.drop('do_long_and_push')
 box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+old_cpath = package.cpath
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+
+box.schema.func.create('function1.test_push', {language = 'C'})
+box.schema.user.grant('guest', 'super')
+c = netbox.connect(box.cfg.listen)
+messages = {}
+c:call('function1.test_push',   \
+       {1, 2, 3},               \
+       {on_push = table.insert, \
+        on_push_ctx = messages})
+messages
+c:close()
+--
+-- C can push to the console.
+--
+ffi = require('ffi')
+cd = ffi.new('char[3]')
+cd[0] = 65
+cd[1] = 0
+cd[2] = 67
+-- A string having 0 byte inside. Check that it is handled fine.
+s = ffi.string(cd, 3)
+
+console = require('console')
+fio = require('fio')
+socket = require('socket')
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+_ = fio.unlink(sock_path)
+server = console.listen(sock_path)
+client = socket.tcp_connect('unix/', sock_path)
+_ = client:read({chunk = 128})
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+client:read("\n...\n")
+_ = client:read("\n...\n")
+-- Lua output format is supported too.
+_ = client:write("\\set output lua\n")
+_ = client:read(";")
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+client:read(";")
+_ = client:read(";")
+client:close()
+server:close()
+
+box.schema.user.revoke('guest', 'super')
+box.schema.func.drop('function1.test_push')
-- 
2.21.1 (Apple Git-122.3)

             reply	other threads:[~2020-04-07 23:20 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-04-07 23:20 Vladislav Shpilevoy [this message]
2020-04-08 15:33 ` Igor Munkin
2020-04-08 20:35   ` Vladislav Shpilevoy
2020-04-11  9:38     ` Igor Munkin
2020-04-11 18:11       ` Vladislav Shpilevoy
2020-04-11 18:11         ` Igor Munkin
2020-04-12 14:15           ` Vladislav Shpilevoy
2020-04-14  7:54 ` Kirill Yukhin

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=e2bc28149fb628dded1a9b1bd42180fa59b64e82.1586301544.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=imun@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH v2 1/1] box: export box_session_push to the public C API' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox