Tarantool development patches archive
* [tarantool-patches] [PATCH v3 0/4] box.session.push
@ 2018-06-01 20:55 Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push Vladislav Shpilevoy
                   ` (3 more replies)
  0 siblings, 4 replies; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-01 20:55 UTC
  To: tarantool-patches; +Cc: kostja

Branch: http://github.com/tarantool/tarantool/tree/gh-2677-box-session-push
Issue: https://github.com/tarantool/tarantool/issues/2677

Sometimes a long-polling request needs to send intermediate results before it
finishes. Or a server-side watcher needs to listen for events and send
notifications to a client. Or a big response needs to be split into multiple
ones, as described in the Tarantool Wire Protocol RFC.

The IPROTO_CHUNK protocol command covers all of these cases. IPROTO_CHUNK is a
non-final response, and a single request can produce multiple response chunks.
box.session.push() is the API to send a chunk. It takes a Lua object and sends
it to the client in a way that depends on the session type: console or binary.

Console session pushes are YAML documents tagged using the YAML %TAG directive:
a push message carries the !push! tag handle. For example:
tarantool> box.session.push({a = 100, b = 200})
%TAG !push! tag:tarantool.io/push,2018
---       <--------- Here pushed message starts.
a: 100
b: 200
...
---       <--------- Here push() call result starts.
- true
...
This is not the same as the future console.print() pushes: those are not linked
with chunks.
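
To illustrate how a console client could tell a pushed document from the final
result, here is a minimal sketch in C. It only compares the leading %TAG
directive shown in the example above. The helper name is_push_document() is
hypothetical and is not part of Tarantool; the patch itself decodes the tag
through the YAML library rather than by comparing a prefix.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Directive line that precedes a pushed YAML document (from the example). */
static const char PUSH_DIRECTIVE[] =
	"%TAG !push! tag:tarantool.io/push,2018\n";

/**
 * Return true if the text starts with the push %TAG directive.
 * Hypothetical helper for illustration, not a Tarantool function.
 */
static bool
is_push_document(const char *text, size_t len)
{
	assert(text != NULL);
	/* sizeof() counts the terminating zero, so subtract it. */
	size_t prefix_len = sizeof(PUSH_DIRECTIVE) - 1;
	return len >= prefix_len &&
	       memcmp(text, PUSH_DIRECTIVE, prefix_len) == 0;
}
```

A reader loop would print every document for which this returns true and treat
the first untagged document as the result of the call.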

A binary push is an IProto response with the IPROTO_CHUNK code in the
REQUEST_TYPE header field. Otherwise it has the same structure as any other
response.
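
As a rough sketch of what this means for a client, the C fragment below
classifies a response by its REQUEST_TYPE value and counts the chunks that
arrive before the final response. The values 0 and 128 come from this series;
the error bit 0x8000 and all names (classify_response, chunks_before_final,
the enum members) are assumptions made for the illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum {
	RESPONSE_OK = 0,             /* final successful response */
	RESPONSE_CHUNK = 128,        /* IPROTO_CHUNK, non-final response */
	RESPONSE_ERROR_BIT = 0x8000, /* assumed error flag in the type */
};

enum response_kind { KIND_FINAL_OK, KIND_CHUNK, KIND_ERROR, KIND_UNKNOWN };

/* Map a REQUEST_TYPE header value to a response kind. */
static enum response_kind
classify_response(uint32_t type)
{
	if (type == RESPONSE_OK)
		return KIND_FINAL_OK;
	if (type == RESPONSE_CHUNK)
		return KIND_CHUNK;
	if (type & RESPONSE_ERROR_BIT)
		return KIND_ERROR;
	return KIND_UNKNOWN;
}

/* Count the chunks received before the first non-chunk response. */
static size_t
chunks_before_final(const uint32_t *types, size_t count)
{
	assert(types != NULL || count == 0);
	size_t chunks = 0;
	for (size_t i = 0; i < count; i++) {
		if (classify_response(types[i]) != KIND_CHUNK)
			break;
		chunks++;
	}
	return chunks;
}
```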

An IPROTO_CHUNK response must have the same sync as its request. Passing the
sync explicitly to box.session.push() would make the API ugly, so the request
sync is stored in the local storage of the fiber that executes the request. The
sync key does not extend the storage size: it shares one of the storage cells
with another fiber-local variable that is never used in IProto sessions.
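
The cell sharing can be pictured with the toy model below. It is only a sketch:
the toy_* names and the choice of which key is aliased are hypothetical, and
the real key layout lives in src/fiber.h. The point is that the sync key is an
alias of an existing key, so the storage array does not grow, and that the cell
holds a pointer to the request sync, which is read back on push.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum toy_fiber_key {
	TOY_KEY_SESSION = 0,
	TOY_KEY_USER = 1,
	/* Hypothetical key that binary sessions never use. */
	TOY_KEY_CONSOLE_ONLY = 2,
	/* The sync key reuses that cell instead of adding a new one. */
	TOY_KEY_SYNC = TOY_KEY_CONSOLE_ONLY,
	TOY_KEY_MAX = 3,
};

struct toy_fiber {
	void *fls[TOY_KEY_MAX];
};

/* Remember a pointer to the sync of the request being executed. */
static void
toy_request_begin(struct toy_fiber *f, uint64_t *sync)
{
	assert(f != NULL);
	f->fls[TOY_KEY_SYNC] = sync;
}

/* Sync of the current request, or 0 when none is set. */
static uint64_t
toy_fiber_sync(const struct toy_fiber *f)
{
	assert(f != NULL);
	const uint64_t *sync = f->fls[TOY_KEY_SYNC];
	return sync != NULL ? *sync : 0;
}
```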

Vladislav Shpilevoy (4):
  session: introduce text box.session.push
  session: enable box.session.push in local console
  session: introduce binary box.session.push
  netbox: introduce iterable future objects

 src/box/iproto.cc             | 241 +++++++++++++++++-----
 src/box/iproto_constants.h    |   3 +
 src/box/lua/call.c            |   4 +
 src/box/lua/console.c         |  75 ++++++-
 src/box/lua/console.lua       |  29 ++-
 src/box/lua/net_box.lua       | 188 ++++++++++++++---
 src/box/lua/session.c         |   6 +-
 src/box/port.c                |   7 +
 src/box/port.h                |  17 ++
 src/box/xrow.c                |  12 ++
 src/box/xrow.h                |  12 ++
 src/diag.h                    |   3 +
 src/fiber.h                   |  14 +-
 src/sio.cc                    |  16 ++
 test/app-tap/console.test.lua |   6 +-
 test/box/net.box.result       |   8 +-
 test/box/net.box.test.lua     |   8 +-
 test/box/push.result          | 461 ++++++++++++++++++++++++++++++++++++++++--
 test/box/push.test.lua        | 227 +++++++++++++++++++--
 19 files changed, 1218 insertions(+), 119 deletions(-)

-- 
2.15.1 (Apple Git-101)


* [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push
  2018-06-01 20:55 [tarantool-patches] [PATCH v3 0/4] box.session.push Vladislav Shpilevoy
@ 2018-06-01 20:55 ` Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 2/4] session: enable box.session.push in local console Vladislav Shpilevoy
                   ` (2 subsequent siblings)
  3 siblings, 0 replies; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-01 20:55 UTC
  To: tarantool-patches; +Cc: kostja

box.session.push allows sending intermediate results within the
scope of the main request without finalizing it. Messages can be
sent over the text and binary protocols. This patch adds text
pushes.

A text push is a YAML document tagged with the '!push!' handle and
the 'tag:tarantool.io/push,2018' prefix. YAML tags are a standard
way to define the type of a document.

A console that receives a push message just prints it to stdout
(or forwards it to the next console, if it is a remote console
too).

Part of #2677
---
 src/box/lua/call.c            |  4 +++
 src/box/lua/console.c         | 74 ++++++++++++++++++++++++++++++++++++++++++-
 src/box/lua/console.lua       | 27 +++++++++++++---
 src/box/lua/session.c         |  6 ++--
 src/box/port.c                |  7 ++++
 src/box/port.h                | 17 ++++++++++
 src/diag.h                    |  3 ++
 src/sio.cc                    | 16 ++++++++++
 test/app-tap/console.test.lua |  6 ++--
 9 files changed, 150 insertions(+), 10 deletions(-)

diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index 831583d26..e682820df 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -412,9 +412,13 @@ port_lua_destroy(struct port *base)
 	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, port->ref);
 }
 
+extern const char *
+port_lua_dump_plain(struct port *port, uint32_t *size);
+
 static const struct port_vtab port_lua_vtab = {
 	.dump_msgpack = port_lua_dump,
 	.dump_msgpack_16 = port_lua_dump_16,
+	.dump_plain = port_lua_dump_plain,
 	.destroy = port_lua_destroy,
 };
 
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index fea96bc05..49065a53c 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -31,6 +31,8 @@
 
 #include "box/lua/console.h"
 #include "box/session.h"
+#include "box/port.h"
+#include "box/error.h"
 #include "lua/utils.h"
 #include "lua/fiber.h"
 #include "fiber.h"
@@ -366,6 +368,76 @@ console_session_fd(struct session *session)
 	return session->meta.fd;
 }
 
+/**
+ * Dump port lua data as a YAML document tagged with !push! global
+ * tag.
+ * @param port Port lua.
+ * @param[out] size Size of the result.
+ *
+ * @retval not NULL Tagged YAML document.
+ * @retval NULL Error.
+ */
+const char *
+port_lua_dump_plain(struct port *port, uint32_t *size)
+{
+	struct port_lua *port_lua = (struct port_lua *) port;
+	struct lua_State *L = port_lua->L;
+	int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
+				 "tag:tarantool.io/push,2018");
+	if (rc == 2) {
+		/*
+		 * Nil and error object are pushed onto the stack.
+		 */
+		assert(lua_isnil(L, -2));
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		return NULL;
+	}
+	assert(rc == 1);
+	assert(lua_isstring(L, -1));
+	size_t len;
+	const char *result = lua_tolstring(L, -1, &len);
+	*size = (uint32_t) len;
+	return result;
+}
+
+/**
+ * Push a tagged YAML document into a console socket.
+ * @param session Console session.
+ * @param port Port with YAML to push.
+ *
+ * @retval  0 Success.
+ * @retval -1 Error.
+ */
+static int
+console_session_push(struct session *session, struct port *port)
+{
+	assert(session_vtab_registry[session->type].push ==
+	       console_session_push);
+	uint32_t text_len;
+	const char *text = port_dump_plain(port, &text_len);
+	if (text == NULL)
+		return -1;
+	int fd = session_fd(session);
+	while (text_len > 0) {
+		ssize_t rc = write(fd, text, text_len);
+		if (rc < 0) {
+			if (errno == EAGAIN || errno == EWOULDBLOCK) {
+				while (coio_wait(fd, COIO_WRITE,
+						 TIMEOUT_INFINITY) !=
+				       COIO_WRITE);
+			} else if (errno != EINTR) {
+				diag_set(SocketError, fd, strerror(errno));
+				return -1;
+			}
+		} else {
+			text_len -= (uint32_t) rc;
+			text += rc;
+		}
+	}
+	return 0;
+}
+
 void
 tarantool_lua_console_init(struct lua_State *L)
 {
@@ -400,7 +472,7 @@ tarantool_lua_console_init(struct lua_State *L)
 	 */
 	lua_setfield(L, -2, "formatter");
 	struct session_vtab console_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ console_session_push,
 		/* .fd = */ console_session_fd,
 		/* .sync = */ generic_session_sync,
 	};
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index bc4e02bfc..6271b416b 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -11,6 +11,7 @@ local yaml = require('yaml')
 local net_box = require('net.box')
 
 local YAML_TERM = '\n...\n'
+local PUSH_TAG_HANDLE = '!push!'
 
 local function format(status, ...)
     local err
@@ -92,13 +93,25 @@ local text_connection_mt = {
         --
         eval = function(self, text)
             text = text..'$EOF$\n'
-            if self:write(text) then
+            if not self:write(text) then
+                error(self:set_error())
+            end
+            while true do
                 local rc = self:read()
-                if rc then
+                if not rc then
+                    break
+                end
+                local handle, prefix = yaml.decode(rc, {tag_only = true})
+                if not handle then
+                    -- Can not fail - tags are encoded with no
+                    -- user participation and are correct always.
                     return rc
                 end
+                if handle == PUSH_TAG_HANDLE and self.print_f then
+                    self.print_f(rc)
+                end
             end
-            error(self:set_error())
+            return rc
         end,
         --
         -- Make the connection be in error state, set error
@@ -121,15 +134,18 @@ local text_connection_mt = {
 -- netbox-like object.
 -- @param connection Socket to wrap.
 -- @param url Parsed destination URL.
+-- @param print_f Function to print push messages.
+--
 -- @retval nil, err Error, and err contains an error message.
 -- @retval  not nil Netbox-like object.
 --
-local function wrap_text_socket(connection, url)
+local function wrap_text_socket(connection, url, print_f)
     local conn = setmetatable({
         _socket = connection,
         state = 'active',
         host = url.host or 'localhost',
         port = url.service,
+        print_f = print_f,
     }, text_connection_mt)
     if not conn:write('require("console").delimiter("$EOF$")\n') or
        not conn:read() then
@@ -369,7 +385,8 @@ local function connect(uri, opts)
     end
     local remote
     if greeting.protocol == 'Lua console' then
-        remote = wrap_text_socket(connection, u)
+        remote = wrap_text_socket(connection, u,
+                                  function(msg) self:print(msg) end)
     else
         opts = {
             connect_timeout = opts.timeout,
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 05010c4c3..627f62f59 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -41,6 +41,7 @@
 #include "box/session.h"
 #include "box/user.h"
 #include "box/schema.h"
+#include "box/port.h"
 
 static const char *sessionlib_name = "box.session";
 
@@ -367,8 +368,9 @@ lbox_session_push(struct lua_State *L)
 {
 	if (lua_gettop(L) != 1)
 		return luaL_error(L, "Usage: box.session.push(data)");
-
-	if (session_push(current_session(), NULL) != 0) {
+	struct port port;
+	port_lua_create(&port, L);
+	if (session_push(current_session(), &port) != 0) {
 		lua_pushnil(L);
 		luaT_pusherror(L, box_error_last());
 		return 2;
diff --git a/src/box/port.c b/src/box/port.c
index 255eb732c..03ca323d7 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out)
 	return port->vtab->dump_msgpack_16(port, out);
 }
 
+const char *
+port_dump_plain(struct port *port, uint32_t *size)
+{
+	return port->vtab->dump_plain(port, size);
+}
+
 void
 port_init(void)
 {
@@ -159,5 +165,6 @@ port_free(void)
 const struct port_vtab port_tuple_vtab = {
 	.dump_msgpack = port_tuple_dump_msgpack,
 	.dump_msgpack_16 = port_tuple_dump_msgpack_16,
+	.dump_plain = NULL,
 	.destroy = port_tuple_destroy,
 };
diff --git a/src/box/port.h b/src/box/port.h
index 6f1e5f364..882bb3791 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -77,6 +77,11 @@ struct port_vtab {
 	 * 1.6 format.
 	 */
 	int (*dump_msgpack_16)(struct port *port, struct obuf *out);
+	/**
+	 * Dump a port content as a plain text into a buffer,
+	 * allocated inside.
+	 */
+	const char *(*dump_plain)(struct port *port, uint32_t *size);
 	/**
 	 * Destroy a port and release associated resources.
 	 */
@@ -179,6 +184,18 @@ port_dump_msgpack(struct port *port, struct obuf *out);
 int
 port_dump_msgpack_16(struct port *port, struct obuf *out);
 
+/**
+ * Dump a port content as a plain text into a buffer,
+ * allocated inside.
+ * @param port Port with data to dump.
+ * @param[out] size Length of a result plain text.
+ *
+ * @retval NULL Error.
+ * @retval not NULL Plain text.
+ */
+const char *
+port_dump_plain(struct port *port, uint32_t *size);
+
 void
 port_init(void);
 
diff --git a/src/diag.h b/src/diag.h
index bd5a539b0..24b6383b6 100644
--- a/src/diag.h
+++ b/src/diag.h
@@ -251,6 +251,9 @@ struct error *
 BuildXlogError(const char *file, unsigned line, const char *format, ...);
 struct error *
 BuildCollationError(const char *file, unsigned line, const char *format, ...);
+struct error *
+BuildSocketError(const char *file, unsigned line, int fd, const char *format,
+		 ...);
 
 struct index_def;
 
diff --git a/src/sio.cc b/src/sio.cc
index c906a97a8..8d71f0382 100644
--- a/src/sio.cc
+++ b/src/sio.cc
@@ -67,6 +67,22 @@ SocketError::SocketError(const char *file, unsigned line, int fd,
 	errno = save_errno;
 }
 
+struct error *
+BuildSocketError(const char *file, unsigned line, int fd, const char *format,
+		 ...)
+{
+	try {
+		SocketError *e = new SocketError(file, line, fd, "");
+		va_list ap;
+		va_start(ap, format);
+		error_vformat_msg(e, format, ap);
+		va_end(ap);
+		return e;
+	} catch (OutOfMemory *e) {
+		return e;
+	}
+}
+
 /** Pretty print socket name and peer (for exceptions) */
 const char *
 sio_socketname(int fd)
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index a5b3061a9..237b3d002 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
 
 test = tap.test("console")
 
-test:plan(60)
+test:plan(61)
 
 -- Start console and connect to it
 local server = console.listen(CONSOLE_SOCKET)
@@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console")
 -- gh-2677: box.session.push, text protocol support.
 --
 client:write('box.session.push(200)\n')
-test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n',
+        "pushed message")
+test:is(client:read(EOL), '---\n- true\n...\n', "pushed message")
 
 -- Execute some command
 client:write("1\n")
-- 
2.15.1 (Apple Git-101)


* [tarantool-patches] [PATCH v3 2/4] session: enable box.session.push in local console
  2018-06-01 20:55 [tarantool-patches] [PATCH v3 0/4] box.session.push Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push Vladislav Shpilevoy
@ 2018-06-01 20:55 ` Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 3/4] session: introduce binary box.session.push Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects Vladislav Shpilevoy
  3 siblings, 0 replies; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-01 20:55 UTC
  To: tarantool-patches; +Cc: kostja

This is quite simple: just use the stdout file descriptor as the
destination for push messages. It makes the remote and local
consoles behave alike.
---
 src/box/lua/console.c   | 1 +
 src/box/lua/console.lua | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 49065a53c..edf8ad480 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -477,6 +477,7 @@ tarantool_lua_console_init(struct lua_State *L)
 		/* .sync = */ generic_session_sync,
 	};
 	session_vtab_registry[SESSION_TYPE_CONSOLE] = console_session_vtab;
+	session_vtab_registry[SESSION_TYPE_REPL] = console_session_vtab;
 }
 
 /*
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index 6271b416b..a822a7fd9 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -353,7 +353,7 @@ local function start()
         self.history_file = home_dir .. '/.tarantool_history'
         internal.load_history(self.history_file)
     end
-    session_internal.create(-1, "repl")
+    session_internal.create(1, "repl")
     repl(self)
     started = false
 end
-- 
2.15.1 (Apple Git-101)


* [tarantool-patches] [PATCH v3 3/4] session: introduce binary box.session.push
  2018-06-01 20:55 [tarantool-patches] [PATCH v3 0/4] box.session.push Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push Vladislav Shpilevoy
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 2/4] session: enable box.session.push in local console Vladislav Shpilevoy
@ 2018-06-01 20:55 ` Vladislav Shpilevoy
  2018-06-07 12:53   ` [tarantool-patches] " Konstantin Osipov
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects Vladislav Shpilevoy
  3 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-01 20:55 UTC
  To: tarantool-patches; +Cc: kostja

box.session.push() allows sending a message to a client without
finishing the main request. After this patch Tarantool supports
pushes over the binary protocol.

An IProto message is encoded using a new header code: IPROTO_CHUNK.
A push works as follows: a user calls box.session.push(message).
The message is encoded into the currently active obuf in the TX
thread, and then Kharon notifies the IProto thread about new data.

Originally Kharon is the ferryman of Hades who carries souls of
the newly deceased across the rivers Styx and Acheron that
divided the world of the living from the world of the dead. In
Tarantool Kharon is a message and does similar work. It notifies
the IProto thread about new data in an output buffer, carrying
pushed messages to IProto. Styx here is cpipe, and the boat is a
cbus message.

One connection has a single Kharon for all pushes. But Kharon can
not be in two places at the same time. So once he has left TX for
IProto, new pushes can not send Kharon again. They just set a
special flag. When Kharon is back in TX and sees that the flag is
set, he immediately takes the road back to IProto.

This way a user is not blocked from writing to obuf while Kharon
is busy. The user just updates obuf and sets the flag if it is
not set. There is no waiting for Kharon to come back.
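
The flag protocol described above can be modelled with a small single-threaded
sketch. The toy_* names and the trips counter are hypothetical and exist only
for this illustration; the real code sends a cbus message through a cpipe where
the sketch increments the counter.

```c
#include <assert.h>
#include <stdbool.h>

struct toy_conn {
	/* Set when a push happens while Kharon is away. */
	bool has_new_pushes;
	/* True while the notification is travelling. */
	bool is_kharon_on_road;
	/* Number of notifications sent to the network thread. */
	int trips;
};

/* Send Kharon to the network thread. He must be at home. */
static void
toy_begin_push(struct toy_conn *c)
{
	assert(!c->is_kharon_on_road);
	c->has_new_pushes = false;
	c->is_kharon_on_road = true;
	c->trips++;
}

/* Called in TX after a pushed message is appended to the buffer. */
static void
toy_push(struct toy_conn *c)
{
	if (!c->is_kharon_on_road)
		toy_begin_push(c);
	else
		c->has_new_pushes = true;
}

/* Called in TX when Kharon comes back from the network thread. */
static void
toy_kharon_return(struct toy_conn *c)
{
	assert(c->is_kharon_on_road);
	c->is_kharon_on_road = false;
	if (c->has_new_pushes)
		toy_begin_push(c);
}
```

In this model three pushes issued while the first notification is still
travelling cost two trips in total, and no push ever waits.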

Closes #2677
---
 src/box/iproto.cc          | 241 +++++++++++++++++++++++++++++++--------
 src/box/iproto_constants.h |   3 +
 src/box/lua/net_box.lua    | 104 ++++++++++++-----
 src/box/xrow.c             |  12 ++
 src/box/xrow.h             |  12 ++
 src/fiber.h                |  14 ++-
 test/box/net.box.result    |   8 +-
 test/box/net.box.test.lua  |   8 +-
 test/box/push.result       | 274 ++++++++++++++++++++++++++++++++++++++++++---
 test/box/push.test.lua     | 148 ++++++++++++++++++++++--
 10 files changed, 719 insertions(+), 105 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 76844555d..a1bbdb375 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -66,6 +66,44 @@
 
 enum { IPROTO_SALT_SIZE = 32 };
 
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+	struct obuf *obuf;
+	struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+	wpos->obuf = out;
+	wpos->svp = obuf_create_svp(out);
+}
+
+/**
+ * Kharon is the ferryman of Hades who carries souls of the newly
+ * deceased across the rivers Styx and Acheron that divided the
+ * world of the living from the world of the dead.
+ * Here Kharon is a message and does similar work. It notifies
+ * the IProto thread about new data in an output buffer, carrying
+ * pushed messages to IProto. Styx here is cpipe, and the boat is
+ * a cbus message.
+ */
+struct kharon {
+	struct cmsg base;
+	/**
+	 * Before sending to IProto thread, the wpos is set to a
+	 * current position in an output buffer. Before IProto
+	 * returns the message to TX, it sets wpos to the last
+	 * flushed position (works like iproto_msg.wpos).
+	 */
+	struct iproto_wpos wpos;
+};
+
 /**
  * Network readahead. A signed integer to avoid
  * automatic type coercion to an unsigned type.
@@ -113,24 +151,6 @@ iproto_reset_input(struct ibuf *ibuf)
 	}
 }
 
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
-	struct obuf *obuf;
-	struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
-	wpos->obuf = out;
-	wpos->svp = obuf_create_svp(out);
-}
-
 /* {{{ iproto_msg - declaration */
 
 /**
@@ -365,6 +385,48 @@ struct iproto_connection
 		/** Pointer to the current output buffer. */
 		struct obuf *p_obuf;
 	} tx;
+	/**
+	 * Kharon serves to notify IProto about new data in obuf
+	 * when TX thread is initiator of the transfer. It is
+	 * possible for example on box.session.push.
+	 *
+	 * But Kharon can not be in two places at the same time.
+	 * So once he has left TX for IProto, new pushes can not
+	 * send Kharon again. They just set a special flag:
+	 * has_new_pushes. When Kharon is back in TX and sees
+	 * that the flag is set, he immediately takes the road
+	 * back to IProto.
+	 *
+	 * This way a user is not blocked from writing to obuf
+	 * while Kharon is busy. The user just updates obuf and
+	 * sets the flag if it is not set. There is no waiting for
+	 * Kharon to come back.
+	 *
+	 * Kharon makes it possible to
+	 * 1) avoid multiple in-flight cbus messages from one
+	 *    session,
+	 * 2) avoid blocking a new push() until a previous push()
+	 *    is finished.
+	 *
+	 *      IProto                           TX
+	 * -------------------------------------------------------
+	 *                                 + [push message]
+	 *    start socket <--- notification ----
+	 *       write
+	 *                                 + [push message]
+	 *                                 + [push message]
+	 *                                       ...
+	 *      end socket
+	 *        write    ----------------> check for new
+	 *                                   pushes - found
+	 *                 <--- notification ---
+	 *                      ....
+	 */
+	bool has_new_pushes;
+	/** True if Kharon is on the way. */
+	bool is_kharon_on_road;
+	/** Push notification for IProto thread. */
+	struct kharon kharon;
 	/** Authentication salt. */
 	char salt[IPROTO_SALT_SIZE];
 };
@@ -882,6 +944,7 @@ iproto_connection_new(int fd)
 		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
 		return NULL;
 	}
+	memset(con, 0, sizeof(*con));
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -894,9 +957,6 @@ iproto_connection_new(int fd)
 	con->tx.p_obuf = &con->obuf[0];
 	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
 	iproto_wpos_create(&con->wend, con->tx.p_obuf);
-	con->parse_size = 0;
-	con->long_poll_requests = 0;
-	con->session = NULL;
 	rlist_create(&con->in_stop_list);
 	/* It may be very awkward to allocate at close. */
 	cmsg_init(&con->disconnect, disconnect_route);
@@ -1084,9 +1144,9 @@ error:
 }
 
 static void
-tx_fiber_init(struct session *session, uint64_t sync)
+tx_fiber_init(struct session *session, uint64_t *sync)
 {
-	session->meta.sync = sync;
+	session->meta.sync = sync != NULL ? *sync : 0;
 	/*
 	 * We do not cleanup fiber keys at the end of each request.
 	 * This does not lead to privilege escalation as long as
@@ -1099,6 +1159,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
 	 */
 	fiber_set_session(fiber(), session);
 	fiber_set_user(fiber(), &session->credentials);
+	fiber_set_key(fiber(), FIBER_KEY_SYNC, (void *) sync);
 }
 
 /**
@@ -1112,7 +1173,7 @@ tx_process_disconnect(struct cmsg *m)
 	struct iproto_connection *con =
 		container_of(m, struct iproto_connection, disconnect);
 	if (con->session) {
-		tx_fiber_init(con->session, 0);
+		tx_fiber_init(con->session, NULL);
 		if (! rlist_empty(&session_on_disconnect))
 			session_run_on_disconnect_triggers(con->session);
 		session_destroy(con->session);
@@ -1185,15 +1246,16 @@ tx_discard_input(struct iproto_msg *msg)
  *   not, the empty buffer is selected.
  * - if neither of the buffers are empty, the function
  *   does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ *        thread.
  */
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
 {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	struct iproto_connection *con = msg->connection;
-
 	struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
-	if (msg->wpos.obuf == con->tx.p_obuf) {
+	if (wpos->obuf == con->tx.p_obuf) {
 		/*
 		 * We got a message advancing the buffer which
 		 * is being appended to. The previous buffer is
@@ -1211,6 +1273,14 @@ tx_accept_msg(struct cmsg *m)
 		 */
 		con->tx.p_obuf = prev;
 	}
+}
+
+static inline struct iproto_msg *
+tx_accept_msg(struct cmsg *m)
+{
+	struct iproto_msg *msg = (struct iproto_msg *) m;
+	tx_accept_wpos(msg->connection, &msg->wpos);
+	tx_fiber_init(msg->connection->session, &msg->header.sync);
 	return msg;
 }
 
@@ -1255,8 +1325,6 @@ static void
 tx_process1(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
-
-	tx_fiber_init(msg->connection->session, msg->header.sync);
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1289,9 +1357,6 @@ tx_process_select(struct cmsg *m)
 	int count;
 	int rc;
 	struct request *req = &msg->dml;
-
-	tx_fiber_init(msg->connection->session, msg->header.sync);
-
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1339,9 +1404,6 @@ static void
 tx_process_call(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
-
-	tx_fiber_init(msg->connection->session, msg->header.sync);
-
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1423,9 +1485,6 @@ tx_process_misc(struct cmsg *m)
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
-
-	tx_fiber_init(con->session, msg->header.sync);
-
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1463,9 +1522,6 @@ tx_process_join_subscribe(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
-
-	tx_fiber_init(con->session, msg->header.sync);
-
 	try {
 		switch (msg->header.type) {
 		case IPROTO_JOIN:
@@ -1582,7 +1638,7 @@ tx_process_connect(struct cmsg *m)
 		if (con->session == NULL)
 			diag_raise();
 		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
+		tx_fiber_init(con->session, NULL);
 		static __thread char greeting[IPROTO_GREETING_SIZE];
 		/* TODO: dirty read from tx thread */
 		struct tt_uuid uuid = INSTANCE_UUID;
@@ -1741,6 +1797,99 @@ iproto_session_sync(struct session *session)
 	return session->meta.sync;
 }
 
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param conn IProto connection.
+ */
+static void
+tx_begin_push(struct iproto_connection *conn);
+
+/**
+ * Kharon reached the dead world (IProto). He schedules an event
+ * to flush new obuf data up to brought wpos.
+ * @param m Kharon.
+ */
+static void
+kharon_go_dead(struct cmsg *m)
+{
+	struct kharon *kharon = (struct kharon *) m;
+	struct iproto_connection *conn =
+		container_of(kharon, struct iproto_connection, kharon);
+	conn->wend = kharon->wpos;
+	kharon->wpos = conn->wpos;
+	if (evio_has_fd(&conn->output) && !ev_is_active(&conn->output))
+		ev_feed_event(conn->loop, &conn->output, EV_WRITE);
+}
+
+/**
+ * Kharon is in living world (TX) back from dead one (IProto). He
+ * checks if new push messages appeared during its trip and goes
+ * to IProto again if needed.
+ * @param m Kharon.
+ */
+static void
+kharon_go_alive(struct cmsg *m)
+{
+	struct kharon *kharon = (struct kharon *) m;
+	struct iproto_connection *conn =
+		container_of(kharon, struct iproto_connection, kharon);
+	tx_accept_wpos(conn, &kharon->wpos);
+	conn->is_kharon_on_road = false;
+	if (conn->has_new_pushes)
+		tx_begin_push(conn);
+}
+
+static const struct cmsg_hop push_route[] = {
+	{ kharon_go_dead, &tx_pipe },
+	{ kharon_go_alive, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_connection *conn)
+{
+	assert(! conn->is_kharon_on_road);
+	cmsg_init((struct cmsg *) &conn->kharon, push_route);
+	iproto_wpos_create(&conn->kharon.wpos, conn->tx.p_obuf);
+	conn->has_new_pushes = false;
+	conn->is_kharon_on_road = true;
+	cpipe_push(&net_pipe, (struct cmsg *) &conn->kharon);
+}
+
+/**
+ * Push a message from @a port to a remote client.
+ * @param session IProto session.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval  0 Success, a message is written to an output buffer.
+ *          But it is not guaranteed that it will be sent
+ *          successfully.
+ */
+static int
+iproto_session_push(struct session *session, struct port *port)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *) session->meta.connection;
+	struct obuf_svp svp;
+	if (iproto_prepare_select(con->tx.p_obuf, &svp) != 0)
+		return -1;
+	if (port_dump_msgpack(port, con->tx.p_obuf) < 0) {
+		obuf_rollback_to_svp(con->tx.p_obuf, &svp);
+		return -1;
+	}
+	iproto_reply_chunk(con->tx.p_obuf, &svp, fiber_sync(fiber()),
+			   ::schema_version);
+	if (! con->is_kharon_on_road)
+		tx_begin_push(con);
+	else
+		con->has_new_pushes = true;
+	return 0;
+}
+
+/** }}} */
+
 /** Initialize the iproto subsystem and start network io thread */
 void
 iproto_init()
@@ -1754,7 +1903,7 @@ iproto_init()
 	cpipe_create(&net_pipe, "net");
 	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
 	struct session_vtab iproto_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ iproto_session_push,
 		/* .fd = */ iproto_session_fd,
 		/* .sync = */ iproto_session_sync,
 	};
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 46d47719b..dd50bf874 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -164,6 +164,9 @@ enum iproto_type {
 	/** Vinyl row index stored in .run file */
 	VY_RUN_ROW_INDEX = 102,
 
+	/** Non-final response type. */
+	IPROTO_CHUNK = 128,
+
 	/**
 	 * Error codes = (IPROTO_TYPE_ERROR | ER_XXX from errcode.h)
 	 */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8fcaf89e8..fe113e740 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -39,6 +39,8 @@ local IPROTO_SCHEMA_VERSION_KEY = 0x05
 local IPROTO_DATA_KEY      = 0x30
 local IPROTO_ERROR_KEY     = 0x31
 local IPROTO_GREETING_SIZE = 128
+local IPROTO_CHUNK_KEY     = 128
+local IPROTO_OK_KEY        = 0
 
 -- select errors from box.error
 local E_UNKNOWN              = box.error.UNKNOWN
@@ -74,6 +76,10 @@ local function decode_count(raw_data)
     local response, raw_end = decode(raw_data)
     return response[IPROTO_DATA_KEY][1], raw_end
 end
+local function decode_push(raw_data)
+    local response, raw_end = decode(raw_data)
+    return response[IPROTO_DATA_KEY][1], raw_end
+end
 
 local method_encoder = {
     ping    = internal.encode_ping,
@@ -114,6 +120,7 @@ local method_decoder = {
     max     = decode_get,
     count   = decode_count,
     inject  = decode_data,
+    push    = decode_push,
 }
 
 local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
@@ -149,6 +156,12 @@ local function establish_connection(host, port, timeout)
     return s, greeting
 end
 
+--
+-- Default action on push during a synchronous request -
+-- ignore.
+--
+local function on_push_sync_default(...) end
+
 --
 -- Basically, *transport* is a TCP connection speaking one of
 -- Tarantool network protocols. This is a low-level interface.
@@ -397,7 +410,8 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Future object.
     --
-    local function perform_async_request(buffer, method, ...)
+    local function perform_async_request(buffer, method, on_push, on_push_ctx,
+                                         ...)
         if state ~= 'active' and state ~= 'fetch_schema' then
             return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
                                        reason = last_error})
@@ -410,14 +424,17 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         method_encoder[method](send_buf, id, ...)
         next_request_id = next_id(id)
-        -- Request has maximum 6 members:
-        -- method, buffer, id, cond, errno, response.
-        local request = setmetatable(table_new(0, 6), request_mt)
+        -- Request in most cases has maximum 8 members:
+        -- method, buffer, id, cond, errno, response, on_push,
+        -- on_push_ctx.
+        local request = setmetatable(table_new(0, 8), request_mt)
         request.method = method
         request.buffer = buffer
         request.id = id
         request.cond = fiber.cond()
         requests[id] = request
+        request.on_push = on_push
+        request.on_push_ctx = on_push_ctx
         return request
     end
 
@@ -426,9 +443,10 @@ local function create_transport(host, port, user, password, callback,
     -- @retval nil, error Error occured.
     -- @retval not nil Response object.
     --
-    local function perform_request(timeout, buffer, method, ...)
+    local function perform_request(timeout, buffer, method, on_push,
+                                   on_push_ctx, ...)
         local request, err =
-            perform_async_request(buffer, method, ...)
+            perform_async_request(buffer, method, on_push, on_push_ctx, ...)
         if not request then
             return nil, err
         end
@@ -441,13 +459,13 @@ local function create_transport(host, port, user, password, callback,
         if request == nil then -- nobody is waiting for the response
             return
         end
-        requests[id] = nil
-        request.id = nil
         local status = hdr[IPROTO_STATUS_KEY]
         local body, body_end_check
 
-        if status ~= 0 then
+        if status > IPROTO_CHUNK_KEY then
             -- Handle errors
+            requests[id] = nil
+            request.id = nil
             body, body_end_check = decode(body_rpos)
             assert(body_end == body_end_check, "invalid xrow length")
             request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -462,16 +480,33 @@ local function create_transport(host, port, user, password, callback,
             local body_len = body_end - body_rpos
             local wpos = buffer:alloc(body_len)
             ffi.copy(wpos, body_rpos, body_len)
-            request.response = tonumber(body_len)
+            body_len = tonumber(body_len)
+            if status == IPROTO_OK_KEY then
+                request.response = body_len
+                requests[id] = nil
+                request.id = nil
+            else
+                request.on_push(request.on_push_ctx, body_len)
+            end
             request.cond:broadcast()
             return
         end
 
-        -- Decode xrow.body[DATA] to Lua objects
         local real_end
-        request.response, real_end, request.errno =
-            method_decoder[request.method](body_rpos, body_end)
-        assert(real_end == body_end, "invalid body length")
+        -- Decode xrow.body[DATA] to Lua objects
+        if status == IPROTO_OK_KEY then
+            request.response, real_end, request.errno =
+                method_decoder[request.method](body_rpos, body_end)
+            assert(real_end == body_end, "invalid body length")
+            requests[id] = nil
+            request.id = nil
+        else
+            local msg
+            msg, real_end, request.errno =
+                method_decoder.push(body_rpos, body_end)
+            assert(real_end == body_end, "invalid body length")
+            request.on_push(request.on_push_ctx, msg)
+        end
         request.cond:broadcast()
     end
 
@@ -947,25 +982,40 @@ end
 
 function remote_methods:_request(method, opts, ...)
     local transport = self._transport
-    local buffer = opts and opts.buffer
-    if opts and opts.is_async then
-        return transport.perform_async_request(buffer, method, ...)
-    end
-    local deadline
-    if opts and opts.timeout then
-        -- conn.space:request(, { timeout = timeout })
-        deadline = fiber_clock() + opts.timeout
+    local on_push, on_push_ctx, buffer, deadline
+    -- Extract options, set defaults, check if the request is
+    -- async.
+    if opts then
+        buffer = opts.buffer
+        if opts.is_async then
+            if opts.on_push or opts.on_push_ctx then
+                error('To handle pushes in an async request use future:pairs()')
+            end
+            return transport.perform_async_request(buffer, method, table.insert,
+                                                   {}, ...)
+        end
+        if opts.timeout then
+            -- conn.space:request(, { timeout = timeout })
+            deadline = fiber_clock() + opts.timeout
+        else
+            -- conn:timeout(timeout).space:request()
+            -- @deprecated since 1.7.4
+            deadline = self._deadlines[fiber_self()]
+        end
+        on_push = opts.on_push or on_push_sync_default
+        on_push_ctx = opts.on_push_ctx
     else
-        -- conn:timeout(timeout).space:request()
-        -- @deprecated since 1.7.4
         deadline = self._deadlines[fiber_self()]
+        on_push = on_push_sync_default
     end
+    -- Execute synchronous request.
     local timeout = deadline and max(0, deadline - fiber_clock())
     if self.state ~= 'active' then
         transport.wait_state('active', timeout)
         timeout = deadline and max(0, deadline - fiber_clock())
     end
-    local res, err = transport.perform_request(timeout, buffer, method, ...)
+    local res, err = transport.perform_request(timeout, buffer, method,
+                                               on_push, on_push_ctx, ...)
     if err then
         box.error(err)
     end
@@ -1159,10 +1209,10 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, 'eval', loader, {line})
+        res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
     else
         assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, 'inject', line..'$EOF$\n')
+        res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
     end
     if err then
         box.error(err)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index b3f81a86f..9fc7ea3df 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -377,6 +377,18 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
 	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
 }
 
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+		   uint32_t schema_version)
+{
+	char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+	iproto_header_encode(pos, IPROTO_CHUNK, sync, schema_version,
+			     obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+	struct iproto_body_bin body = iproto_body_bin;
+	body.v_data_len = mp_bswap_u32(1);
+	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
+}
+
 int
 xrow_decode_dml(struct xrow_header *row, struct request *request,
 		uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index b10bf26d5..1bb5f103b 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -411,6 +411,18 @@ int
 iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
 		   uint32_t schema_version);
 
+/**
+ * Write an IPROTO_CHUNK header from a specified position in a
+ * buffer.
+ * @param buf Buffer to write to.
+ * @param svp Position to write from.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ */
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+		   uint32_t schema_version);
+
 /** Write error directly to a socket. */
 void
 iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
diff --git a/src/fiber.h b/src/fiber.h
index 8231bba24..eb89c48cd 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,8 +105,13 @@ enum fiber_key {
 	/** User global privilege and authentication token */
 	FIBER_KEY_USER = 3,
 	FIBER_KEY_MSG = 4,
-	/** Storage for lua stack */
+	/**
+	 * The storage cell number 5 is shared between lua stack
+	 * for fibers created from Lua, and IProto sync for fibers
+	 * created to execute a binary request.
+	 */
 	FIBER_KEY_LUA_STACK = 5,
+	FIBER_KEY_SYNC      = 5,
 	FIBER_KEY_MAX = 6
 };
 
@@ -610,6 +615,13 @@ fiber_get_key(struct fiber *fiber, enum fiber_key key)
 	return fiber->fls[key];
 }
 
+static inline uint64_t
+fiber_sync(struct fiber *fiber)
+{
+	uint64_t *sync = (uint64_t *) fiber_get_key(fiber, FIBER_KEY_SYNC);
+	return sync != NULL ? *sync : 0;
+}
+
 /**
  * Finalizer callback
  * \sa fiber_key_on_gc()
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 1674c27e1..1a9758cd2 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
     return cn:_request('select', opts, space_id, index_id, iterator,
                        offset, limit, key)
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 ---
 ...
@@ -2365,7 +2365,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 ---
 ...
 c.state
@@ -2939,7 +2939,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
 ---
 ...
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 ---
 ...
 while not c:is_connected() do fiber.sleep(0.01) end
@@ -3053,7 +3053,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', 'long', {}) c._transport.perform_request(nil, nil, 'inject', '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index c34616aec..6a36c812d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
     return cn:_request('select', opts, space_id, index_id, iterator,
                        offset, limit, key)
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 
 LISTEN = require('uri').parse(box.cfg.listen)
@@ -965,7 +965,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 c.state
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
@@ -1173,7 +1173,7 @@ finalize_long()
 --
 c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
 future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 while not c:is_connected() do fiber.sleep(0.01) end
 finalize_long()
 future:wait_result(100)
@@ -1222,7 +1222,7 @@ c:ping()
 -- new attempts to read any data - the connection is closed
 -- already.
 --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', 'long', {}) c._transport.perform_request(nil, nil, 'inject', '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 c:close()
 
diff --git a/test/box/push.result b/test/box/push.result
index 816f06e00..8645533b5 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -1,5 +1,8 @@
+test_run = require('test_run').new()
+---
+...
 --
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
 --
 --
 -- Usage.
@@ -12,18 +15,29 @@ box.session.push(1, 2)
 ---
 - error: 'Usage: box.session.push(data)'
 ...
-ok = nil
+fiber = require('fiber')
 ---
 ...
-err = nil
+messages = {}
 ---
 ...
-function do_push() ok, err = box.session.push(1) end
+test_run:cmd("setopt delimiter ';'")
 ---
+- true
+...
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
 ...
---
--- Test binary protocol.
---
 netbox = require('net.box')
 ---
 ...
@@ -37,27 +51,261 @@ c:ping()
 ---
 - true
 ...
-c:call('do_push')
+c:call('do_pushes', {}, {on_push = table.insert, on_push_ctx = messages})
+---
+- 300
+...
+messages
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+    box.session.push('started dml')
+    s:replace{key}
+    box.session.push('continued dml')
+    s:replace{-key}
+    box.session.push('finished dml')
+    return key
+end;
+---
+...
+function do_pushes(val)
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.yield()
+    end
+    return val
+end;
+---
+...
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3, 'dml sends 3 messages')
+        assert(c.messages[1] == 'started dml', 'started')
+        assert(c.messages[2] == 'continued dml', 'continued')
+        assert(c.messages[3] == 'finished dml', 'finished')
+        assert(s:get{c.retval}, 'inserted +')
+        assert(s:get{-c.retval}, 'inserted -')
+    else
+        assert(c.retval, 'something is returned')
+        assert(#c.messages == 5, 'non-dml sends 5 messages')
+        for k, v in pairs(c.messages) do
+            assert(k == v, 'with equal keys and values')
+        end
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+---
+...
+messages = {}
+---
+...
+c:call('push_null', {}, {on_push = table.insert, on_push_ctx = messages})
+---
+...
+messages
+---
+- - null
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = table.insert, on_push_ctx = messages, buffer = ibuf})
+---
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 8
+  - 8
+  - 8
+  - 8
+  - 8
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {48: [1]}
+  - {48: [2]}
+  - {48: [3]}
+  - {48: [4]}
+  - {48: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
+...
+--
+-- Test error in __serialize.
+--
+ok = nil
+---
+...
+err = nil
+---
+...
+messages = {}
+---
+...
+t = setmetatable({100}, {__serialize = function() error('err in ser') end})
+---
+...
+function do_push() ok, err = box.session.push(t) end
+---
+...
+c:call('do_push', {}, {on_push = table.insert, on_push_ctx = messages})
 ---
 ...
 ok, err
 ---
 - null
-- Session 'binary' does not support push()
+- '[string "t = setmetatable({100}, {__serialize = functi..."]:1: err in ser'
+...
+messages
+---
+- []
+...
+--
+-- Test push from a non-call request.
+--
+s:truncate()
+---
+...
+_ = s:on_replace(function() box.session.push('replace') end)
+---
+...
+c:reload_schema()
+---
+...
+c.space.test:replace({200}, {on_push = table.insert, on_push_ctx = messages})
+---
+- [200]
+...
+messages
+---
+- - replace
+...
+s:select{}
+---
+- - [200]
 ...
 c:close()
 ---
 ...
+s:drop()
+---
+...
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
 --
 -- Ensure can not push in background.
 --
-fiber = require('fiber')
----
-...
-f = fiber.create(do_push)
+f = fiber.create(function() ok, err = box.session.push(100) end)
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index a59fe0a4c..da0634157 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -1,5 +1,6 @@
+test_run = require('test_run').new()
 --
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
 --
 
 --
@@ -8,28 +9,155 @@
 box.session.push()
 box.session.push(1, 2)
 
-ok = nil
-err = nil
-function do_push() ok, err = box.session.push(1) end
+fiber = require('fiber')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+test_run:cmd("setopt delimiter ''");
 
---
--- Test binary protocol.
---
 netbox = require('net.box')
 box.schema.user.grant('guest', 'read,write,execute', 'universe')
 
 c = netbox.connect(box.cfg.listen)
 c:ping()
-c:call('do_push')
+c:call('do_pushes', {}, {on_push = table.insert, on_push_ctx = messages})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+    box.session.push('started dml')
+    s:replace{key}
+    box.session.push('continued dml')
+    s:replace{-key}
+    box.session.push('finished dml')
+    return key
+end;
+function do_pushes(val)
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.yield()
+    end
+    return val
+end;
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3, 'dml sends 3 messages')
+        assert(c.messages[1] == 'started dml', 'started')
+        assert(c.messages[2] == 'continued dml', 'continued')
+        assert(c.messages[3] == 'finished dml', 'finished')
+        assert(s:get{c.retval}, 'inserted +')
+        assert(s:get{-c.retval}, 'inserted -')
+    else
+        assert(c.retval, 'something is returned')
+        assert(#c.messages == 5, 'non-dml sends 5 messages')
+        for k, v in pairs(c.messages) do
+            assert(k == v, 'with equal keys and values')
+        end
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+messages = {}
+c:call('push_null', {}, {on_push = table.insert, on_push_ctx = messages})
+messages
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = table.insert, on_push_ctx = messages, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
+--
+-- Test error in __serialize.
+--
+ok = nil
+err = nil
+messages = {}
+t = setmetatable({100}, {__serialize = function() error('err in ser') end})
+function do_push() ok, err = box.session.push(t) end
+c:call('do_push', {}, {on_push = table.insert, on_push_ctx = messages})
 ok, err
+messages
+
+--
+-- Test push from a non-call request.
+--
+s:truncate()
+_ = s:on_replace(function() box.session.push('replace') end)
+c:reload_schema()
+c.space.test:replace({200}, {on_push = table.insert, on_push_ctx = messages})
+messages
+s:select{}
+
 c:close()
+s:drop()
 
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 
 --
 -- Ensure can not push in background.
 --
-fiber = require('fiber')
-f = fiber.create(do_push)
+f = fiber.create(function() ok, err = box.session.push(100) end)
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 ok, err
-- 
2.15.1 (Apple Git-101)


* [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects
  2018-06-01 20:55 [tarantool-patches] [PATCH v3 0/4] box.session.push Vladislav Shpilevoy
                   ` (2 preceding siblings ...)
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 3/4] session: introduce binary box.session.push Vladislav Shpilevoy
@ 2018-06-01 20:55 ` Vladislav Shpilevoy
  2018-06-07 12:56   ` [tarantool-patches] " Konstantin Osipov
  3 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-01 20:55 UTC (permalink / raw)
  To: tarantool-patches; +Cc: kostja

Netbox has two major ways to execute a request: sync and async.
During execution of either, a server can send multiple responses
via IPROTO_CHUNK. The two ways differ in how they handle the
chunks (called messages or pushes).

For a sync request one can specify an on_push callback and its
on_push_ctx argument; the callback is called on each message.
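
The sync dispatch can be sketched in plain Lua. This is a simplified
model, not the real net.box code: run_sync_request and the response
tables are hypothetical names made up for illustration. Only the status
values (0 for the final response, 128 for a chunk) and the call
convention on_push(on_push_ctx, message) are taken from the patch. The
call convention is what lets table.insert serve as a callback when
on_push_ctx is a table.

```lua
-- Hypothetical, simplified model of sync response dispatch.
local IPROTO_OK = 0      -- final response
local IPROTO_CHUNK = 128 -- non-final response (push)

-- Default action for sync requests: ignore pushes.
local function on_push_default() end

-- Feed a list of {status, data} responses to a request and return
-- the final result. `opts` mirrors the on_push/on_push_ctx options.
local function run_sync_request(responses, opts)
    opts = opts or {}
    local on_push = opts.on_push or on_push_default
    local on_push_ctx = opts.on_push_ctx
    for _, resp in ipairs(responses) do
        if resp.status == IPROTO_CHUNK then
            -- The context goes first, the message second.
            on_push(on_push_ctx, resp.data)
        elseif resp.status == IPROTO_OK then
            return resp.data
        else
            return nil, 'error status ' .. resp.status
        end
    end
    return nil, 'no final response'
end

local responses = {
    {status = IPROTO_CHUNK, data = 1},
    {status = IPROTO_CHUNK, data = 2},
    {status = IPROTO_OK, data = 300},
}
local messages = {}
-- table.insert(messages, data) appends each pushed message.
local result = run_sync_request(responses, {on_push = table.insert,
                                            on_push_ctx = messages})
-- With no options the pushes are silently dropped.
local ignored = run_sync_request(responses)
```

With a real connection the same options are passed as the last argument
of a request, as the tests in this series do:
c:call('do_pushes', {}, {on_push = table.insert, on_push_ctx = messages}).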

When a request is async, a user has only a future object and
cannot specify any callbacks. To get the pushed messages one must
iterate over the future object like this:
for i, message in future:pairs(one_iteration_timeout) do
...
end
Or ignore the messages by just calling future:wait_result(). Either
way, messages are not deleted, so one can iterate over the future
object again and again.
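
The key/value contract of the iteration can be modeled in plain Lua.
The sketch below is an assumption-laden illustration, not the net.box
implementation: new_future and collect are hypothetical helpers, the
future is already finished (no waiting or timeouts), and NULL is a
stand-in for box.NULL, which exists only inside Tarantool. It shows that
pushed messages get keys 1..N, the final response gets key N + 1, an
error is reported under a non-nil marker key so the loop body still
sees it, and a second pass yields the same messages.

```lua
-- Stand-in for box.NULL: a non-nil value that marks an error
-- without ending the `for` loop.
local NULL = {}

-- Hypothetical model of a finished future: pushed messages plus
-- either a final response or an error.
local function new_future(messages, response, err)
    local future = {messages = messages, response = response, err = err}

    local function iterator_next(state, i)
        if i == NULL then
            -- The error was already reported, nothing is left.
            return nil
        end
        i = i + 1
        if i <= #state.messages then
            return i, state.messages[i]
        end
        if i > #state.messages + 1 then
            -- The final result was already returned.
            return nil
        end
        if state.err then
            return NULL, state.err
        end
        return i, state.response
    end

    function future:pairs()
        return iterator_next, self, 0
    end
    return future
end

-- Collect keys and values of one full iteration.
local function collect(future)
    local keys, values = {}, {}
    for i, message in future:pairs() do
        table.insert(keys, i)
        table.insert(values, message)
    end
    return keys, values
end

local ok_future = new_future({101, 102}, 'done')
local keys1, values1 = collect(ok_future)
local keys2, values2 = collect(ok_future) -- messages are kept
local failed = new_future({100}, nil, 'duplicate key')
local fkeys, fvalues = collect(failed)
```

If the iterator returned nil as the error key, Lua would end the loop
before the body could inspect the error, which is why a separate marker
value is needed.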

Follow up #2677
---
 src/box/lua/net_box.lua |  84 +++++++++++++++++++++
 test/box/push.result    | 191 +++++++++++++++++++++++++++++++++++++++++++++++-
 test/box/push.test.lua  |  79 +++++++++++++++++++-
 3 files changed, 349 insertions(+), 5 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index fe113e740..5d896f7e3 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -272,6 +272,90 @@ local function create_transport(host, port, user, password, callback,
         end
     end
     --
+    -- Get the next message or the final result.
+    -- @param iterator Iterator object.
+    -- @param i Index to get a next message from.
+    --
+    -- @retval nil, nil The request is finished.
+    -- @retval i + 1, object A message/response and its index.
+    -- @retval box.NULL, error An error occurred. When this
+    --         function is called in 'for k, v in future:pairs()',
+    --         `k` becomes box.NULL, and `v` becomes an error object.
+    --         To stop the loop, the caller can use break.
+    --         Without a break the loop continues until
+    --         the request is finished. Continuing the iteration
+    --         is useful, for example, when a timeout expires while
+    --         waiting for the next message, but the caller does not
+    --         consider this error critical.
+    --         On error the key becomes exactly box.NULL instead
+    --         of nil, because Lua treats nil as the iteration
+    --         end marker. Nil does not participate in iteration,
+    --         and does not allow continuing it.
+    --
+    local function request_iterator_next(iterator, i)
+        if i == box.NULL then
+            -- If a user continues iteration after an error -
+            -- restore position.
+            if not iterator.next_i then
+                return nil, nil
+            end
+            i = iterator.next_i
+            iterator.next_i = nil
+        else
+            i = i + 1
+        end
+        local request = iterator.request
+        local messages = request.on_push_ctx
+    ::retry::
+        if i <= #messages then
+            return i, messages[i]
+        end
+        if request:is_ready() then
+            -- After all the messages are iterated, `i` is equal
+            -- to #messages + 1. After response reading `i`
+            -- becomes #messages + 2. It is the trigger to finish
+            -- the iteration.
+            if i > #messages + 1 then
+                return nil, nil
+            end
+            local response, err = request:result()
+            if err then
+                return box.NULL, err
+            end
+            return i, response
+        end
+        local old_message_count = #messages
+        local timeout = iterator.timeout
+        repeat
+            local ts = fiber_clock()
+            request.cond:wait(timeout)
+            timeout = timeout - (fiber_clock() - ts)
+            if request:is_ready() or old_message_count ~= #messages then
+                goto retry
+            end
+        until timeout <= 0
+        iterator.next_i = i
+        return box.NULL, box.error.new(E_TIMEOUT)
+    end
+    --
+    -- Iterate over all messages received by a request. @sa
+    -- request_iterator_next for details on what to expect in
+    -- `for` key/value pairs.
+    -- @param timeout One iteration timeout.
+    -- @retval next() callback, iterator, zero key.
+    --
+    function request_index:pairs(timeout)
+        if timeout then
+            if type(timeout) ~= 'number' or timeout < 0 then
+                error('Usage: future:pairs(timeout)')
+            end
+        else
+            timeout = TIMEOUT_INFINITY
+        end
+        local iterator = {request = self, timeout = timeout}
+        return request_iterator_next, iterator, 0
+    end
+    --
     -- Wait for a response or error max timeout seconds.
     -- @param timeout Max seconds to wait.
     -- @retval result, nil Success, the response is returned.
diff --git a/test/box/push.result b/test/box/push.result
index 8645533b5..0816c6754 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -299,9 +299,6 @@ c:close()
 s:drop()
 ---
 ...
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
----
-...
 --
 -- Ensure can not push in background.
 --
@@ -316,3 +313,191 @@ ok, err
 - null
 - Session 'background' does not support push()
 ...
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+---
+...
+cond = fiber.cond()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i + 100)
+        cond:wait()
+    end
+    return true
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+---
+...
+assert(not ok)
+---
+- true
+...
+assert(err:find('use future:pairs()') ~= nil)
+---
+- true
+...
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+---
+...
+future:result()
+---
+- [true]
+...
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 101
+  - 102
+  - 103
+  - 104
+  - 105
+  - [true]
+...
+keys
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+  - 6
+...
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+---
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+err_count = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i, message in future:pairs(0.01) do
+    if i == nil then
+        err_count = err_count + 1
+        assert(message.code == box.error.TIMEOUT)
+        if err_count % 2 == 0 then
+            cond:signal()
+        end
+    else
+        table.insert(messages, message)
+        table.insert(keys, i)
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+---
+- - 101
+  - 102
+  - 103
+  - 104
+  - 105
+  - [true]
+...
+keys
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+  - 6
+...
+err_count
+---
+- 10
+...
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1}
+---
+- [1]
+...
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+---
+...
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+---
+...
+future:wait_result(1000)
+---
+- null
+- Duplicate key exists in unique index 'pk' in space 'test'
+...
+messages = {}
+---
+...
+keys = {}
+---
+...
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+---
+...
+messages
+---
+- - 100
+  - Duplicate key exists in unique index 'pk' in space 'test'
+...
+keys
+---
+- - 1
+  - null
+...
+s:drop()
+---
+...
+c:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index da0634157..10bc201df 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -153,11 +153,86 @@ s:select{}
 c:close()
 s:drop()
 
-box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-
 --
 -- Ensure can not push in background.
 --
 f = fiber.create(function() ok, err = box.session.push(100) end)
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 ok, err
+
+--
+-- Async iterable pushes.
+--
+c = netbox.connect(box.cfg.listen)
+cond = fiber.cond()
+test_run:cmd("setopt delimiter ';'")
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i + 100)
+        cond:wait()
+    end
+    return true
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Can not combine callback and async mode.
+ok, err = pcall(c.call, c, 'do_pushes', {}, {is_async = true, on_push = function() end})
+assert(not ok)
+assert(err:find('use future:pairs()') ~= nil)
+future = c:call('do_pushes', {}, {is_async = true})
+-- Try to ignore pushes.
+while not future:wait_result(0.01) do cond:signal() end
+future:result()
+
+-- Even if pushes are ignored, they still are available via pairs.
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+-- Test timeouts inside `for`. Even if a timeout is got, a user
+-- can continue iteration making as many attempts to get a message
+-- as he wants.
+future = c:call('do_pushes', {}, {is_async = true})
+messages = {}
+keys = {}
+err_count = 0
+test_run:cmd("setopt delimiter ';'")
+for i, message in future:pairs(0.01) do
+    if i == nil then
+        err_count = err_count + 1
+        assert(message.code == box.error.TIMEOUT)
+        if err_count % 2 == 0 then
+            cond:signal()
+        end
+    else
+        table.insert(messages, message)
+        table.insert(keys, i)
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+-- Messages and keys are got in the correct order and with no
+-- duplicates regardless of big timeout count.
+messages
+keys
+err_count
+
+-- Test non-timeout error.
+s = box.schema.create_space('test')
+pk = s:create_index('pk')
+s:replace{1}
+
+function do_push_and_duplicate() box.session.push(100) s:insert{1} end
+future = c:call('do_push_and_duplicate', {}, {is_async = true})
+future:wait_result(1000)
+messages = {}
+keys = {}
+for i, message in future:pairs() do table.insert(messages, message) table.insert(keys, i) end
+messages
+keys
+
+s:drop()
+c:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
-- 
2.15.1 (Apple Git-101)

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 3/4] session: introduce binary box.session.push
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 3/4] session: introduce binary box.session.push Vladislav Shpilevoy
@ 2018-06-07 12:53   ` Konstantin Osipov
  2018-06-07 17:02     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 12+ messages in thread
From: Konstantin Osipov @ 2018-06-07 12:53 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

Hi,

My code review for this patch is on branch
box-session-push-kostja.

What still needs to be done:

- convert push.test to box-tap
- add a second argument to box.session.push() - which is reply
  sync, and test that it works. If the sync is not provided,
  session sync should be used.
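
The fallback rule requested in the last item can be sketched in a few
lines of C. This is only an illustration under stated assumptions:
push_resolve_sync() and its parameters are hypothetical names and are not
part of the patch or of the Tarantool API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical helper, not part of the patch: choose the sync for a
 * pushed chunk. An explicit second argument wins; when it is absent,
 * fall back to the sync remembered by the session.
 */
static uint64_t
push_resolve_sync(bool has_sync_arg, uint64_t sync_arg, uint64_t session_sync)
{
	return has_sync_arg ? sync_arg : session_sync;
}
```

With such a rule, a caller that multiplexes several requests over one
session would pass the sync explicitly, while a simple caller could omit
it.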

There is follow up work, once this part of the patch is done:

- investigate whether we can use kharon to get rid of discard_input messages
- think about alternatives which would preserve sync in a Lua
  routine. Looks like using an upvalue would do
  https://www.lua.org/pil/27.3.3.html

As a side note, this patch could well be split into 3:
- tx_fiber_init() refactoring
- kharon, and push implementation
- fiber_sync() and connection-multiplexing-safe implementation of
  box.session.sync()
 
Thanks,

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/02 02:09]:

> Closes #2677
> ---
>  src/box/iproto.cc          | 241 +++++++++++++++++++++++++++++++--------
>  src/box/iproto_constants.h |   3 +
>  src/box/lua/net_box.lua    | 104 ++++++++++++-----
>  src/box/xrow.c             |  12 ++
>  src/box/xrow.h             |  12 ++
>  src/fiber.h                |  14 ++-
>  test/box/net.box.result    |   8 +-
>  test/box/net.box.test.lua  |   8 +-
>  test/box/push.result       | 274 ++++++++++++++++++++++++++++++++++++++++++---
>  test/box/push.test.lua     | 148 ++++++++++++++++++++++--
>  10 files changed, 719 insertions(+), 105 deletions(-)
> 

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 4/4] netbox: introduce iterable future objects
  2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects Vladislav Shpilevoy
@ 2018-06-07 12:56   ` Konstantin Osipov
  2018-06-07 17:02     ` Vladislav Shpilevoy
  0 siblings, 1 reply; 12+ messages in thread
From: Konstantin Osipov @ 2018-06-07 12:56 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/02 02:09]:
> Netbox has two major ways to execute a request: sync and async.
> During execution of either, a server can send multiple responses via
> IPROTO_CHUNK. And the execution ways differ in how to handle the
> chunks (called messages or pushes).

Please don't use asserts. They are hell to debug. Please use the
tap suite if you need to check things, or simply output the result
you expect.

The patch itself looks good to me. Did you send a message to
docbot about the new API?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 4/4] netbox: introduce iterable future objects
  2018-06-07 12:56   ` [tarantool-patches] " Konstantin Osipov
@ 2018-06-07 17:02     ` Vladislav Shpilevoy
  2018-06-08  3:52       ` Konstantin Osipov
  0 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-07 17:02 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov



On 07/06/2018 15:56, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/02 02:09]:
>> Netbox has two major ways to execute a request: sync and async.
>> During execution of either, a server can send multiple responses via
>> IPROTO_CHUNK. And the execution ways differ in how to handle the
>> chunks (called messages or pushes).
> 
> Please don't use asserts. They are hell to debug. Please use
> tap suite if you need to check things, or simply output the result
> you expect.

I have removed assertions and use output checking.

> 
> The patch itself looks good to me. Did you send a message to
> docbot about the new API?
> 

Yes, I have already created a docbot request. But I do not agree with
the API where the push sync is taken from session.sync by default. If you
want to make the sync optional, we must either

* make it mandatory, fix box.session.sync() (or introduce fiber_sync() or
   something else), and then allow omitting the sync;

or

* fix sync and then commit box.session.push with an optional sync, or with
   no sync argument at all, and add it when somebody asks.

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 3/4] session: introduce binary box.session.push
  2018-06-07 12:53   ` [tarantool-patches] " Konstantin Osipov
@ 2018-06-07 17:02     ` Vladislav Shpilevoy
  2018-06-08  3:51       ` Konstantin Osipov
  0 siblings, 1 reply; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-07 17:02 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov



On 07/06/2018 15:53, Konstantin Osipov wrote:
> Hi,
> 
> My code review for this patch is on branch
> box-session-push-kostja.
> 
> What still needs to be done:
> 
> - convert push.test to box-tap

I have removed the assertions and left the test as an output-checking one.
The push test checks many table members, and it is simpler to
just output the tables.

The assertions were in a single place, where a table contains 400 members.
But ok, now I did it like in the vinyl/select_consistency test.

> - think about alternatives which would preserve sync in a Lua
>    routine. Looks like using an upvalue would do
>    https://www.lua.org/pil/27.3.3.html

I had investigated it before you recommended it and before I
started the patch. But ok, I have investigated it again and
wrote a simple patch that shows upvalues to be unusable as
sync storage. See the diff below and the explanation under the diff.

+++ b/src/box/lua/call.c
@@ -434,8 +434,9 @@ box_process_lua(struct call_request *request, struct port *base,
-	lua_pushcfunction(L, handler);
+	uint64_t sync = request->header->sync;
+	lua_pushinteger(L, sync);
+	lua_pushcclosure(L, handler, 1);
+++ b/src/box/lua/session.c
@@ -370,6 +370,9 @@ lbox_session_push(struct lua_State *L)
+	uint64_t sync = lua_tointeger(L, lua_upvalueindex(1));
+	uint64_t f_sync = fiber_sync(fiber());
+	say_info("%llu, %llu", (unsigned long long) sync, f_sync);
  	if (session_push(current_session(), &port) != 0) {

After running push.test.lua I found that say_info() always prints
0 followed by the real sync. So the upvalue is actually nil here.

C upvalues are not the same as Lua upvalues. A C upvalue can be
associated with a single function only, while Lua ones can
be referenced from multiple and nested functions. A C upvalue
can be accessed only from the function that owns it.

In the diff above I pushed sync as an upvalue and tried to
get it in the lbox_session_push function. But the upvalue was not
available. Let's look at the implementation of
lua_tointeger(L, lua_upvalueindex(1)).

lua_upvalueindex(idx):

     #define LUA_GLOBALSINDEX	(-10002)
     #define lua_upvalueindex(i)	(LUA_GLOBALSINDEX-(i))

Nothing interesting here: an upvalue index is just an index.

lua_tointeger(L, idx):

     LUA_API lua_Integer lua_tointeger(lua_State *L, int idx)
     {
         cTValue *o = index2adr(L, idx);
         ...

index2adr(L, idx):

     ....
     } else {
         GCfunc *fn = curr_func(L);
         api_check(L, fn->c.gct == ~LJ_TFUNC && !isluafunc(fn));
         if (idx == LUA_ENVIRONINDEX) {
           TValue *o = &G(L)->tmptv;
           settabV(L, o, tabref(fn->c.env));
           return o;
         } else {
           idx = LUA_GLOBALSINDEX - idx;
           return idx <= fn->c.nupvalues ? &fn->c.upvalue[idx-1] : niltv(L);
         }
     }

As you can see, the upvalue array is taken from curr_func(L).
lbox_session_push is not the C closure I had pushed earlier, and it has
no upvalues.

Summary: C upvalues can not be used as a sync storage.
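
The lookup rule described above can be modelled with a small
self-contained C sketch. It is a toy model under stated assumptions, not
LuaJIT code: struct toy_cfunc, toy_curr_func, toy_upvalue() and
toy_sync_seen_by() are hypothetical names that only imitate how the
upvalue array is resolved through the currently running function.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Toy model only: each C function object owns a private upvalue array. */
struct toy_cfunc {
	int nupvalues;
	uint64_t upvalue[2];
};

/* Stand-in for curr_func(L): the function whose frame is executing now. */
static const struct toy_cfunc *toy_curr_func;

/*
 * Same rule as in the lookup quoted above: an index beyond the current
 * function's upvalue count resolves to "nil" (NULL in this model).
 */
static const uint64_t *
toy_upvalue(int idx)
{
	assert(toy_curr_func != NULL);
	if (idx >= 1 && idx <= toy_curr_func->nupvalues)
		return &toy_curr_func->upvalue[idx - 1];
	return NULL;
}

/* Read upvalue 1 as an integer; "nil" converts to 0, as in lua_tointeger(). */
static uint64_t
toy_sync_seen_by(const struct toy_cfunc *fn)
{
	toy_curr_func = fn;
	const uint64_t *v = toy_upvalue(1);
	return v != NULL ? *v : 0;
}
```

In this model a closure created with one upvalue sees its value, while a
different function with no upvalues reads 0 from the same index, which
matches the say_info() output reported above.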

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 3/4] session: introduce binary box.session.push
  2018-06-07 17:02     ` Vladislav Shpilevoy
@ 2018-06-08  3:51       ` Konstantin Osipov
  0 siblings, 0 replies; 12+ messages in thread
From: Konstantin Osipov @ 2018-06-08  3:51 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/07 23:59]:

> > My code review for this patch is on branch
> > box-session-push-kostja.
> > 
> > What still needs to be done:
> > 
> > - convert push.test to box-tap
> 
> I have removed assertions and left the test be output checking.
> Push test does many table members checking, and it is simpler to
> just output the tables.
> 
> Assertions were in a single place where a table contains 400 members.
> But ok, now I did it like in vinyl/select_consistency test.
> 
> > - think about alternatives which would preserve sync in a Lua
> >    routine. Looks like using an upvalue would do
> >    https://www.lua.org/pil/27.3.3.html
> 
> I had investigated it before you have recommended and before I
> started the patch. But ok, I have investigated it again, and
> wrote the simple patch, that shows upvalues to be unusable as
> sync storage. See the diff below and the explanation under the diff.

Vlad, I need to push the patch first and investigate this problem
second. I understand somebody wanted to not bother with the sync
in the ticket, but it is not a priority for me. 

Please revert fiber_sync() changes asap and update the tests, so
that I can push the patch. We can talk about fixing
box.session.sync() later. My problem with the patch is that you
make a partial fix (box.session.sync() is still broken) and use
fiber local storage for something that has little to do with a
fiber.

I'm sure there are acceptable ways to fix box.session.sync().
I'll be happy to discuss them once we're done with the current
patch.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 4/4] netbox: introduce iterable future objects
  2018-06-07 17:02     ` Vladislav Shpilevoy
@ 2018-06-08  3:52       ` Konstantin Osipov
  2018-06-08 14:20         ` Vladislav Shpilevoy
  0 siblings, 1 reply; 12+ messages in thread
From: Konstantin Osipov @ 2018-06-08  3:52 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

* Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/07 23:59]:
> On 07/06/2018 15:56, Konstantin Osipov wrote:
> > * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/02 02:09]:
> > > Netbox has two major ways to execute a request: sync and async.
> > > During execution of either, a server can send multiple responses via
> > > IPROTO_CHUNK. And the execution ways differ in how to handle the
> > > chunks (called messages or pushes).
> > 
> > Please don't use asserts. They are hell to debug. Please use
> > tap suite if you need to check things, or simply output the result
> > you expect.
> 
> I have removed assertions and use output checking.
> 
> > 
> > The patch itself looks good to me. Did you send a message to
> > docbot about the new API?
> > 
> 
> Yes, I have already created docbot request. But I do not agree with
> the API, when push by default is taken from session.sync. If you want
> to make the sync optional, we must either
> 
> * make it mandatory, fix box.session.sync()/introduce fiber_sync()/something
>   else, then allow to omit sync;
> 
> Or
> 
> * fix sync and then commit box.session.push with optional sync. Or with no
>   sync, and allow it when somebody asks.

Vlad, thank you for voicing your disagreement. Please prepare a
patch which makes box.session.push() second argument optional.
Thanks.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 12+ messages in thread

* [tarantool-patches] Re: [PATCH v3 4/4] netbox: introduce iterable future objects
  2018-06-08  3:52       ` Konstantin Osipov
@ 2018-06-08 14:20         ` Vladislav Shpilevoy
  0 siblings, 0 replies; 12+ messages in thread
From: Vladislav Shpilevoy @ 2018-06-08 14:20 UTC (permalink / raw)
  To: tarantool-patches, Konstantin Osipov



On 08/06/2018 06:52, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/07 23:59]:
>> On 07/06/2018 15:56, Konstantin Osipov wrote:
>>> * Vladislav Shpilevoy <v.shpilevoy@tarantool.org> [18/06/02 02:09]:
>>>> Netbox has two major ways to execute a request: sync and async.
>>>> During execution of either, a server can send multiple responses via
>>>> IPROTO_CHUNK. And the execution ways differ in how to handle the
>>>> chunks (called messages or pushes).
>>>
>>> Please don't use asserts. They are hell to debug. Please use
>>> tap suite if you need to check things, or simply output the result
>>> you expect.
>>
>> I have removed assertions and use output checking.
>>
>>>
>>> The patch itself looks good to me. Did you send a message to
>>> docbot about the new API?
>>>
>>
>> Yes, I have already created docbot request. But I do not agree with
>> the API, when push by default is taken from session.sync. If you want
>> to make the sync optional, we must either
>>
>> * make it mandatory, fix box.session.sync()/introduce fiber_sync()/something
>>    else, then allow to omit sync;
>>
>> Or
>>
>> * fix sync and then commit box.session.push with optional sync. Or with no
>>    sync, and allow it when somebody asks.
> 
> Vlad, thank you for voicing your disagreement. Please prepare a
> patch which makes box.session.push() second argument optional.
> Thanks.
> 

Done.

^ permalink raw reply	[flat|nested] 12+ messages in thread

end of thread, other threads:[~2018-06-08 14:20 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-06-01 20:55 [tarantool-patches] [PATCH v3 0/4] box.session.push Vladislav Shpilevoy
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push Vladislav Shpilevoy
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 2/4] session: enable box.session.push in local console Vladislav Shpilevoy
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 3/4] session: introduce binary box.session.push Vladislav Shpilevoy
2018-06-07 12:53   ` [tarantool-patches] " Konstantin Osipov
2018-06-07 17:02     ` Vladislav Shpilevoy
2018-06-08  3:51       ` Konstantin Osipov
2018-06-01 20:55 ` [tarantool-patches] [PATCH v3 4/4] netbox: introduce iterable future objects Vladislav Shpilevoy
2018-06-07 12:56   ` [tarantool-patches] " Konstantin Osipov
2018-06-07 17:02     ` Vladislav Shpilevoy
2018-06-08  3:52       ` Konstantin Osipov
2018-06-08 14:20         ` Vladislav Shpilevoy
