[tarantool-patches] Re: [PATCH v2 08/10] session: introduce text box.session.push

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu May 24 23:50:43 MSK 2018



On 10/05/2018 22:27, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/04/20 16:31]:
>> +/**
>> + * Push a tagged YAML document into a console socket.
>> + * @param session Console session.
>> + * @param port Port with YAML to push.
>> + *
>> + * @retval  0 Success.
>> + * @retval -1 Error.
>> + */
>> +static int
>> +console_session_push(struct session *session, struct port *port)
>> +{
>> +	assert(session_vtab_registry[session->type].push ==
>> +	       console_session_push);
>> +	uint32_t text_len;
>> +	const char *text = port_dump_plain(port, &text_len);
>> +	if (text == NULL)
>> +		return -1;
>> +	int fd = session_fd(session);
>> +	while (text_len > 0) {
>> +		while (coio_wait(fd, COIO_WRITE,
>> +				 TIMEOUT_INFINITY) != COIO_WRITE);
> 
> Nitpick: the socket is ready in 99% of cases, there is no reason
> to call coio_wait() unless you get EINTR.
> Second, why the choice of fio_writev() rather than write() or
> send?

We have discussed that verbally, but I am repeating it here for the
record. I used fio_writev because it does not return -1 on EINTR, but
it turned out to return -1 on EWOULDBLOCK/EAGAIN, so I was wrong -
I can not use it here, you are right.

> 
> What is wrong with coio_writev or similar?

Because a console socket has no ev_io object. You asked me to investigate
whether the Lua socket encapsulates an ev_io - it does not. A Lua socket
always uses only a file descriptor, and does coio_wait(int fd). So I can
not pass one to the session meta directly from socket.lua without a big
refactoring of the Lua socket. If you really want it, I can open a
separate issue for that, label it 'good first issue' and assign a student.
It is a simple but big task that does not block push.

I refactored console push to write into the socket first, and to call
coio_wait only if the write would block.

diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 65a2192da..5ded99c98 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -394,17 +394,20 @@ console_session_push(struct session *session, struct port *port)
  		return -1;
  	int fd = session_fd(session);
  	while (text_len > 0) {
-		while (coio_wait(fd, COIO_WRITE,
-				 TIMEOUT_INFINITY) != COIO_WRITE);
-		const struct iovec iov = {
-			.iov_base = (void *) text,
-			.iov_len = text_len
-		};
-		ssize_t rc = fio_writev(fd, &iov, 1);
-		if (rc < 0)
-			return -1;
-		text_len -= rc;
-		text += rc;
+		ssize_t rc = write(fd, text, text_len);
+		if (rc < 0) {
+			if (errno == EAGAIN || errno == EWOULDBLOCK) {
+				while (coio_wait(fd, COIO_WRITE,
+						 TIMEOUT_INFINITY) !=
+				       COIO_WRITE);
+			} else if (errno != EINTR) {
+				diag_set(SocketError, fd, strerror(errno));
+				return -1;
+			}
+		} else {
+			text_len -= (uint32_t) rc;
+			text += rc;
+		}
  	}
  	return 0;
  }

> 
>> +		const struct iovec iov = {
>> +			.iov_base = (void *) text,
>> +			.iov_len = text_len
>> +		};
>> +		ssize_t rc = fio_writev(fd, &iov, 1);
>> +		if (rc < 0)
>> +			return -1;
>> +		text_len -= rc;
>> +		text += rc;
>> +	}
>> +	return 0;
>> +}
>> @@ -92,13 +93,27 @@ local text_connection_mt = {
>>           --
>>           eval = function(self, text)
>>               text = text..'$EOF$\n'
>> -            if self:write(text) then
>> +            if not self:write(text) then
>> +                error(self:set_error())
>> +            end
>> +            while true do
>>                   local rc = self:read()
>> -                if rc then
>> +                if not rc then
>> +                    break
>> +                end
>> +                local handle, prefix = yaml.decode_tag(rc)
>> +                assert(handle or not prefix)
>> +                if not handle then
>> +                    -- Can not fail - tags are encoded with no
>> +                    -- user participation and are correct always.
>> +                    assert(not prefix)
> 
> In Lua, asserts take CPU time. Please don't use them unless in a
> test.

diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index b8ae5ba59..574882188 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -102,11 +102,9 @@ local text_connection_mt = {
                      break
                  end
                  local handle, prefix = yaml.decode_tag(rc)
-                assert(handle or not prefix)
                  if not handle then
                      -- Can not fail - tags are encoded with no
                      -- user participation and are correct always.
-                    assert(not prefix)
                      return rc
                  end
                  if handle == PUSH_TAG_HANDLE and self.print_f then

> 
>>                       return rc
>>                   end
>> +                if handle == PUSH_TAG_HANDLE and self.print_f then
>> +                    self.print_f(rc)
>> +                end
>>               end
>> -            error(self:set_error())
>> +            return rc
>>           end,
>>           --
>>           -- Make the connection be in error state, set error
>> @@ -121,15 +136,18 @@ local text_connection_mt = {
>>   -- netbox-like object.
>>   -- @param connection Socket to wrap.
>>   -- @param url Parsed destination URL.
>> +-- @param print_f Function to print push messages.
>> +--
>>   -- @retval nil, err Error, and err contains an error message.
>>   -- @retval  not nil Netbox-like object.
>>   --
>> -local function wrap_text_socket(connection, url)
>> +local function wrap_text_socket(connection, url, print_f)
>>       local conn = setmetatable({
>>           _socket = connection,
>>           state = 'active',
>>           host = url.host or 'localhost',
>>           port = url.service,
>> +        print_f = print_f,
>>       }, text_connection_mt)
>>       if not conn:write('require("console").delimiter("$EOF$")\n') or
>>          not conn:read() then
>> @@ -369,7 +387,8 @@ local function connect(uri, opts)
>>       end
>>       local remote
>>       if greeting.protocol == 'Lua console' then
>> -        remote = wrap_text_socket(connection, u)
>> +        remote = wrap_text_socket(connection, u,
>> +                                  function(msg) self:print(msg) end)
>>       else
>>           opts = {
>>               connect_timeout = opts.timeout,
>> diff --git a/src/box/lua/session.c b/src/box/lua/session.c
>> index 5fe5f08d4..306271809 100644
>> --- a/src/box/lua/session.c
>> +++ b/src/box/lua/session.c
>> @@ -367,7 +415,11 @@ lbox_session_push(struct lua_State *L)
>>   	if (lua_gettop(L) != 1)
>>   		return luaL_error(L, "Usage: box.session.push(data)");
>>   
>> -	if (session_push(current_session(), NULL) != 0) {
>> +	struct lua_push_port port;
>> +	port.vtab = &lua_push_port_vtab;
> 
> Why do you need a separate port?

I need a separate port because the original struct port does not have
a struct lua_State *L. I need the Lua stack in console push to get
the object, encode it into YAML and send it to a socket.

> 
> And why do you need to create a port for every push? Can't you
> reuse the same port as is used for the Lua call itself?

Lua call does not reuse a port. The port is created on each call in
iproto.cc on the stack, exactly like here. I do not understand why
you think that creating an object on the stack and setting a pair
of its fields has a noticeable performance impact, especially taking
into account that Lua does most of the work here.

> 
>> +	port.L = L;
>> +
>> +	if (session_push(current_session(), (struct port *) &port) != 0) {
>>   		lua_pushnil(L);
>>   		luaT_pusherror(L, box_error_last());
>>   		return 2;
>> diff --git a/src/box/port.c b/src/box/port.c
>> index 255eb732c..f9b655840 100644
> 

The patch is below:

=============================================================================

commit 9c626685ce752c03029f49fc8b8298fe6a2e2f9f
Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Date:   Thu Apr 19 19:50:25 2018 +0300

     session: introduce text box.session.push
     
     box.session.push allows sending intermediate results within the
     scope of the main request without finalizing it. Messages can be
     sent over the text and binary protocols. This patch adds support
     for text pushes.
     
     A text push is a YAML document tagged with the '!push!' handle and
     the 'tag:tarantool.io/push,2018' prefix. YAML tags are a standard
     way to define the type of a document.
     
     A console that receives a push message just prints it to stdout (or
     sends it to the next console, if it is a remote console too).
     
     Part of #2677

diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 7e17fa30a..5ded99c98 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -31,10 +31,12 @@
  
  #include "box/lua/console.h"
  #include "box/session.h"
+#include "box/port.h"
  #include "lua/utils.h"
  #include "lua/fiber.h"
  #include "fiber.h"
  #include "coio.h"
  #include "lua-yaml/lyaml.h"
  #include <lua.h>
  #include <lauxlib.h>
@@ -366,6 +368,50 @@ console_session_fd(struct session *session)
  	return session->meta.fd;
  }
  
+int
+console_encode_push(struct lua_State *L)
+{
+	return lua_yaml_encode_tagged(L, luaL_yaml_default, "!push!",
+				      "tag:tarantool.io/push,2018");
+}
+
+/**
+ * Push a tagged YAML document into a console socket.
+ * @param session Console session.
+ * @param port Port with YAML to push.
+ *
+ * @retval  0 Success.
+ * @retval -1 Error.
+ */
+static int
+console_session_push(struct session *session, struct port *port)
+{
+	assert(session_vtab_registry[session->type].push ==
+	       console_session_push);
+	uint32_t text_len;
+	const char *text = port_dump_plain(port, &text_len);
+	if (text == NULL)
+		return -1;
+	int fd = session_fd(session);
+	while (text_len > 0) {
+		ssize_t rc = write(fd, text, text_len);
+		if (rc < 0) {
+			if (errno == EAGAIN || errno == EWOULDBLOCK) {
+				while (coio_wait(fd, COIO_WRITE,
+						 TIMEOUT_INFINITY) !=
+				       COIO_WRITE);
+			} else if (errno != EINTR) {
+				diag_set(SocketError, fd, strerror(errno));
+				return -1;
+			}
+		} else {
+			text_len -= (uint32_t) rc;
+			text += rc;
+		}
+	}
+	return 0;
+}
+
  void
  tarantool_lua_console_init(struct lua_State *L)
  {
@@ -400,7 +446,7 @@ tarantool_lua_console_init(struct lua_State *L)
  	 */
  	lua_setfield(L, -2, "formatter");
  	struct session_vtab console_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ console_session_push,
  		/* .fd = */ console_session_fd,
  		/* .sync = */ generic_session_sync,
  	};
diff --git a/src/box/lua/console.h b/src/box/lua/console.h
index 208b31490..6d1449810 100644
--- a/src/box/lua/console.h
+++ b/src/box/lua/console.h
@@ -36,6 +36,16 @@ extern "C" {
  
  struct lua_State;
  
+/**
+ * Encode a single value on top of the stack into YAML document
+ * tagged as push message.
+ * @param object Any lua object on top of the stack.
+ * @retval nil, error Error occurred.
+ * @retval not nil Tagged YAML document.
+ */
+int
+console_encode_push(struct lua_State *L);
+
  void
  tarantool_lua_console_init(struct lua_State *L);
  
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index bc4e02bfc..574882188 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -11,6 +11,7 @@ local yaml = require('yaml')
  local net_box = require('net.box')
  
  local YAML_TERM = '\n...\n'
+local PUSH_TAG_HANDLE = '!push!'
  
  local function format(status, ...)
      local err
@@ -92,13 +93,25 @@ local text_connection_mt = {
          --
          eval = function(self, text)
              text = text..'$EOF$\n'
-            if self:write(text) then
+            if not self:write(text) then
+                error(self:set_error())
+            end
+            while true do
                  local rc = self:read()
-                if rc then
+                if not rc then
+                    break
+                end
+                local handle, prefix = yaml.decode_tag(rc)
+                if not handle then
+                    -- Can not fail - tags are encoded with no
+                    -- user participation and are correct always.
                      return rc
                  end
+                if handle == PUSH_TAG_HANDLE and self.print_f then
+                    self.print_f(rc)
+                end
              end
-            error(self:set_error())
+            return rc
          end,
          --
          -- Make the connection be in error state, set error
@@ -121,15 +134,18 @@ local text_connection_mt = {
  -- netbox-like object.
  -- @param connection Socket to wrap.
  -- @param url Parsed destination URL.
+-- @param print_f Function to print push messages.
+--
  -- @retval nil, err Error, and err contains an error message.
  -- @retval  not nil Netbox-like object.
  --
-local function wrap_text_socket(connection, url)
+local function wrap_text_socket(connection, url, print_f)
      local conn = setmetatable({
          _socket = connection,
          state = 'active',
          host = url.host or 'localhost',
          port = url.service,
+        print_f = print_f,
      }, text_connection_mt)
      if not conn:write('require("console").delimiter("$EOF$")\n') or
         not conn:read() then
@@ -369,7 +385,8 @@ local function connect(uri, opts)
      end
      local remote
      if greeting.protocol == 'Lua console' then
-        remote = wrap_text_socket(connection, u)
+        remote = wrap_text_socket(connection, u,
+                                  function(msg) self:print(msg) end)
      else
          opts = {
              connect_timeout = opts.timeout,
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 05010c4c3..ade85491f 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -41,6 +41,8 @@
  #include "box/session.h"
  #include "box/user.h"
  #include "box/schema.h"
+#include "box/port.h"
+#include "box/lua/console.h"
  
  static const char *sessionlib_name = "box.session";
  
@@ -355,6 +357,52 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
  	return 3;
  }
  
+/**
+ * Port to push a message from Lua.
+ */
+struct lua_push_port {
+	const struct port_vtab *vtab;
+	/**
+	 * Lua state, containing data to dump on top of the stack.
+	 */
+	struct lua_State *L;
+};
+
+static const char *
+lua_push_port_dump_plain(struct port *port, uint32_t *size);
+
+static const struct port_vtab lua_push_port_vtab = {
+       .dump_msgpack = NULL,
+       /*
+        * Dump_16 has no sense, since push appears since 1.10
+        * protocol.
+        */
+       .dump_msgpack_16 = NULL,
+       .dump_plain = lua_push_port_dump_plain,
+       .destroy = NULL,
+};
+
+static const char *
+lua_push_port_dump_plain(struct port *port, uint32_t *size)
+{
+	struct lua_push_port *lua_port = (struct lua_push_port *) port;
+	assert(lua_port->vtab == &lua_push_port_vtab);
+	struct lua_State *L = lua_port->L;
+	int rc = console_encode_push(L);
+	if (rc == 2) {
+		assert(lua_isnil(L, -2));
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		return NULL;
+	}
+	assert(rc == 1);
+	assert(lua_isstring(L, -1));
+	size_t len;
+	const char *result = lua_tolstring(L, -1, &len);
+	*size = (uint32_t) len;
+	return result;
+}
+
  /**
   * Push a message using a protocol, depending on a session type.
   * @param L Lua state. First argument on the stack is data to
@@ -368,7 +416,11 @@ lbox_session_push(struct lua_State *L)
  	if (lua_gettop(L) != 1)
  		return luaL_error(L, "Usage: box.session.push(data)");
  
-	if (session_push(current_session(), NULL) != 0) {
+	struct lua_push_port port;
+	port.vtab = &lua_push_port_vtab;
+	port.L = L;
+
+	if (session_push(current_session(), (struct port *) &port) != 0) {
  		lua_pushnil(L);
  		luaT_pusherror(L, box_error_last());
  		return 2;
diff --git a/src/box/port.c b/src/box/port.c
index 255eb732c..f9b655840 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out)
  	return port->vtab->dump_msgpack_16(port, out);
  }
  
+const char *
+port_dump_plain(struct port *port, uint32_t *size)
+{
+	return port->vtab->dump_plain(port, size);
+}
+
  void
  port_init(void)
  {
diff --git a/src/box/port.h b/src/box/port.h
index 1c44b9b00..7fc1b8972 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -76,6 +76,11 @@ struct port_vtab {
  	 * 1.6 format.
  	 */
  	int (*dump_msgpack_16)(struct port *port, struct obuf *out);
+	/**
+	 * Dump a port content as a plain text into a buffer,
+	 * allocated inside.
+	 */
+	const char *(*dump_plain)(struct port *port, uint32_t *size);
  	/**
  	 * Destroy a port and release associated resources.
  	 */
@@ -158,6 +163,18 @@ port_dump_msgpack(struct port *port, struct obuf *out);
  int
  port_dump_msgpack_16(struct port *port, struct obuf *out);
  
+/**
+ * Dump a port content as a plain text into a buffer,
+ * allocated inside.
+ * @param port Port with data to dump.
+ * @param[out] size Length of a result plain text.
+ *
+ * @retval nil Error.
+ * @retval not nil Plain text.
+ */
+const char *
+port_dump_plain(struct port *port, uint32_t *size);
+
  void
  port_init(void);
  
diff --git a/src/diag.h b/src/diag.h
index dc6c132d5..4fcbab5c3 100644
--- a/src/diag.h
+++ b/src/diag.h
@@ -249,6 +249,9 @@ struct error *
  BuildSystemError(const char *file, unsigned line, const char *format, ...);
  struct error *
  BuildXlogError(const char *file, unsigned line, const char *format, ...);
+struct error *
+BuildSocketError(const char *file, unsigned line, int fd, const char *format,
+		 ...);
  
  struct index_def;
  
diff --git a/src/sio.cc b/src/sio.cc
index c906a97a8..8d71f0382 100644
--- a/src/sio.cc
+++ b/src/sio.cc
@@ -67,6 +67,22 @@ SocketError::SocketError(const char *file, unsigned line, int fd,
  	errno = save_errno;
  }
  
+struct error *
+BuildSocketError(const char *file, unsigned line, int fd, const char *format,
+		 ...)
+{
+	try {
+		SocketError *e = new SocketError(file, line, fd, "");
+		va_list ap;
+		va_start(ap, format);
+		error_vformat_msg(e, format, ap);
+		va_end(ap);
+		return e;
+	} catch (OutOfMemory *e) {
+		return e;
+	}
+}
+
  /** Pretty print socket name and peer (for exceptions) */
  const char *
  sio_socketname(int fd)
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index a5b3061a9..237b3d002 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
  
  test = tap.test("console")
  
-test:plan(60)
+test:plan(61)
  
  -- Start console and connect to it
  local server = console.listen(CONSOLE_SOCKET)
@@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console")
  -- gh-2677: box.session.push, text protocol support.
  --
  client:write('box.session.push(200)\n')
-test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n',
+        "pushed message")
+test:is(client:read(EOL), '---\n- true\n...\n', "pushed message")
  
  -- Execute some command
  client:write("1\n")



More information about the Tarantool-patches mailing list