[tarantool-patches] [PATCH v2 10/10] session: introduce binary box.session.push

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 20 16:24:26 MSK 2018


box.session.push() allows sending a message to a client without
finishing the main request.

After this patch, Tarantool supports pushes over the binary protocol.

An IProto message is encoded using a new header code - IPROTO_CHUNK.
To notify the IProto thread about new data in an obuf, the TX thread
sends it a 'push_msg' message. On receiving this message, the IProto
thread notifies libev about the new data, and then sends 'push_msg'
back with an updated write position. When the TX thread receives the
message back, it updates its own version of the write position. If
IProto did not send the write position back, TX would write to the
same obuf again and again, because it cannot know that IProto has
already flushed another obuf.

To avoid multiple 'push_msg' in flight between IProto and TX, only
one 'push_msg' per connection is used. To deliver pushes that
appeared while 'push_msg' was in flight, the TX thread sets a flag
every time it sees that 'push_msg' is already sent and there is a
new push. When 'push_msg' returns, TX checks this flag, and if it is
set, IProto is notified again.

Closes #2677
---
 src/box/iproto.cc          | 221 +++++++++++++++++++++++++++++++++++--------
 src/box/iproto_constants.h |   3 +
 src/box/lua/net_box.lua    |  50 ++++++----
 src/box/lua/session.c      |  33 ++++++-
 src/box/xrow.c             |  13 +++
 src/box/xrow.h             |  12 +++
 src/fiber.h                |  14 ++-
 test/box/net.box.result    |   4 +-
 test/box/net.box.test.lua  |   4 +-
 test/box/push.result       | 231 ++++++++++++++++++++++++++++++++++++++++++---
 test/box/push.test.lua     | 134 +++++++++++++++++++++++---
 11 files changed, 635 insertions(+), 84 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 38baf1b8d..78a551039 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -67,6 +67,41 @@ enum { IPROTO_MSG_MAX = 768 };
 
 enum { IPROTO_SALT_SIZE = 32 };
 
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+	struct obuf *obuf;
+	struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+	wpos->obuf = out;
+	wpos->svp = obuf_create_svp(out);
+}
+
+/**
+ * Message to notify IProto thread about new data in an output
+ * buffer. Struct iproto_msg is not used here, because push
+ * notification can be much more compact: it does not have
+ * request, ibuf, length, flags ...
+ */
+struct iproto_push_msg {
+	struct cmsg base;
+	/**
+	 * Before sending to IProto thread, the wpos is set to a
+	 * current position in an output buffer. Before IProto
+	 * returns the message to TX, it sets wpos to the last
+	 * flushed position (works like iproto_msg.wpos).
+	 */
+	struct iproto_wpos wpos;
+};
+
 /**
  * Network readahead. A signed integer to avoid
  * automatic type coercion to an unsigned type.
@@ -111,24 +146,6 @@ iproto_reset_input(struct ibuf *ibuf)
 	}
 }
 
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
-	struct obuf *obuf;
-	struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
-	wpos->obuf = out;
-	wpos->svp = obuf_create_svp(out);
-}
-
 /* {{{ iproto_msg - declaration */
 
 /**
@@ -367,6 +384,37 @@ struct iproto_connection
 		/** Pointer to the current output buffer. */
 		struct obuf *p_obuf;
 	} tx;
+	/**
+	 * Is_push_in_progress is set, when a push_msg is sent to
+	 * IProto thread, and reset, when the message is returned
+	 * to TX. If a new push sees, that a push_msg is already
+	 * sent to IProto, then has_new_pushes is set. After push
+	 * notification is returned to TX, it checks
+	 * has_new_pushes. If it is set, then the notification is
+	 * sent again. This ping-pong continues, until TX stopped
+	 * pushing. It allows to
+	 * 1) avoid multiple push_msg from one session in fly,
+	 * 2) do not block push() until a previous push() is
+	 *    finished.
+	 *
+	 *      IProto                           TX
+	 * -------------------------------------------------------
+	 *                                 + [push message]
+	 *    start socket <--- notification ----
+	 *       write
+	 *                                 + [push message]
+	 *                                 + [push message]
+	 *                                       ...
+	 *      end socket
+	 *        write    ----------------> check for new
+	 *                                   pushes - found
+	 *                 <--- notification ---
+	 *                      ....
+	 */
+	bool has_new_pushes;
+	bool is_push_in_progress;
+	/** Push notification for IProto thread. */
+	struct iproto_push_msg push_msg;
 	/** Authentication salt. */
 	char salt[IPROTO_SALT_SIZE];
 };
@@ -806,6 +854,7 @@ iproto_connection_new(int fd)
 {
 	struct iproto_connection *con = (struct iproto_connection *)
 		mempool_alloc_xc(&iproto_connection_pool);
+	memset(con, 0, sizeof(*con));
 	con->input.data = con->output.data = con;
 	con->loop = loop();
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -818,9 +867,6 @@ iproto_connection_new(int fd)
 	con->tx.p_obuf = &con->obuf[0];
 	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
 	iproto_wpos_create(&con->wend, con->tx.p_obuf);
-	con->parse_size = 0;
-	con->long_poll_requests = 0;
-	con->session = NULL;
 	rlist_create(&con->in_stop_list);
 	/* It may be very awkward to allocate at close. */
 	con->disconnect = iproto_msg_new(con);
@@ -1010,9 +1056,9 @@ error:
 }
 
 static void
-tx_fiber_init(struct session *session, uint64_t sync)
+tx_fiber_init(struct session *session, uint64_t *sync)
 {
-	session->meta.sync = sync;
+	session->meta.sync = sync != NULL ? *sync : 0;
 	/*
 	 * We do not cleanup fiber keys at the end of each request.
 	 * This does not lead to privilege escalation as long as
@@ -1025,6 +1071,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
 	 */
 	fiber_set_session(fiber(), session);
 	fiber_set_user(fiber(), &session->credentials);
+	fiber_set_key(fiber(), FIBER_KEY_SYNC, (void *) sync);
 }
 
 /**
@@ -1038,7 +1085,7 @@ tx_process_disconnect(struct cmsg *m)
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	struct iproto_connection *con = msg->connection;
 	if (con->session) {
-		tx_fiber_init(con->session, 0);
+		tx_fiber_init(con->session, NULL);
 		if (! rlist_empty(&session_on_disconnect))
 			session_run_on_disconnect_triggers(con->session);
 		session_destroy(con->session);
@@ -1108,15 +1155,16 @@ tx_discard_input(struct iproto_msg *msg)
  *   not, the empty buffer is selected.
  * - if neither of the buffers are empty, the function
  *   does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ *        thread.
  */
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
 {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	struct iproto_connection *con = msg->connection;
-
 	struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
-	if (msg->wpos.obuf == con->tx.p_obuf) {
+	if (wpos->obuf == con->tx.p_obuf) {
 		/*
 		 * We got a message advancing the buffer which
 		 * is being appended to. The previous buffer is
@@ -1134,6 +1182,13 @@ tx_accept_msg(struct cmsg *m)
 		 */
 		con->tx.p_obuf = prev;
 	}
+}
+
+static inline struct iproto_msg *
+tx_accept_msg(struct cmsg *m)
+{
+	struct iproto_msg *msg = (struct iproto_msg *) m;
+	tx_accept_wpos(msg->connection, &msg->wpos);
 	return msg;
 }
 
@@ -1179,7 +1234,7 @@ tx_process1(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 
-	tx_fiber_init(msg->connection->session, msg->header.sync);
+	tx_fiber_init(msg->connection->session, &msg->header.sync);
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1213,7 +1268,7 @@ tx_process_select(struct cmsg *m)
 	int rc;
 	struct request *req = &msg->dml;
 
-	tx_fiber_init(msg->connection->session, msg->header.sync);
+	tx_fiber_init(msg->connection->session, &msg->header.sync);
 
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
@@ -1263,7 +1318,7 @@ tx_process_call(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 
-	tx_fiber_init(msg->connection->session, msg->header.sync);
+	tx_fiber_init(msg->connection->session, &msg->header.sync);
 
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
@@ -1347,7 +1402,7 @@ tx_process_misc(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
 
-	tx_fiber_init(con->session, msg->header.sync);
+	tx_fiber_init(con->session, &msg->header.sync);
 
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
@@ -1386,7 +1441,7 @@ tx_process_join_subscribe(struct cmsg *m)
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
 
-	tx_fiber_init(con->session, msg->header.sync);
+	tx_fiber_init(con->session, &msg->header.sync);
 
 	try {
 		switch (msg->header.type) {
@@ -1503,7 +1558,7 @@ tx_process_connect(struct cmsg *m)
 		if (con->session == NULL)
 			diag_raise();
 		con->session->meta.conn = con;
-		tx_fiber_init(con->session, 0);
+		tx_fiber_init(con->session, NULL);
 		static __thread char greeting[IPROTO_GREETING_SIZE];
 		/* TODO: dirty read from tx thread */
 		struct tt_uuid uuid = INSTANCE_UUID;
@@ -1653,6 +1708,98 @@ iproto_session_sync(struct session *session)
 	return session->meta.sync;
 }
 
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param conn IProto connection.
+ */
+static void
+tx_begin_push(struct iproto_connection *conn);
+
+/**
+ * Create an event to send push.
+ * @param m IProto push message.
+ */
+static void
+net_push_msg(struct cmsg *m)
+{
+	struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+	struct iproto_connection *conn =
+		container_of(msg, struct iproto_connection, push_msg);
+	conn->wend = msg->wpos;
+	msg->wpos = conn->wpos;
+	if (evio_has_fd(&conn->output) && !ev_is_active(&conn->output))
+		ev_feed_event(conn->loop, &conn->output, EV_WRITE);
+}
+
+/**
+ * After a message notifies IProto thread about pushed data, TX
+ * thread can already have a new push in one of obufs. This
+ * function checks for new pushes and possibly re-sends push
+ * notification to IProto thread.
+ */
+static void
+tx_check_for_new_push(struct cmsg *m)
+{
+	struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+	struct iproto_connection *conn =
+		container_of(msg, struct iproto_connection, push_msg);
+	tx_accept_wpos(conn, &msg->wpos);
+	conn->is_push_in_progress = false;
+	if (conn->has_new_pushes)
+		tx_begin_push(conn);
+}
+
+static const struct cmsg_hop push_route[] = {
+	{ net_push_msg, &tx_pipe },
+	{ tx_check_for_new_push, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_connection *conn)
+{
+	assert(! conn->is_push_in_progress);
+	cmsg_init((struct cmsg *) &conn->push_msg, push_route);
+	iproto_wpos_create(&conn->push_msg.wpos, conn->tx.p_obuf);
+	conn->has_new_pushes = false;
+	conn->is_push_in_progress = true;
+	cpipe_push(&net_pipe, (struct cmsg *) &conn->push_msg);
+}
+
+/**
+ * Push a message from @a port to a remote client.
+ * @param session IProto session.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval  0 Success, a message is written to an output buffer. But
+ *          it is not guaranteed, that it will be sent
+ *          successfully.
+ */
+static int
+iproto_session_push(struct session *session, struct port *port)
+{
+	struct iproto_connection *conn =
+		(struct iproto_connection *) session->meta.conn;
+	struct obuf_svp svp;
+	if (iproto_prepare_select(conn->tx.p_obuf, &svp) != 0)
+		return -1;
+	if (port_dump_msgpack(port, conn->tx.p_obuf) != 0) {
+		obuf_rollback_to_svp(conn->tx.p_obuf, &svp);
+		return -1;
+	}
+	iproto_reply_chunk(conn->tx.p_obuf, &svp, fiber_sync(fiber()),
+			   ::schema_version);
+	if (! conn->is_push_in_progress)
+		tx_begin_push(conn);
+	else
+		conn->has_new_pushes = true;
+	return 0;
+}
+
+/** }}} */
+
 /** Initialize the iproto subsystem and start network io thread */
 void
 iproto_init()
@@ -1666,7 +1813,7 @@ iproto_init()
 	cpipe_create(&net_pipe, "net");
 	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
 	struct session_vtab iproto_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ iproto_session_push,
 		/* .fd = */ iproto_session_fd,
 		/* .sync = */ iproto_session_sync,
 	};
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 92534ef6e..67514ca03 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -163,6 +163,9 @@ enum iproto_type {
 	/** Vinyl row index stored in .run file */
 	VY_RUN_ROW_INDEX = 102,
 
+	/** Non-final response type. */
+	IPROTO_CHUNK = 128,
+
 	/**
 	 * Error codes = (IPROTO_TYPE_ERROR | ER_XXX from errcode.h)
 	 */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 4ed2b375d..814358a2e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -40,6 +40,8 @@ local IPROTO_SCHEMA_VERSION_KEY = 0x05
 local IPROTO_DATA_KEY      = 0x30
 local IPROTO_ERROR_KEY     = 0x31
 local IPROTO_GREETING_SIZE = 128
+local IPROTO_CHUNK_KEY = 128
+local IPROTO_OK_KEY = 0
 
 -- select errors from box.error
 local E_UNKNOWN              = box.error.UNKNOWN
@@ -267,7 +269,8 @@ local function create_transport(host, port, user, password, callback,
     end
 
     -- REQUEST/RESPONSE --
-    local function perform_request(timeout, buffer, method, schema_version, ...)
+    local function perform_request(timeout, buffer, method, on_push,
+                                   schema_version, ...)
         if state ~= 'active' then
             return last_errno or E_NO_CONNECTION, last_error
         end
@@ -280,11 +283,12 @@ local function create_transport(host, port, user, password, callback,
         local id = next_request_id
         method_codec[method](send_buf, id, schema_version, ...)
         next_request_id = next_id(id)
-        local request = table_new(0, 6) -- reserve space for 6 keys
+        local request = table_new(0, 7) -- reserve space for 7 keys
         request.client = fiber_self()
         request.method = method
         request.schema_version = schema_version
         request.buffer = buffer
+        request.on_push = on_push
         requests[id] = request
         repeat
             local timeout = max(0, deadline - fiber_clock())
@@ -308,12 +312,12 @@ local function create_transport(host, port, user, password, callback,
         if request == nil then -- nobody is waiting for the response
             return
         end
-        requests[id] = nil
         local status = hdr[IPROTO_STATUS_KEY]
         local body, body_end_check
 
-        if status ~= 0 then
+        if status > IPROTO_CHUNK_KEY then
             -- Handle errors
+            requests[id] = nil
             body, body_end_check = decode(body_rpos)
             assert(body_end == body_end_check, "invalid xrow length")
             request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -328,16 +332,27 @@ local function create_transport(host, port, user, password, callback,
             local body_len = body_end - body_rpos
             local wpos = buffer:alloc(body_len)
             ffi.copy(wpos, body_rpos, body_len)
-            request.response = tonumber(body_len)
-            wakeup_client(request.client)
-            return
+            if status == IPROTO_OK_KEY then
+                request.response = tonumber(body_len)
+                requests[id] = nil
+                wakeup_client(request.client)
+            elseif request.on_push then
+                assert(status == IPROTO_CHUNK_KEY)
+                request.on_push(tonumber(body_len))
+            end
+        else
+            -- Decode xrow.body[DATA] to Lua objects
+            body, body_end_check = decode(body_rpos)
+            assert(body_end == body_end_check, "invalid xrow length")
+            if status == IPROTO_OK_KEY then
+                request.response = body[IPROTO_DATA_KEY]
+                requests[id] = nil
+                wakeup_client(request.client)
+            elseif request.on_push then
+                assert(status == IPROTO_CHUNK_KEY)
+                request.on_push(body[IPROTO_DATA_KEY][1])
+            end
         end
-
-        -- Decode xrow.body[DATA] to Lua objects
-        body, body_end_check = decode(body_rpos)
-        assert(body_end == body_end_check, "invalid xrow length")
-        request.response = body[IPROTO_DATA_KEY]
-        wakeup_client(request.client)
     end
 
     local function new_request_id()
@@ -834,7 +849,8 @@ function remote_methods:_request(method, opts, ...)
             timeout = deadline and max(0, deadline - fiber_clock())
         end
         err, res = perform_request(timeout, buffer, method,
-                                   self.schema_version, ...)
+                                   opts and opts.on_push, self.schema_version,
+                                   ...)
         if not err and buffer ~= nil then
             return res -- the length of xrow.body
         elseif not err then
@@ -864,7 +880,7 @@ function remote_methods:ping(opts)
         timeout = deadline and max(0, deadline - fiber_clock())
                             or (opts and opts.timeout)
     end
-    local err = self._transport.perform_request(timeout, nil, 'ping',
+    local err = self._transport.perform_request(timeout, nil, 'ping', nil,
                                                 self.schema_version)
     return not err or err == E_WRONG_SCHEMA_VERSION
 end
@@ -1045,10 +1061,10 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+        err, res = pr(timeout, nil, 'eval', nil, nil, loader, {line})
     else
         assert(self.protocol == 'Lua console')
-        err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+        err, res = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
     end
     if err then
         box.error({code = err, reason = res})
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 306271809..c3db93627 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -31,6 +31,7 @@
 #include "session.h"
 #include "lua/utils.h"
 #include "lua/trigger.h"
+#include "lua/msgpack.h"
 
 #include <lua.h>
 #include <lauxlib.h>
@@ -43,6 +44,7 @@
 #include "box/schema.h"
 #include "box/port.h"
 #include "box/lua/console.h"
+#include "small/obuf.h"
 
 static const char *sessionlib_name = "box.session";
 
@@ -371,8 +373,11 @@ struct lua_push_port {
 static const char *
 lua_push_port_dump_plain(struct port *port, uint32_t *size);
 
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf);
+
 static const struct port_vtab lua_push_port_vtab = {
-       .dump_msgpack = NULL,
+       .dump_msgpack = lua_push_port_dump_msgpack,
        /*
         * Dump_16 has no sense, since push appears since 1.10
         * protocol.
@@ -403,6 +408,32 @@ lua_push_port_dump_plain(struct port *port, uint32_t *size)
 	return result;
 }
 
+static void
+obuf_error_cb(void *ctx)
+{
+	*((int *)ctx) = -1;
+}
+
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf)
+{
+	struct lua_push_port *lua_port = (struct lua_push_port *) port;
+	assert(lua_port->vtab == &lua_push_port_vtab);
+	struct mpstream stream;
+	int rc = 0;
+	/*
+	 * Do not use luamp_error to allow a caller to clear the
+	 * obuf, if it already has allocated something (for
+	 * example, iproto push reserves memory for a header).
+	 */
+	mpstream_init(&stream, obuf, obuf_reserve_cb, obuf_alloc_cb,
+		      obuf_error_cb, &rc);
+	luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1);
+	if (rc == 0)
+		mpstream_flush(&stream);
+	return rc;
+}
+
 /**
  * Push a message using a protocol, depending on a session type.
  * @param data Data to push, first argument on a stack.
diff --git a/src/box/xrow.c b/src/box/xrow.c
index f48525645..adb52deeb 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -373,6 +373,19 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
 	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
 }
 
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+		   uint32_t schema_version)
+{
+	char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+	iproto_header_encode(pos, IPROTO_CHUNK, sync, schema_version,
+			     obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+	struct iproto_body_bin body = iproto_body_bin;
+	body.v_data_len = mp_bswap_u32(1);
+
+	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
+}
+
 int
 xrow_decode_dml(struct xrow_header *row, struct request *request,
 		uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index d407d151b..7fe1debbf 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -390,6 +390,18 @@ int
 iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
 		   uint32_t schema_version);
 
+/**
+ * Write an IPROTO_CHUNK header from a specified position in a
+ * buffer.
+ * @param buf Buffer to write to.
+ * @param svp Position to write from.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ */
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+		   uint32_t schema_version);
+
 /** Write error directly to a socket. */
 void
 iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
diff --git a/src/fiber.h b/src/fiber.h
index 8231bba24..eb89c48cd 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,8 +105,13 @@ enum fiber_key {
 	/** User global privilege and authentication token */
 	FIBER_KEY_USER = 3,
 	FIBER_KEY_MSG = 4,
-	/** Storage for lua stack */
+	/**
+	 * The storage cell number 5 is shared between lua stack
+	 * for fibers created from Lua, and IProto sync for fibers
+	 * created to execute a binary request.
+	 */
 	FIBER_KEY_LUA_STACK = 5,
+	FIBER_KEY_SYNC      = 5,
 	FIBER_KEY_MAX = 6
 };
 
@@ -610,6 +615,13 @@ fiber_get_key(struct fiber *fiber, enum fiber_key key)
 	return fiber->fls[key];
 }
 
+static inline uint64_t
+fiber_sync(struct fiber *fiber)
+{
+	uint64_t *sync = (uint64_t *) fiber_get_key(fiber, FIBER_KEY_SYNC);
+	return sync != NULL ? *sync : 0;
+}
+
 /**
  * Finalizer callback
  * \sa fiber_key_on_gc()
diff --git a/test/box/net.box.result b/test/box/net.box.result
index cf7b27f0b..93561dc64 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
     return cn:_request('select', opts, space_id, index_id, iterator,
                        offset, limit, key)
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 ---
 ...
@@ -2363,7 +2363,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 ---
 ...
 c.state
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 576b5cfea..b05e1f0be 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
     return cn:_request('select', opts, space_id, index_id, iterator,
                        offset, limit, key)
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 
 LISTEN = require('uri').parse(box.cfg.listen)
@@ -965,7 +965,7 @@ c.space.test:delete{1}
 --
 -- Break a connection to test reconnect_after.
 --
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
 c.state
 while not c:is_connected() do fiber.sleep(0.01) end
 c:ping()
diff --git a/test/box/push.result b/test/box/push.result
index 816f06e00..91e2981ed 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -1,5 +1,8 @@
+test_run = require('test_run').new()
+---
+...
 --
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
 --
 --
 -- Usage.
@@ -12,18 +15,34 @@ box.session.push(1, 2)
 ---
 - error: 'Usage: box.session.push(data)'
 ...
-ok = nil
+fiber = require('fiber')
 ---
 ...
-err = nil
+messages = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
 ---
+- true
 ...
-function do_push() ok, err = box.session.push(1) end
+function on_push(message)
+    table.insert(messages, message)
+end;
 ---
 ...
---
--- Test binary protocol.
---
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
 netbox = require('net.box')
 ---
 ...
@@ -37,27 +56,213 @@ c:ping()
 ---
 - true
 ...
-c:call('do_push')
+c:call('do_pushes', {}, {on_push = on_push})
 ---
+- 300
 ...
-ok, err
+messages
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+    box.session.push('started dml')
+    s:replace{key}
+    box.session.push('continued dml')
+    s:replace{-key}
+    box.session.push('finished dml')
+    return key
+end;
+---
+...
+function do_pushes(val)
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.yield()
+    end
+    return val
+end;
+---
+...
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3, 'dml sends 3 messages')
+        assert(c.messages[1] == 'started dml', 'started')
+        assert(c.messages[2] == 'continued dml', 'continued')
+        assert(c.messages[3] == 'finished dml', 'finished')
+        assert(s:get{c.retval}, 'inserted +')
+        assert(s:get{-c.retval}, 'inserted -')
+    else
+        assert(c.retval, 'something is returned')
+        assert(#c.messages == 5, 'non-dml sends 5 messages')
+        for k, v in pairs(c.messages) do
+            assert(k == v, 'with equal keys and values')
+        end
+    end
+end;
 ---
-- null
-- Session 'binary' does not support push()
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+---
+...
+messages = {}
+---
+...
+c:call('push_null', {}, {on_push = on_push})
+---
+...
+messages
+---
+- - null
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+---
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 8
+  - 8
+  - 8
+  - 8
+  - 8
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {48: [1]}
+  - {48: [2]}
+  - {48: [3]}
+  - {48: [4]}
+  - {48: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
 ...
 c:close()
 ---
 ...
+s:drop()
+---
+...
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 ---
 ...
 --
 -- Ensure can not push in background.
 --
-fiber = require('fiber')
+ok = nil
+---
+...
+err = nil
 ---
 ...
-f = fiber.create(do_push)
+f = fiber.create(function() ok, err = box.session.push(100) end)
 ---
 ...
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index a59fe0a4c..e5bd3287f 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -1,5 +1,6 @@
+test_run = require('test_run').new()
 --
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
 --
 
 --
@@ -8,28 +9,139 @@
 box.session.push()
 box.session.push(1, 2)
 
-ok = nil
-err = nil
-function do_push() ok, err = box.session.push(1) end
+fiber = require('fiber')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function on_push(message)
+    table.insert(messages, message)
+end;
+
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+test_run:cmd("setopt delimiter ''");
 
---
--- Test binary protocol.
---
 netbox = require('net.box')
 box.schema.user.grant('guest', 'read,write,execute', 'universe')
 
 c = netbox.connect(box.cfg.listen)
 c:ping()
-c:call('do_push')
-ok, err
+c:call('do_pushes', {}, {on_push = on_push})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+    box.session.push('started dml')
+    s:replace{key}
+    box.session.push('continued dml')
+    s:replace{-key}
+    box.session.push('finished dml')
+    return key
+end;
+function do_pushes(val)
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.yield()
+    end
+    return val
+end;
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3, 'dml sends 3 messages')
+        assert(c.messages[1] == 'started dml', 'started')
+        assert(c.messages[2] == 'continued dml', 'continued')
+        assert(c.messages[3] == 'finished dml', 'finished')
+        assert(s:get{c.retval}, 'inserted +')
+        assert(s:get{-c.retval}, 'inserted -')
+    else
+        assert(c.retval, 'something is returned')
+        assert(#c.messages == 5, 'non-dml sends 5 messages')
+        for k, v in pairs(c.messages) do
+            assert(k == v, 'with equal keys and values')
+        end
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+messages = {}
+c:call('push_null', {}, {on_push = on_push})
+messages
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
 c:close()
+s:drop()
 
 box.schema.user.revoke('guest', 'read,write,execute', 'universe')
 
 --
 -- Ensure can not push in background.
 --
-fiber = require('fiber')
-f = fiber.create(do_push)
+ok = nil
+err = nil
+f = fiber.create(function() ok, err = box.session.push(100) end)
 while f:status() ~= 'dead' do fiber.sleep(0.01) end
 ok, err
-- 
2.15.1 (Apple Git-101)





More information about the Tarantool-patches mailing list