Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
To: tarantool-patches@freelists.org
Cc: vdavydov.dev@gmail.com
Subject: [PATCH v2 08/10] session: introduce text box.session.push
Date: Fri, 20 Apr 2018 16:24:33 +0300	[thread overview]
Message-ID: <f16c5d5a1089229efe330bf4385fdd5434be052b.1524228894.git.v.shpilevoy@tarantool.org> (raw)
In-Reply-To: <cover.1524228894.git.v.shpilevoy@tarantool.org>

box.session.push allows sending intermediate results within the
scope of the main request without finalizing it. Messages can be
sent over both the text and the binary protocol. This patch adds
support for text pushes.

A text push is a YAML document tagged with the '!push!' handle and
the 'tag:tarantool.io/push,2018' prefix. YAML tags are the standard
way to define the type of a document.

A console that receives a push message just prints it to stdout
(or forwards it to the next console, if that one is a remote
console too).

Part of #2677
---
 src/box/lua/console.c         | 45 +++++++++++++++++++++++++++++++++++-
 src/box/lua/console.h         | 10 ++++++++
 src/box/lua/console.lua       | 29 +++++++++++++++++++----
 src/box/lua/session.c         | 54 ++++++++++++++++++++++++++++++++++++++++++-
 src/box/port.c                |  6 +++++
 src/box/port.h                | 17 ++++++++++++++
 src/fio.c                     |  2 +-
 src/fio.h                     |  2 +-
 test/app-tap/console.test.lua |  6 +++--
 9 files changed, 160 insertions(+), 11 deletions(-)

diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index a3bf83cb1..c4886db8c 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -31,10 +31,12 @@
 
 #include "box/lua/console.h"
 #include "box/session.h"
+#include "box/port.h"
 #include "lua/utils.h"
 #include "lua/fiber.h"
 #include "fiber.h"
 #include "coio.h"
+#include "fio.h"
 #include "lua-yaml/lyaml.h"
 #include <lua.h>
 #include <lauxlib.h>
@@ -364,6 +366,47 @@ console_session_fd(struct session *session)
 	return session->meta.fd;
 }
 
+int
+console_encode_push(struct lua_State *L)
+{
+	return lua_yaml_encode_tagged(L, luaL_yaml_default, "!push!",
+				      "tag:tarantool.io/push,2018");
+}
+
+/**
+ * Push a tagged YAML document into a console socket.
+ * @param session Console session.
+ * @param port Port with YAML to push.
+ *
+ * @retval  0 Success.
+ * @retval -1 Error.
+ */
+static int
+console_session_push(struct session *session, struct port *port)
+{
+	assert(session_vtab_registry[session->type].push ==
+	       console_session_push);
+	uint32_t text_len;
+	const char *text = port_dump_plain(port, &text_len);
+	if (text == NULL)
+		return -1;
+	int fd = session_fd(session);
+	while (text_len > 0) {
+		while (coio_wait(fd, COIO_WRITE,
+				 TIMEOUT_INFINITY) != COIO_WRITE);
+		const struct iovec iov = {
+			.iov_base = (void *) text,
+			.iov_len = text_len
+		};
+		ssize_t rc = fio_writev(fd, &iov, 1);
+		if (rc < 0)
+			return -1;
+		text_len -= rc;
+		text += rc;
+	}
+	return 0;
+}
+
 void
 tarantool_lua_console_init(struct lua_State *L)
 {
@@ -400,7 +443,7 @@ tarantool_lua_console_init(struct lua_State *L)
 	 */
 	lua_setfield(L, -2, "formatter");
 	struct session_vtab console_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ console_session_push,
 		/* .fd = */ console_session_fd,
 		/* .sync = */ generic_session_sync,
 	};
