[tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Jun 1 23:55:55 MSK 2018


box.session.push allows to send some intermediate results in the
scope of main request with no finalizing it. Messages can be
sent over text and binary protocol. This patch allows to send
text pushes.

Text push is a YAML document tagged with '!push!' handle and
'tag:tarantool.io/push,2018' prefix. YAML tags is a standard way
to define a type of the document.

Console received push message just prints it to the stdout (or
sends to a next console, if it is remote console too).

Part of #2677
---
 src/box/lua/call.c            |  4 +++
 src/box/lua/console.c         | 74 ++++++++++++++++++++++++++++++++++++++++++-
 src/box/lua/console.lua       | 27 +++++++++++++---
 src/box/lua/session.c         |  6 ++--
 src/box/port.c                |  7 ++++
 src/box/port.h                | 17 ++++++++++
 src/diag.h                    |  3 ++
 src/sio.cc                    | 16 ++++++++++
 test/app-tap/console.test.lua |  6 ++--
 9 files changed, 150 insertions(+), 10 deletions(-)

diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index 831583d26..e682820df 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -412,9 +412,13 @@ port_lua_destroy(struct port *base)
 	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, port->ref);
 }
 
+extern const char *
+port_lua_dump_plain(struct port *port, uint32_t *size);
+
 static const struct port_vtab port_lua_vtab = {
 	.dump_msgpack = port_lua_dump,
 	.dump_msgpack_16 = port_lua_dump_16,
+	.dump_plain = port_lua_dump_plain,
 	.destroy = port_lua_destroy,
 };
 
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index fea96bc05..49065a53c 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -31,6 +31,8 @@
 
 #include "box/lua/console.h"
 #include "box/session.h"
+#include "box/port.h"
+#include "box/error.h"
 #include "lua/utils.h"
 #include "lua/fiber.h"
 #include "fiber.h"
@@ -366,6 +368,76 @@ console_session_fd(struct session *session)
 	return session->meta.fd;
 }
 
+/**
+ * Dump port lua data as a YAML document tagged with !push! global
+ * tag.
+ * @param port Port lua.
+ * @param[out] size Size of the result.
+ *
+ * @retval not NULL Tagged YAML document.
+ * @retval NULL Error.
+ */
+const char *
+port_lua_dump_plain(struct port *port, uint32_t *size)
+{
+	struct port_lua *port_lua = (struct port_lua *) port;
+	struct lua_State *L = port_lua->L;
+	int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
+				 "tag:tarantool.io/push,2018");
+	if (rc == 2) {
+		/*
+		 * Nil and error object are pushed onto the stack.
+		 */
+		assert(lua_isnil(L, -2));
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		return NULL;
+	}
+	assert(rc == 1);
+	assert(lua_isstring(L, -1));
+	size_t len;
+	const char *result = lua_tolstring(L, -1, &len);
+	*size = (uint32_t) len;
+	return result;
+}
+
+/**
+ * Push a tagged YAML document into a console socket.
+ * @param session Console session.
+ * @param port Port with YAML to push.
+ *
+ * @retval  0 Success.
+ * @retval -1 Error.
+ */
+static int
+console_session_push(struct session *session, struct port *port)
+{
+	assert(session_vtab_registry[session->type].push ==
+	       console_session_push);
+	uint32_t text_len;
+	const char *text = port_dump_plain(port, &text_len);
+	if (text == NULL)
+		return -1;
+	int fd = session_fd(session);
+	while (text_len > 0) {
+		ssize_t rc = write(fd, text, text_len);
+		if (rc < 0) {
+			if (errno == EAGAIN || errno == EWOULDBLOCK) {
+				while (coio_wait(fd, COIO_WRITE,
+						 TIMEOUT_INFINITY) !=
+				       COIO_WRITE);
+			} else if (errno != EINTR) {
+				diag_set(SocketError, fd, strerror(errno));
+				return -1;
+			}
+		} else {
+			text_len -= (uint32_t) rc;
+			text += rc;
+		}
+	}
+	return 0;
+}
+
 void
 tarantool_lua_console_init(struct lua_State *L)
 {
@@ -400,7 +472,7 @@ tarantool_lua_console_init(struct lua_State *L)
 	 */
 	lua_setfield(L, -2, "formatter");
 	struct session_vtab console_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ console_session_push,
 		/* .fd = */ console_session_fd,
 		/* .sync = */ generic_session_sync,
 	};
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index bc4e02bfc..6271b416b 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -11,6 +11,7 @@ local yaml = require('yaml')
 local net_box = require('net.box')
 
 local YAML_TERM = '\n...\n'
+local PUSH_TAG_HANDLE = '!push!'
 
 local function format(status, ...)
     local err
@@ -92,13 +93,25 @@ local text_connection_mt = {
         --
         eval = function(self, text)
             text = text..'$EOF$\n'
-            if self:write(text) then
+            if not self:write(text) then
+                error(self:set_error())
+            end
+            while true do
                 local rc = self:read()
-                if rc then
+                if not rc then
+                    break
+                end
+                local handle, prefix = yaml.decode(rc, {tag_only = true})
+                if not handle then
+                    -- Can not fail - tags are encoded with no
+                    -- user participation and are correct always.
                     return rc
                 end
+                if handle == PUSH_TAG_HANDLE and self.print_f then
+                    self.print_f(rc)
+                end
             end
-            error(self:set_error())
+            return rc
         end,
         --
         -- Make the connection be in error state, set error
@@ -121,15 +134,18 @@ local text_connection_mt = {
 -- netbox-like object.
 -- @param connection Socket to wrap.
 -- @param url Parsed destination URL.
+-- @param print_f Function to print push messages.
+--
 -- @retval nil, err Error, and err contains an error message.
 -- @retval  not nil Netbox-like object.
 --
-local function wrap_text_socket(connection, url)
+local function wrap_text_socket(connection, url, print_f)
     local conn = setmetatable({
         _socket = connection,
         state = 'active',
         host = url.host or 'localhost',
         port = url.service,
+        print_f = print_f,
     }, text_connection_mt)
     if not conn:write('require("console").delimiter("$EOF$")\n') or
        not conn:read() then
@@ -369,7 +385,8 @@ local function connect(uri, opts)
     end
     local remote
     if greeting.protocol == 'Lua console' then
-        remote = wrap_text_socket(connection, u)
+        remote = wrap_text_socket(connection, u,
+                                  function(msg) self:print(msg) end)
     else
         opts = {
             connect_timeout = opts.timeout,
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 05010c4c3..627f62f59 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -41,6 +41,7 @@
 #include "box/session.h"
 #include "box/user.h"
 #include "box/schema.h"
+#include "box/port.h"
 
 static const char *sessionlib_name = "box.session";
 
@@ -367,8 +368,9 @@ lbox_session_push(struct lua_State *L)
 {
 	if (lua_gettop(L) != 1)
 		return luaL_error(L, "Usage: box.session.push(data)");
-
-	if (session_push(current_session(), NULL) != 0) {
+	struct port port;
+	port_lua_create(&port, L);
+	if (session_push(current_session(), &port) != 0) {
 		lua_pushnil(L);
 		luaT_pusherror(L, box_error_last());
 		return 2;
diff --git a/src/box/port.c b/src/box/port.c
index 255eb732c..03ca323d7 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out)
 	return port->vtab->dump_msgpack_16(port, out);
 }
 
+const char *
+port_dump_plain(struct port *port, uint32_t *size)
+{
+	return port->vtab->dump_plain(port, size);
+}
+
 void
 port_init(void)
 {
@@ -159,5 +165,6 @@ port_free(void)
 const struct port_vtab port_tuple_vtab = {
 	.dump_msgpack = port_tuple_dump_msgpack,
 	.dump_msgpack_16 = port_tuple_dump_msgpack_16,
+	.dump_plain = NULL,
 	.destroy = port_tuple_destroy,
 };
diff --git a/src/box/port.h b/src/box/port.h
index 6f1e5f364..882bb3791 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -77,6 +77,11 @@ struct port_vtab {
 	 * 1.6 format.
 	 */
 	int (*dump_msgpack_16)(struct port *port, struct obuf *out);
+	/**
+	 * Dump a port content as a plain text into a buffer,
+	 * allocated inside.
+	 */
+	const char *(*dump_plain)(struct port *port, uint32_t *size);
 	/**
 	 * Destroy a port and release associated resources.
 	 */
@@ -179,6 +184,18 @@ port_dump_msgpack(struct port *port, struct obuf *out);
 int
 port_dump_msgpack_16(struct port *port, struct obuf *out);
 
+/**
+ * Dump a port content as a plain text into a buffer,
+ * allocated inside.
+ * @param port Port with data to dump.
+ * @param[out] size Length of a result plain text.
+ *
+ * @retval nil Error.
+ * @retval not nil Plain text.
+ */
+const char *
+port_dump_plain(struct port *port, uint32_t *size);
+
 void
 port_init(void);
 
diff --git a/src/diag.h b/src/diag.h
index bd5a539b0..24b6383b6 100644
--- a/src/diag.h
+++ b/src/diag.h
@@ -251,6 +251,9 @@ struct error *
 BuildXlogError(const char *file, unsigned line, const char *format, ...);
 struct error *
 BuildCollationError(const char *file, unsigned line, const char *format, ...);
+struct error *
+BuildSocketError(const char *file, unsigned line, int fd, const char *format,
+		 ...);
 
 struct index_def;
 
diff --git a/src/sio.cc b/src/sio.cc
index c906a97a8..8d71f0382 100644
--- a/src/sio.cc
+++ b/src/sio.cc
@@ -67,6 +67,22 @@ SocketError::SocketError(const char *file, unsigned line, int fd,
 	errno = save_errno;
 }
 
+struct error *
+BuildSocketError(const char *file, unsigned line, int fd, const char *format,
+		 ...)
+{
+	try {
+		SocketError *e = new SocketError(file, line, fd, "");
+		va_list ap;
+		va_start(ap, format);
+		error_vformat_msg(e, format, ap);
+		va_end(ap);
+		return e;
+	} catch (OutOfMemory *e) {
+		return e;
+	}
+}
+
 /** Pretty print socket name and peer (for exceptions) */
 const char *
 sio_socketname(int fd)
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index a5b3061a9..237b3d002 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
 
 test = tap.test("console")
 
-test:plan(60)
+test:plan(61)
 
 -- Start console and connect to it
 local server = console.listen(CONSOLE_SOCKET)
@@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console")
 -- gh-2677: box.session.push, text protocol support.
 --
 client:write('box.session.push(200)\n')
-test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n',
+        "pushed message")
+test:is(client:read(EOL), '---\n- true\n...\n', "pushed message")
 
 -- Execute some command
 client:write("1\n")
-- 
2.15.1 (Apple Git-101)





More information about the Tarantool-patches mailing list