[PATCH v2 08/10] session: introduce text box.session.push

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 20 16:24:33 MSK 2018


box.session.push allows sending intermediate results in the
scope of the main request without finalizing it. Messages can be
sent over the text and binary protocols. This patch adds support
for text pushes.

A text push is a YAML document tagged with the '!push!' handle and
the 'tag:tarantool.io/push,2018' prefix. YAML tags are a standard
way to define the type of a document.

A console that receives a push message just prints it to stdout (or
forwards it to the next console, if it is a remote console too).

Part of #2677
---
 src/box/lua/console.c         | 45 +++++++++++++++++++++++++++++++++++-
 src/box/lua/console.h         | 10 ++++++++
 src/box/lua/console.lua       | 29 +++++++++++++++++++----
 src/box/lua/session.c         | 54 ++++++++++++++++++++++++++++++++++++++++++-
 src/box/port.c                |  6 +++++
 src/box/port.h                | 17 ++++++++++++++
 src/fio.c                     |  2 +-
 src/fio.h                     |  2 +-
 test/app-tap/console.test.lua |  6 +++--
 9 files changed, 160 insertions(+), 11 deletions(-)

diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index a3bf83cb1..c4886db8c 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -31,10 +31,12 @@
 
 #include "box/lua/console.h"
 #include "box/session.h"
+#include "box/port.h"
 #include "lua/utils.h"
 #include "lua/fiber.h"
 #include "fiber.h"
 #include "coio.h"
+#include "fio.h"
 #include "lua-yaml/lyaml.h"
 #include <lua.h>
 #include <lauxlib.h>
@@ -364,6 +366,47 @@ console_session_fd(struct session *session)
 	return session->meta.fd;
 }
 
+int
+console_encode_push(struct lua_State *L)
+{
+	return lua_yaml_encode_tagged(L, luaL_yaml_default, "!push!",
+				      "tag:tarantool.io/push,2018");
+}
+
+/**
+ * Push a tagged YAML document into a console socket.
+ * @param session Console session, its fd is written to.
+ * @param port Port dumped as plain text via port_dump_plain().
+ * NOTE(review): verify diag is set when fio_writev() fails.
+ * @retval  0 Success, the whole text is written.
+ * @retval -1 Error: the dump or fio_writev() failed.
+ */
+static int
+console_session_push(struct session *session, struct port *port)
+{
+	assert(session_vtab_registry[session->type].push ==
+	       console_session_push);
+	uint32_t text_len;
+	const char *text = port_dump_plain(port, &text_len);
+	if (text == NULL)
+		return -1;
+	int fd = session_fd(session);
+	while (text_len > 0) {
+		while (coio_wait(fd, COIO_WRITE,
+				 TIMEOUT_INFINITY) != COIO_WRITE);
+		const struct iovec iov = {
+			.iov_base = (void *) text,
+			.iov_len = text_len
+		};
+		ssize_t rc = fio_writev(fd, &iov, 1);
+		if (rc < 0)
+			return -1;
+		text_len -= rc;
+		text += rc;
+	}
+	return 0;
+}
+
 void
 tarantool_lua_console_init(struct lua_State *L)
 {
@@ -400,7 +443,7 @@ tarantool_lua_console_init(struct lua_State *L)
 	 */
 	lua_setfield(L, -2, "formatter");
 	struct session_vtab console_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ console_session_push,
 		/* .fd = */ console_session_fd,
 		/* .sync = */ generic_session_sync,
 	};
diff --git a/src/box/lua/console.h b/src/box/lua/console.h
index 208b31490..6d1449810 100644
--- a/src/box/lua/console.h
+++ b/src/box/lua/console.h
@@ -36,6 +36,16 @@ extern "C" {
 
 struct lua_State;
 
+/**
+ * Encode a single value on top of the stack into a YAML document
+ * tagged as a push message.
+ * @param object Any Lua object on top of the stack.
+ * @retval nil, error Error occurred.
+ * @retval not nil Tagged YAML document.
+ */
+int
+console_encode_push(struct lua_State *L);
+
 void
 tarantool_lua_console_init(struct lua_State *L);
 
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index bc4e02bfc..b8ae5ba59 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -11,6 +11,7 @@ local yaml = require('yaml')
 local net_box = require('net.box')
 
 local YAML_TERM = '\n...\n'
+local PUSH_TAG_HANDLE = '!push!'
 
 local function format(status, ...)
     local err
@@ -92,13 +93,27 @@ local text_connection_mt = {
         --
         eval = function(self, text)
             text = text..'$EOF$\n'
-            if self:write(text) then
+            if not self:write(text) then
+                error(self:set_error())
+            end
+            while true do
                 local rc = self:read()
-                if rc then
+                if not rc then
+                    break
+                end
+                local handle, prefix = yaml.decode_tag(rc)
+                -- Can not fail - tags are encoded with no
+                -- user participation and are correct always.
+                assert(handle or not prefix)
+                if not handle then
                     return rc
                 end
+                if handle == PUSH_TAG_HANDLE and self.print_f then
+                    self.print_f(rc)
+                end
             end
-            error(self:set_error())
+            -- Read failed: raise, as before push support.
+            return error(self:set_error())
         end,
         --
         -- Make the connection be in error state, set error
@@ -121,15 +136,18 @@ local text_connection_mt = {
 -- netbox-like object.
 -- @param connection Socket to wrap.
 -- @param url Parsed destination URL.
+-- @param print_f Function to print push messages.
+--
 -- @retval nil, err Error, and err contains an error message.
 -- @retval  not nil Netbox-like object.
 --
-local function wrap_text_socket(connection, url)
+local function wrap_text_socket(connection, url, print_f)
     local conn = setmetatable({
         _socket = connection,
         state = 'active',
         host = url.host or 'localhost',
         port = url.service,
+        print_f = print_f,
     }, text_connection_mt)
     if not conn:write('require("console").delimiter("$EOF$")\n') or
        not conn:read() then
@@ -369,7 +387,8 @@ local function connect(uri, opts)
     end
     local remote
     if greeting.protocol == 'Lua console' then
-        remote = wrap_text_socket(connection, u)
+        remote = wrap_text_socket(connection, u,
+                                  function(msg) self:print(msg) end)
     else
         opts = {
             connect_timeout = opts.timeout,
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 5fe5f08d4..306271809 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -41,6 +41,8 @@
 #include "box/session.h"
 #include "box/user.h"
 #include "box/schema.h"
+#include "box/port.h"
+#include "box/lua/console.h"
 
 static const char *sessionlib_name = "box.session";
 
@@ -355,6 +357,52 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
 	return 3;
 }
 
+/**
+ * Port to push a message from Lua.
+ */
+struct lua_push_port {
+	const struct port_vtab *vtab;
+	/**
+	 * Lua state, containing data to dump on top of the stack.
+	 */
+	struct lua_State *L;
+};
+
+static const char *
+lua_push_port_dump_plain(struct port *port, uint32_t *size);
+
+static const struct port_vtab lua_push_port_vtab = {
+       .dump_msgpack = NULL,
+       /*
+        * dump_msgpack_16 makes no sense here: push appeared in
+        * the 1.10 protocol.
+        */
+       .dump_msgpack_16 = NULL,
+       .dump_plain = lua_push_port_dump_plain,
+       .destroy = NULL,
+};
+
+static const char *
+lua_push_port_dump_plain(struct port *port, uint32_t *size)
+{
+	struct lua_push_port *lua_port = (struct lua_push_port *) port;
+	assert(lua_port->vtab == &lua_push_port_vtab);
+	struct lua_State *L = lua_port->L;
+	int rc = console_encode_push(L);
+	if (rc == 2) {
+		assert(lua_isnil(L, -2));
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		return NULL;
+	}
+	assert(rc == 1);
+	assert(lua_isstring(L, -1));
+	size_t len;
+	const char *result = lua_tolstring(L, -1, &len);
+	*size = (uint32_t) len;
+	return result;
+}
+
 /**
  * Push a message using a protocol, depending on a session type.
  * @param data Data to push, first argument on a stack.
@@ -367,7 +415,11 @@ lbox_session_push(struct lua_State *L)
 	if (lua_gettop(L) != 1)
 		return luaL_error(L, "Usage: box.session.push(data)");
 
-	if (session_push(current_session(), NULL) != 0) {
+	struct lua_push_port port;
+	port.vtab = &lua_push_port_vtab;
+	port.L = L;
+
+	if (session_push(current_session(), (struct port *) &port) != 0) {
 		lua_pushnil(L);
 		luaT_pusherror(L, box_error_last());
 		return 2;
diff --git a/src/box/port.c b/src/box/port.c
index 255eb732c..f9b655840 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out)
 	return port->vtab->dump_msgpack_16(port, out);
 }
 
+const char *
+port_dump_plain(struct port *port, uint32_t *size)
+{
+	return port->vtab->dump_plain(port, size);
+}
+
 void
 port_init(void)
 {
diff --git a/src/box/port.h b/src/box/port.h
index 1c44b9b00..7fc1b8972 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -76,6 +76,11 @@ struct port_vtab {
 	 * 1.6 format.
 	 */
 	int (*dump_msgpack_16)(struct port *port, struct obuf *out);
+	/**
+	 * Dump a port content as a plain text into a buffer,
+	 * allocated inside.
+	 */
+	const char *(*dump_plain)(struct port *port, uint32_t *size);
 	/**
 	 * Destroy a port and release associated resources.
 	 */
@@ -158,6 +163,18 @@ port_dump_msgpack(struct port *port, struct obuf *out);
 int
 port_dump_msgpack_16(struct port *port, struct obuf *out);
 
+/**
+ * Dump the port content as plain text into a buffer,
+ * allocated inside.
+ * @param port Port with data to dump.
+ * @param[out] size Length of the resulting plain text.
+ *
+ * @retval NULL Error.
+ * @retval not NULL Plain text.
+ */
+const char *
+port_dump_plain(struct port *port, uint32_t *size);
+
 void
 port_init(void);
 
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..b1d9ecf44 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -135,7 +135,7 @@ fio_writen(int fd, const void *buf, size_t count)
 }
 
 ssize_t
-fio_writev(int fd, struct iovec *iov, int iovcnt)
+fio_writev(int fd, const struct iovec *iov, int iovcnt)
 {
 	assert(iov && iovcnt >= 0);
 	ssize_t nwr;
diff --git a/src/fio.h b/src/fio.h
index 12749afcb..fb6383c81 100644
--- a/src/fio.h
+++ b/src/fio.h
@@ -133,7 +133,7 @@ fio_writen(int fd, const void *buf, size_t count);
  *         returns the total number of bytes written, or -1 if error.
  */
 ssize_t
-fio_writev(int fd, struct iovec *iov, int iovcnt);
+fio_writev(int fd, const struct iovec *iov, int iovcnt);
 
 /**
  * A wrapper around writev, but retries for partial writes
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index d2e88b55b..b1d7166d4 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
 
 test = tap.test("console")
 
-test:plan(60)
+test:plan(61)
 
 -- Start console and connect to it
 local server = console.listen(CONSOLE_SOCKET)
@@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console")
 -- gh-2677: box.session.push, text protocol support.
 --
 client:write('box.session.push(200)\n')
-test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n',
+        "pushed message")
+test:is(client:read(EOL), '---\n- true\n...\n', "pushed message")
 
 -- Execute some command
 client:write("1\n")
-- 
2.15.1 (Apple Git-101)




More information about the Tarantool-patches mailing list