diff --git a/src/box/lua/console.h b/src/box/lua/console.h
index 208b31490..6d1449810 100644
--- a/src/box/lua/console.h
+++ b/src/box/lua/console.h
@@ -36,6 +36,16 @@ extern "C" {
 
 struct lua_State;
 
+/**
+ * Encode a single value on top of the stack into YAML document
+ * tagged as push message.
+ * @param object Any lua object on top of the stack.
+ * @retval nil, error Error occurred.
+ * @retval not nil Tagged YAML document.
+ */
+int
+console_encode_push(struct lua_State *L);
+
 void
 tarantool_lua_console_init(struct lua_State *L);
 
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index bc4e02bfc..b8ae5ba59 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -11,6 +11,7 @@ local yaml = require('yaml')
 local net_box = require('net.box')
 
 local YAML_TERM = '\n...\n'
+local PUSH_TAG_HANDLE = '!push!'
 
 local function format(status, ...)
     local err
@@ -92,13 +93,27 @@ local text_connection_mt = {
         --
         eval = function(self, text)
             text = text..'$EOF$\n'
-            if self:write(text) then
+            if not self:write(text) then
+                error(self:set_error())
+            end
+            while true do
                 local rc = self:read()
-                if rc then
+                if not rc then
+                    break
+                end
+                local handle, prefix = yaml.decode_tag(rc)
+                assert(handle or not prefix)
+                if not handle then
+                    -- Can not fail - tags are encoded with no
+                    -- user participation and are correct always.
+                    assert(not prefix)
                     return rc
                 end
+                if handle == PUSH_TAG_HANDLE and self.print_f then
+                    self.print_f(rc)
+                end
             end
-            error(self:set_error())
+            return rc
         end,
         --
         -- Make the connection be in error state, set error
@@ -121,15 +136,18 @@ local text_connection_mt = {
 -- netbox-like object.
 -- @param connection Socket to wrap.
 -- @param url Parsed destination URL.
+-- @param print_f Function to print push messages.
+--
 -- @retval nil, err Error, and err contains an error message.
 -- @retval  not nil Netbox-like object.
 --
-local function wrap_text_socket(connection, url)
+local function wrap_text_socket(connection, url, print_f)
     local conn = setmetatable({
         _socket = connection,
         state = 'active',
         host = url.host or 'localhost',
         port = url.service,
+        print_f = print_f,
     }, text_connection_mt)
     if not conn:write('require("console").delimiter("$EOF$")\n') or
        not conn:read() then
@@ -369,7 +387,8 @@ local function connect(uri, opts)
     end
     local remote
     if greeting.protocol == 'Lua console' then
-        remote = wrap_text_socket(connection, u)
+        remote = wrap_text_socket(connection, u,
+                                  function(msg) self:print(msg) end)
     else
         opts = {
             connect_timeout = opts.timeout,
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 5fe5f08d4..306271809 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -41,6 +41,8 @@
 #include "box/session.h"
 #include "box/user.h"
 #include "box/schema.h"
+#include "box/port.h"
+#include "box/lua/console.h"
 
 static const char *sessionlib_name = "box.session";
 
@@ -355,6 +357,52 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
 	return 3;
 }
 
+/**
+ * Port to push a message from Lua.
+ */
+struct lua_push_port {
+	const struct port_vtab *vtab;
+	/**
+	 * Lua state, containing data to dump on top of the stack.
+	 */
+	struct lua_State *L;
+};
+
+static const char *
+lua_push_port_dump_plain(struct port *port, uint32_t *size);
+
+static const struct port_vtab lua_push_port_vtab = {
+       .dump_msgpack = NULL,
+       /*
+        * Dump_16 makes no sense, since push exists only since
+        * the 1.10 protocol.
+        */
+       .dump_msgpack_16 = NULL,
+       .dump_plain = lua_push_port_dump_plain,
+       .destroy = NULL,
+};
+
+static const char *
+lua_push_port_dump_plain(struct port *port, uint32_t *size)
+{
+	struct lua_push_port *lua_port = (struct lua_push_port *) port;
+	assert(lua_port->vtab == &lua_push_port_vtab);
+	struct lua_State *L = lua_port->L;
+	int rc = console_encode_push(L);
+	if (rc == 2) {
+		assert(lua_isnil(L, -2));
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		return NULL;
+	}
+	assert(rc == 1);
+	assert(lua_isstring(L, -1));
+	size_t len;
+	const char *result = lua_tolstring(L, -1, &len);
+	*size = (uint32_t) len;
+	return result;
+}
+
 /**
  * Push a message using a protocol, depending on a session type.
  * @param data Data to push, first argument on a stack.
@@ -367,7 +415,11 @@ lbox_session_push(struct lua_State *L)
 	if (lua_gettop(L) != 1)
 		return luaL_error(L, "Usage: box.session.push(data)");
 
-	if (session_push(current_session(), NULL) != 0) {
+	struct lua_push_port port;
+	port.vtab = &lua_push_port_vtab;
+	port.L = L;
+
+	if (session_push(current_session(), (struct port *) &port) != 0) {
 		lua_pushnil(L);
 		luaT_pusherror(L, box_error_last());
 		return 2;
diff --git a/src/box/port.c b/src/box/port.c
index 255eb732c..f9b655840 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out)
 	return port->vtab->dump_msgpack_16(port, out);
 }
 
+const char *
+port_dump_plain(struct port *port, uint32_t *size)
+{
+	return port->vtab->dump_plain(port, size);
+}
+
 void
 port_init(void)
 {
diff --git a/src/box/port.h b/src/box/port.h
index 1c44b9b00..7fc1b8972 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -76,6 +76,11 @@ struct port_vtab {
 	 * 1.6 format.
 	 */
 	int (*dump_msgpack_16)(struct port *port, struct obuf *out);
+	/**
+	 * Dump a port content as a plain text into a buffer,
+	 * allocated inside.
+	 */
+	const char *(*dump_plain)(struct port *port, uint32_t *size);
 	/**
 	 * Destroy a port and release associated resources.
 	 */
@@ -158,6 +163,18 @@ port_dump_msgpack(struct port *port, struct obuf *out);
 int
 port_dump_msgpack_16(struct port *port, struct obuf *out);
 
+/**
+ * Dump a port content as a plain text into a buffer,
+ * allocated inside.
+ * @param port Port with data to dump.
+ * @param[out] size Length of a result plain text.
+ *
+ * @retval nil Error.
+ * @retval not nil Plain text.
+ */
+const char *
+port_dump_plain(struct port *port, uint32_t *size);
+
 void
 port_init(void);
 
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..b1d9ecf44 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -135,7 +135,7 @@ fio_writen(int fd, const void *buf, size_t count)
 }
 
 ssize_t
-fio_writev(int fd, struct iovec *iov, int iovcnt)
+fio_writev(int fd, const struct iovec *iov, int iovcnt)
 {
 	assert(iov && iovcnt >= 0);
 	ssize_t nwr;
diff --git a/src/fio.h b/src/fio.h
index 12749afcb..fb6383c81 100644
--- a/src/fio.h
+++ b/src/fio.h
@@ -133,7 +133,7 @@ fio_writen(int fd, const void *buf, size_t count);
  *         returns the total number of bytes written, or -1 if error.
  */
 ssize_t
-fio_writev(int fd, struct iovec *iov, int iovcnt);
+fio_writev(int fd, const struct iovec *iov, int iovcnt);
 
 /**
  * A wrapper around writev, but retries for partial writes
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index d2e88b55b..b1d7166d4 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
 
 test = tap.test("console")
 
-test:plan(60)
+test:plan(61)
 
 -- Start console and connect to it
 local server = console.listen(CONSOLE_SOCKET)
@@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console")
 -- gh-2677: box.session.push, text protocol support.
 --
 client:write('box.session.push(200)\n')
-test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n',
+        "pushed message")
+test:is(client:read(EOL), '---\n- true\n...\n', "pushed message")
 
 -- Execute some command
 client:write("1\n")
-- 
2.15.1 (Apple Git-101)

  parent reply	other threads:[~2018-04-20 13:24 UTC|newest]

Thread overview: 34+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-04-20 13:24 [PATCH v2 00/10] session: introduce box.session.push Vladislav Shpilevoy
2018-04-20 13:24 ` [PATCH v2 01/10] yaml: don't throw OOM on any error in yaml encoding Vladislav Shpilevoy
2018-05-10 18:10   ` [tarantool-patches] " Konstantin Osipov
2018-04-20 13:24 ` [tarantool-patches] [PATCH v2 10/10] session: introduce binary box.session.push Vladislav Shpilevoy
2018-05-10 19:50   ` Konstantin Osipov
2018-05-24 20:50     ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-20 13:24 ` [PATCH v2 02/10] yaml: introduce yaml.encode_tagged Vladislav Shpilevoy
2018-05-10 18:22   ` [tarantool-patches] " Konstantin Osipov
2018-05-24 20:50     ` [tarantool-patches] " Vladislav Shpilevoy
2018-05-30 19:15       ` Konstantin Osipov
2018-05-30 20:49         ` Vladislav Shpilevoy
2018-05-31 10:46           ` Konstantin Osipov
2018-04-20 13:24 ` [PATCH v2 03/10] yaml: introduce yaml.decode_tag Vladislav Shpilevoy
2018-05-10 18:41   ` [tarantool-patches] " Konstantin Osipov
2018-05-24 20:50     ` [tarantool-patches] " Vladislav Shpilevoy
2018-05-31 10:54       ` Konstantin Osipov
2018-05-31 11:36       ` Konstantin Osipov
2018-04-20 13:24 ` [PATCH v2 04/10] console: use Lua C API to do formatting for console Vladislav Shpilevoy
2018-05-10 18:46   ` [tarantool-patches] " Konstantin Osipov
2018-05-24 20:50     ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-20 13:24 ` [PATCH v2 05/10] session: move salt into iproto connection Vladislav Shpilevoy
2018-05-10 18:47   ` [tarantool-patches] " Konstantin Osipov
2018-04-20 13:24 ` [PATCH v2 06/10] session: introduce session vtab and meta Vladislav Shpilevoy
2018-05-10 19:20   ` [tarantool-patches] " Konstantin Osipov
2018-05-24 20:50     ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-20 13:24 ` [PATCH v2 07/10] port: rename dump() into dump_msgpack() Vladislav Shpilevoy
2018-05-10 19:21   ` [tarantool-patches] " Konstantin Osipov
2018-04-20 13:24 ` Vladislav Shpilevoy [this message]
2018-05-10 19:27   ` [tarantool-patches] [PATCH v2 08/10] session: introduce text box.session.push Konstantin Osipov
2018-05-24 20:50     ` [tarantool-patches] " Vladislav Shpilevoy
2018-04-20 13:24 ` [PATCH v2 09/10] session: enable box.session.push in local console Vladislav Shpilevoy
2018-05-10 19:28   ` [tarantool-patches] " Konstantin Osipov
2018-05-24 20:50 ` [tarantool-patches] [PATCH 1/1] netbox: introduce iterable future objects Vladislav Shpilevoy
2018-06-04 22:17   ` [tarantool-patches] " Vladislav Shpilevoy

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=f16c5d5a1089229efe330bf4385fdd5434be052b.1524228894.git.v.shpilevoy@tarantool.org \
    --to=v.shpilevoy@tarantool.org \
    --cc=tarantool-patches@freelists.org \
    --cc=vdavydov.dev@gmail.com \
    --subject='Re: [PATCH v2 08/10] session: introduce text box.session.push' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox