[tarantool-patches] Re: [PATCH v2 10/10] session: introduce binary box.session.push

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu May 24 23:50:46 MSK 2018



On 10/05/2018 22:50, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/04/20 16:25]:
>> Box.session.push() allows to send a message to a client with no
>> finishing a main request.
>>
>> Tarantool after this patch supports pushes over binary protocol.
>>
>> IProto message is encoded using a new header code - IPROTO_CHUNK.
>> TX thread to notify IProto thread about new data in obuf sends
>> a message 'push_msg'. IProto thread, got this message, notifies
>> libev about new data, and then sends 'push_msg' back with
>> updated write position. TX thread, received the message back,
>> updates its version of a write position. If IProto would not send
>> a write position, then TX would write to the same obuf again and
>> again, because it can not know that IProto already flushed
>> another obuf.
>>
>> To avoid multiple 'push_msg' in fly between IProto and TX, the
>> only one 'push_msg' per connection is used. To deliver pushes,
>> appeared when 'push_msg' was in fly, TX thread sets a flag every
>> time when sees, that 'push_msg' is sent, and there is a new push.
>> When 'push_msg' returns, it checks this flag, and if it is set,
>> the IProto is notified again.
> 
> I don't see any reason for this restriction.
> Any connection has two independent rotating output buffers
> of infinite size. If you ever want to block a push message, you
> should block it because both buffers are busy.

I do not block the user. We discussed this verbally, but I repeat
the explanation here for the record.

When a message is pushed, it is written to the obuf. Then either the
notification cmsg is sent or the flag is set, and control returns to
the caller immediately.

If the flag was not set on push, the push notification cmsg is
delivered to the IProto thread at the end of the event loop iteration,
and there it triggers the on_output event. Then the message is
returned to TX.

If the flag was set on push, the notification is delivered to IProto
after the in-flight notification returns to TX, on a subsequent event
loop iteration.
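For illustration, here is a minimal single-threaded model of this flow in C. It is only a sketch: conn_model and the model_* functions are hypothetical names, not part of iproto.cc. Cross-thread cbus delivery is replaced by direct calls, and the write position is reduced to a byte counter. The model shows that push() never waits. It either sends the single notification or just sets has_new_pushes, and the returning notification goes back out once if anything was pushed in the meantime.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the per-connection push state. */
struct conn_model {
	size_t written;          /* bytes appended to the obuf by TX */
	size_t kharon_wpos;      /* end position carried by the message */
	size_t flushed;          /* end position IProto was told about */
	bool has_new_pushes;     /* data appeared while Kharon was away */
	bool is_kharon_on_road;  /* the single message is in flight */
	int trips;               /* notifications sent to IProto */
};

/* TX: send the single notification with the current end position. */
static void
model_begin_push(struct conn_model *c)
{
	assert(!c->is_kharon_on_road);
	c->kharon_wpos = c->written;
	c->has_new_pushes = false;
	c->is_kharon_on_road = true;
	c->trips++;
}

/* TX: a push never blocks, it only sends or sets the flag. */
static void
model_push(struct conn_model *c, size_t size)
{
	c->written += size;
	if (!c->is_kharon_on_road)
		model_begin_push(c);
	else
		c->has_new_pushes = true;
}

/* IProto: learn the new end of data to flush (like kharon_go_dead). */
static void
model_go_dead(struct conn_model *c)
{
	c->flushed = c->kharon_wpos;
}

/* TX: Kharon is back; resend if pushes arrived meanwhile. */
static void
model_go_alive(struct conn_model *c)
{
	c->is_kharon_on_road = false;
	if (c->has_new_pushes)
		model_begin_push(c);
}
```

Three pushes made while the notification is in flight cost only two trips in total: the first one, plus a single resend that carries the end position of all the data pushed in between.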

> 
>> +	 * Is_push_in_progress is set, when a push_msg is sent to
>> +	 * IProto thread, and reset, when the message is returned
>> +	 * to TX. If a new push sees, that a push_msg is already
>> +	 * sent to IProto, then has_new_pushes is set. After push
>> +	 * notification is returned to TX, it checks
>> +	 * has_new_pushes. If it is set, then the notification is
>> +	 * sent again. This ping-pong continues, until TX stopped
>> +	 * pushing. It allows to
>> +	 * 1) avoid multiple push_msg from one session in fly,
>> +	 * 2) do not block push() until a previous push() is
>> +	 *    finished.
> 
> Please make it radically simpler, every push can create a new
> message which has an independent life cycle. Messages can never
> run one over each other, so you have nothing to worry about.

Same here. We discussed it verbally, and you agreed that you had
misunderstood me. This approach is fine. Moreover, it can be reused to
get rid of the two-end routes that currently deliver an iproto_msg to
the TX thread and keep it there until the request is finished.

> 
>> @@ -1038,7 +1085,7 @@ tx_process_disconnect(struct cmsg *m)
>>   	struct iproto_msg *msg = (struct iproto_msg *) m;
>>   	struct iproto_connection *con = msg->connection;
>>   	if (con->session) {
>> -		tx_fiber_init(con->session, 0);
>> +		tx_fiber_init(con->session, NULL);
> 
> Why do you need to make it more complex than it is now?
> 
> Every Lua procedure which makes a push takes a long-polling
> reference to the connection already. Until this procedure
> ends, you can't disconnect a connection.

I know, but I need the sync of this particular request, not of the
whole connection. session->meta.sync is valid only until the next
yield. After that it can be overwritten with the sync of a new
request. So I store a pointer to the request sync in a fiber-local
storage slot that is never used by TX fiber pool fibers.
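A minimal sketch of why a per-request pointer survives a yield while the session-wide sync does not. session_model, fiber_model and request_stub are hypothetical stand-ins for struct session, the fiber-local storage slot and the request header. Only the logic mirrors tx_fiber_init() and fiber_sync() from the diff below.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the real structures. */
struct session_model { uint64_t sync; };  /* shared by all requests */
struct fiber_model { void *fls_sync; };   /* per-fiber storage slot */
struct request_stub { uint64_t sync; };   /* lives until reply is sent */

/* Mirrors tx_fiber_init(): NULL means "no request", sync 0. */
static void
model_fiber_init(struct fiber_model *f, struct session_model *s,
		 uint64_t *sync)
{
	s->sync = sync != NULL ? *sync : 0;
	f->fls_sync = sync;
}

/* Mirrors fiber_sync(): read the sync of this fiber's own request. */
static uint64_t
model_fiber_sync(const struct fiber_model *f)
{
	const uint64_t *sync = f->fls_sync;
	return sync != NULL ? *sync : 0;
}
```

If fiber A starts request 1, yields, and fiber B then starts request 2 on the same session, the session-wide sync becomes 2, while model_fiber_sync() on fiber A still returns 1. That is the value a push from fiber A must carry.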

> 
>> +static void
>> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>>   {
>> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>> -	struct iproto_connection *con = msg->connection;
>> -
>>   	struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
>> -	if (msg->wpos.obuf == con->tx.p_obuf) {
>> +	if (wpos->obuf == con->tx.p_obuf) {
>>   		/*
>>   		 * We got a message advancing the buffer which
>>   		 * is being appended to. The previous buffer is
>> @@ -1134,6 +1182,13 @@ tx_accept_msg(struct cmsg *m)
>>   		 */
>>   		con->tx.p_obuf = prev;
>>   	}
>> +}
>> +
>> +static inline struct iproto_msg *
>> +tx_accept_msg(struct cmsg *m)
>> +{
>> +	struct iproto_msg *msg = (struct iproto_msg *) m;
>> +	tx_accept_wpos(msg->connection, &msg->wpos);
>>   	return msg;
>>   }
> 
> This somehow looks half-baked, I don't know how yet.
> 
>> +c:call('push_null', {}, {on_push = on_push})
> 
> What happens if on_push handler is not set? Can I get the entire
> data set in a result when all pushes are over?
> 
> Can I get a data set as an iterable and yield in the iterator
> instead?
> 

I have discussed it with the community (if it can be called a
'discussion': almost nobody cares about the API). The accepted decision
is to make on_push and is_async mutually exclusive, and to make the
future object iterable.

If neither is set, pushed messages are ignored.
I have pushed the iterable API as a separate commit, which I am sending
to the same thread.
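For clarity, the response dispatch that net_box.lua performs after this patch can be modelled in C as below. This is a hypothetical sketch, not net.box code. A NULL on_push stands in for the no-op default used by synchronous calls, and collect_push() stands in for table.insert, which is used for is_async requests. MODEL_IPROTO_TYPE_ERROR assumes the 1 << 15 error bit from iproto_constants.h. A final IPROTO_OK (0) response completes the request, IPROTO_CHUNK (128) goes to the push handler, and any type above 128 is an error.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

enum {
	MODEL_IPROTO_OK = 0,
	MODEL_IPROTO_CHUNK = 128,
	MODEL_IPROTO_TYPE_ERROR = 1 << 15,
};

typedef void (*on_push_f)(void *ctx, int msg);

/* Hypothetical request record; NULL on_push means "ignore pushes". */
struct request_model {
	on_push_f on_push;
	void *on_push_ctx;
	bool is_finished;
	bool is_error;
	int response;
};

/* Returns true when the response completes the request. */
static bool
model_handle_response(struct request_model *r, uint32_t type, int body)
{
	if (type > MODEL_IPROTO_CHUNK) {
		r->is_error = true;
		r->is_finished = true;
	} else if (type == MODEL_IPROTO_OK) {
		r->response = body;
		r->is_finished = true;
	} else if (r->on_push != NULL) {
		/* Non-final chunk: the request stays pending. */
		r->on_push(r->on_push_ctx, body);
	}
	return r->is_finished;
}

/* Async requests collect pushes for later iteration. */
struct collector { int items[16]; size_t count; };

static void
collect_push(void *ctx, int msg)
{
	struct collector *c = ctx;
	if (c->count < sizeof(c->items) / sizeof(c->items[0]))
		c->items[c->count++] = msg;
}
```

Keeping on_push and is_async mutually exclusive means each request has exactly one consumer for its chunks. That consumer is either the user callback or the collection that the iterable future reads from.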

Also, you said verbally that the iproto push message that notifies the
IProto thread about an updated obuf should be named Kharon. I have done
it. See the complete diff below:

==========================================================================

commit 7525c7db82d01b5ad77c50f0fb70a6cede8f16c2
Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Date:   Fri Apr 20 00:45:32 2018 +0300

     session: introduce binary box.session.push
     
     box.session.push() allows sending a message to a client without
     finishing the main request. After this patch Tarantool supports
     pushes over the binary protocol.
     
     A push message is encoded using a new header code, IPROTO_CHUNK.
     Push works as follows: a user calls box.session.push(message).
     The message is encoded into the currently active obuf in the TX
     thread, and then Kharon notifies the IProto thread about new data.
     
     Originally Kharon is the ferryman of Hades who carries souls of
     the newly deceased across the rivers Styx and Acheron that
     divided the world of the living from the world of the dead. In
     Tarantool Kharon is a message and does similar work. It
     notifies IProto thread about new data in an output buffer
     carrying pushed messages to IProto. Styx here is cpipe, and the
     boat is cbus message.
     
     Each connection has a single Kharon for all pushes. But Kharon
     cannot be in two places at the same time. So once he has left
     TX for IProto, new messages cannot send Kharon; they just set a
     special flag. When Kharon is back in TX and sees that the flag
     is set, he immediately takes the road back to IProto.
     
     Meanwhile, a user is not blocked from writing to the obuf while
     Kharon is busy. The user just updates the obuf and sets the flag
     if it is not set yet. There is no waiting for Kharon to come
     back.
     
     Closes #2677

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 736a699a2..f9508ed7d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -66,6 +66,44 @@
  
  enum { IPROTO_SALT_SIZE = 32 };
  
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+	struct obuf *obuf;
+	struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+	wpos->obuf = out;
+	wpos->svp = obuf_create_svp(out);
+}
+
+/**
+ * Kharon is the ferryman of Hades who carries souls of the newly
+ * deceased across the rivers Styx and Acheron that divided the
+ * world of the living from the world of the dead.
+ * Here Kharon is a message and does similar work. It notifies
+ * IProto thread about new data in an output buffer carrying
+ * pushed messages to IProto. Styx here is cpipe, and the boat is
+ * cbus message.
+ */
+struct kharon {
+	struct cmsg base;
+	/**
+	 * Before sending to IProto thread, the wpos is set to a
+	 * current position in an output buffer. Before IProto
+	 * returns the message to TX, it sets wpos to the last
+	 * flushed position (works like iproto_msg.wpos).
+	 */
+	struct iproto_wpos wpos;
+};
+
  /**
   * Network readahead. A signed integer to avoid
   * automatic type coercion to an unsigned type.
@@ -113,24 +151,6 @@ iproto_reset_input(struct ibuf *ibuf)
  	}
  }
  
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
-	struct obuf *obuf;
-	struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
-	wpos->obuf = out;
-	wpos->svp = obuf_create_svp(out);
-}
-
  /* {{{ iproto_msg - declaration */
  
  /**
@@ -365,6 +385,48 @@ struct iproto_connection
  		/** Pointer to the current output buffer. */
  		struct obuf *p_obuf;
  	} tx;
+	/**
+	 * Kharon serves to notify IProto about new data in obuf
+	 * when the TX thread initiates the transfer, for example
+	 * on box.session.push.
+	 *
+	 * But Kharon cannot be in two places at the same time.
+	 * So once he has left TX for IProto, new messages cannot
+	 * send Kharon. They just set a special flag:
+	 * has_new_pushes. When Kharon is back in TX and sees that
+	 * the flag is set, he immediately takes the road back to
+	 * IProto.
+	 *
+	 * Meanwhile, a user is not blocked from writing to obuf
+	 * while Kharon is busy. The user just updates obuf and
+	 * sets the flag if it is not set. There is no waiting for
+	 * Kharon to come back.
+	 *
+	 * Kharon allows us to
+	 * 1) avoid having multiple cbus messages from one session
+	 *    in flight,
+	 * 2) not block a new push() until a previous push() is
+	 *    finished.
+	 *
+	 *      IProto                           TX
+	 * -------------------------------------------------------
+	 *                                 + [push message]
+	 *    start socket <--- notification ----
+	 *       write
+	 *                                 + [push message]
+	 *                                 + [push message]
+	 *                                       ...
+	 *      end socket
+	 *        write    ----------------> check for new
+	 *                                   pushes - found
+	 *                 <--- notification ---
+	 *                      ....
+	 */
+	bool has_new_pushes;
+	/** True if Kharon is on the way. */
+	bool is_kharon_on_road;
+	/** Push notification for IProto thread. */
+	struct kharon kharon;
  	/** Authentication salt. */
  	char salt[IPROTO_SALT_SIZE];
  };
@@ -882,6 +944,7 @@ iproto_connection_new(int fd)
  		diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
  		return NULL;
  	}
+	memset(con, 0, sizeof(*con));
  	con->input.data = con->output.data = con;
  	con->loop = loop();
  	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -894,9 +957,6 @@ iproto_connection_new(int fd)
  	con->tx.p_obuf = &con->obuf[0];
  	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
  	iproto_wpos_create(&con->wend, con->tx.p_obuf);
-	con->parse_size = 0;
-	con->long_poll_requests = 0;
-	con->session = NULL;
  	rlist_create(&con->in_stop_list);
  	/* It may be very awkward to allocate at close. */
  	cmsg_init(&con->disconnect, disconnect_route);
@@ -1084,9 +1144,9 @@ error:
  }
  
  static void
-tx_fiber_init(struct session *session, uint64_t sync)
+tx_fiber_init(struct session *session, uint64_t *sync)
  {
-	session->meta.sync = sync;
+	session->meta.sync = sync != NULL ? *sync : 0;
  	/*
  	 * We do not cleanup fiber keys at the end of each request.
  	 * This does not lead to privilege escalation as long as
@@ -1099,6 +1159,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
  	 */
  	fiber_set_session(fiber(), session);
  	fiber_set_user(fiber(), &session->credentials);
+	fiber_set_key(fiber(), FIBER_KEY_SYNC, (void *) sync);
  }
  
  /**
@@ -1112,7 +1173,7 @@ tx_process_disconnect(struct cmsg *m)
  	struct iproto_connection *con =
  		container_of(m, struct iproto_connection, disconnect);
  	if (con->session) {
-		tx_fiber_init(con->session, 0);
+		tx_fiber_init(con->session, NULL);
  		if (! rlist_empty(&session_on_disconnect))
  			session_run_on_disconnect_triggers(con->session);
  		session_destroy(con->session);
@@ -1185,15 +1246,16 @@ tx_discard_input(struct iproto_msg *msg)
   *   not, the empty buffer is selected.
   * - if neither of the buffers are empty, the function
   *   does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ *        thread.
   */
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
  {
-	struct iproto_msg *msg = (struct iproto_msg *) m;
-	struct iproto_connection *con = msg->connection;
-
  	struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
-	if (msg->wpos.obuf == con->tx.p_obuf) {
+	if (wpos->obuf == con->tx.p_obuf) {
  		/*
  		 * We got a message advancing the buffer which
  		 * is being appended to. The previous buffer is
@@ -1211,6 +1273,14 @@ tx_accept_msg(struct cmsg *m)
  		 */
  		con->tx.p_obuf = prev;
  	}
+}
+
+static inline struct iproto_msg *
+tx_accept_msg(struct cmsg *m)
+{
+	struct iproto_msg *msg = (struct iproto_msg *) m;
+	tx_accept_wpos(msg->connection, &msg->wpos);
+	tx_fiber_init(msg->connection->session, &msg->header.sync);
  	return msg;
  }
  
@@ -1255,8 +1325,6 @@ static void
  tx_process1(struct cmsg *m)
  {
  	struct iproto_msg *msg = tx_accept_msg(m);
-
-	tx_fiber_init(msg->connection->session, msg->header.sync);
  	if (tx_check_schema(msg->header.schema_version))
  		goto error;
  
@@ -1289,9 +1357,6 @@ tx_process_select(struct cmsg *m)
  	int count;
  	int rc;
  	struct request *req = &msg->dml;
-
-	tx_fiber_init(msg->connection->session, msg->header.sync);
-
  	if (tx_check_schema(msg->header.schema_version))
  		goto error;
  
@@ -1339,9 +1404,6 @@ static void
  tx_process_call(struct cmsg *m)
  {
  	struct iproto_msg *msg = tx_accept_msg(m);
-
-	tx_fiber_init(msg->connection->session, msg->header.sync);
-
  	if (tx_check_schema(msg->header.schema_version))
  		goto error;
  
@@ -1423,9 +1485,6 @@ tx_process_misc(struct cmsg *m)
  	struct iproto_msg *msg = tx_accept_msg(m);
  	struct iproto_connection *con = msg->connection;
  	struct obuf *out = con->tx.p_obuf;
-
-	tx_fiber_init(con->session, msg->header.sync);
-
  	if (tx_check_schema(msg->header.schema_version))
  		goto error;
  
@@ -1463,9 +1522,6 @@ tx_process_join_subscribe(struct cmsg *m)
  {
  	struct iproto_msg *msg = tx_accept_msg(m);
  	struct iproto_connection *con = msg->connection;
-
-	tx_fiber_init(con->session, msg->header.sync);
-
  	try {
  		switch (msg->header.type) {
  		case IPROTO_JOIN:
@@ -1582,7 +1638,7 @@ tx_process_connect(struct cmsg *m)
  		if (con->session == NULL)
  			diag_raise();
  		con->session->meta.connection = con;
-		tx_fiber_init(con->session, 0);
+		tx_fiber_init(con->session, NULL);
  		static __thread char greeting[IPROTO_GREETING_SIZE];
  		/* TODO: dirty read from tx thread */
  		struct tt_uuid uuid = INSTANCE_UUID;
@@ -1741,6 +1797,99 @@ iproto_session_sync(struct session *session)
  	return session->meta.sync;
  }
  
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param conn IProto connection.
+ */
+static void
+tx_begin_push(struct iproto_connection *conn);
+
+/**
+ * Kharon has reached the dead world (IProto). He schedules an
+ * event to flush new obuf data up to the wpos he has brought.
+ * @param m Kharon.
+ */
+static void
+kharon_go_dead(struct cmsg *m)
+{
+	struct kharon *kharon = (struct kharon *) m;
+	struct iproto_connection *conn =
+		container_of(kharon, struct iproto_connection, kharon);
+	conn->wend = kharon->wpos;
+	kharon->wpos = conn->wpos;
+	if (evio_has_fd(&conn->output) && !ev_is_active(&conn->output))
+		ev_feed_event(conn->loop, &conn->output, EV_WRITE);
+}
+
+/**
+ * Kharon is back in the living world (TX) from the dead one
+ * (IProto). He checks whether new push messages appeared during
+ * his trip and goes to IProto again if needed.
+ * @param m Kharon.
+ */
+static void
+kharon_go_alive(struct cmsg *m)
+{
+	struct kharon *kharon = (struct kharon *) m;
+	struct iproto_connection *conn =
+		container_of(kharon, struct iproto_connection, kharon);
+	tx_accept_wpos(conn, &kharon->wpos);
+	conn->is_kharon_on_road = false;
+	if (conn->has_new_pushes)
+		tx_begin_push(conn);
+}
+
+static const struct cmsg_hop push_route[] = {
+	{ kharon_go_dead, &tx_pipe },
+	{ kharon_go_alive, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_connection *conn)
+{
+	assert(! conn->is_kharon_on_road);
+	cmsg_init((struct cmsg *) &conn->kharon, push_route);
+	iproto_wpos_create(&conn->kharon.wpos, conn->tx.p_obuf);
+	conn->has_new_pushes = false;
+	conn->is_kharon_on_road = true;
+	cpipe_push(&net_pipe, (struct cmsg *) &conn->kharon);
+}
+
+/**
+ * Push a message from @a port to a remote client.
+ * @param session IProto session.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval  0 Success, the message is written to an output buffer,
+ *          but it is not guaranteed that it will be sent
+ *          successfully.
+ */
+static int
+iproto_session_push(struct session *session, struct port *port)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *) session->meta.connection;
+	struct obuf_svp svp;
+	if (iproto_prepare_select(con->tx.p_obuf, &svp) != 0)
+		return -1;
+	if (port_dump_msgpack(port, con->tx.p_obuf) != 0) {
+		obuf_rollback_to_svp(con->tx.p_obuf, &svp);
+		return -1;
+	}
+	iproto_reply_chunk(con->tx.p_obuf, &svp, fiber_sync(fiber()),
+			   ::schema_version);
+	if (! con->is_kharon_on_road)
+		tx_begin_push(con);
+	else
+		con->has_new_pushes = true;
+	return 0;
+}
+
+/** }}} */
+
  /** Initialize the iproto subsystem and start network io thread */
  void
  iproto_init()
@@ -1754,7 +1903,7 @@ iproto_init()
  	cpipe_create(&net_pipe, "net");
  	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
  	struct session_vtab iproto_session_vtab = {
-		/* .push = */ generic_session_push,
+		/* .push = */ iproto_session_push,
  		/* .fd = */ iproto_session_fd,
  		/* .sync = */ iproto_session_sync,
  	};
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 46d47719b..dd50bf874 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -164,6 +164,9 @@ enum iproto_type {
  	/** Vinyl row index stored in .run file */
  	VY_RUN_ROW_INDEX = 102,
  
+	/** Non-final response type. */
+	IPROTO_CHUNK = 128,
+
  	/**
  	 * Error codes = (IPROTO_TYPE_ERROR | ER_XXX from errcode.h)
  	 */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8fcaf89e8..fe113e740 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -39,6 +39,8 @@ local IPROTO_SCHEMA_VERSION_KEY = 0x05
  local IPROTO_DATA_KEY      = 0x30
  local IPROTO_ERROR_KEY     = 0x31
  local IPROTO_GREETING_SIZE = 128
+local IPROTO_CHUNK_KEY     = 128
+local IPROTO_OK_KEY        = 0
  
  -- select errors from box.error
  local E_UNKNOWN              = box.error.UNKNOWN
@@ -74,6 +76,10 @@ local function decode_count(raw_data)
      local response, raw_end = decode(raw_data)
      return response[IPROTO_DATA_KEY][1], raw_end
  end
+local function decode_push(raw_data)
+    local response, raw_end = decode(raw_data)
+    return response[IPROTO_DATA_KEY][1], raw_end
+end
  
  local method_encoder = {
      ping    = internal.encode_ping,
@@ -114,6 +120,7 @@ local method_decoder = {
      max     = decode_get,
      count   = decode_count,
      inject  = decode_data,
+    push    = decode_push,
  }
  
  local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
@@ -149,6 +156,12 @@ local function establish_connection(host, port, timeout)
      return s, greeting
  end
  
+--
+-- Default action on push during a synchronous request -
+-- ignore.
+--
+local function on_push_sync_default(...) end
+
  --
  -- Basically, *transport* is a TCP connection speaking one of
  -- Tarantool network protocols. This is a low-level interface.
@@ -397,7 +410,8 @@ local function create_transport(host, port, user, password, callback,
      -- @retval nil, error Error occured.
      -- @retval not nil Future object.
      --
-    local function perform_async_request(buffer, method, ...)
+    local function perform_async_request(buffer, method, on_push, on_push_ctx,
+                                         ...)
          if state ~= 'active' and state ~= 'fetch_schema' then
              return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
                                         reason = last_error})
@@ -410,14 +424,17 @@ local function create_transport(host, port, user, password, callback,
          local id = next_request_id
          method_encoder[method](send_buf, id, ...)
          next_request_id = next_id(id)
-        -- Request has maximum 6 members:
-        -- method, buffer, id, cond, errno, response.
-        local request = setmetatable(table_new(0, 6), request_mt)
+        -- Request in most cases has maximum 8 members:
+        -- method, buffer, id, cond, errno, response, on_push,
+        -- on_push_ctx.
+        local request = setmetatable(table_new(0, 8), request_mt)
          request.method = method
          request.buffer = buffer
          request.id = id
          request.cond = fiber.cond()
          requests[id] = request
+        request.on_push = on_push
+        request.on_push_ctx = on_push_ctx
          return request
      end
  
@@ -426,9 +443,10 @@ local function create_transport(host, port, user, password, callback,
      -- @retval nil, error Error occured.
      -- @retval not nil Response object.
      --
-    local function perform_request(timeout, buffer, method, ...)
+    local function perform_request(timeout, buffer, method, on_push,
+                                   on_push_ctx, ...)
          local request, err =
-            perform_async_request(buffer, method, ...)
+            perform_async_request(buffer, method, on_push, on_push_ctx, ...)
          if not request then
              return nil, err
          end
@@ -441,13 +459,13 @@ local function create_transport(host, port, user, password, callback,
          if request == nil then -- nobody is waiting for the response
              return
          end
-        requests[id] = nil
-        request.id = nil
          local status = hdr[IPROTO_STATUS_KEY]
          local body, body_end_check
  
-        if status ~= 0 then
+        if status > IPROTO_CHUNK_KEY then
              -- Handle errors
+            requests[id] = nil
+            request.id = nil
              body, body_end_check = decode(body_rpos)
              assert(body_end == body_end_check, "invalid xrow length")
              request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -462,16 +480,33 @@ local function create_transport(host, port, user, password, callback,
              local body_len = body_end - body_rpos
              local wpos = buffer:alloc(body_len)
              ffi.copy(wpos, body_rpos, body_len)
-            request.response = tonumber(body_len)
+            body_len = tonumber(body_len)
+            if status == IPROTO_OK_KEY then
+                request.response = body_len
+                requests[id] = nil
+                request.id = nil
+            else
+                request.on_push(request.on_push_ctx, body_len)
+            end
              request.cond:broadcast()
              return
          end
  
-        -- Decode xrow.body[DATA] to Lua objects
          local real_end
-        request.response, real_end, request.errno =
-            method_decoder[request.method](body_rpos, body_end)
-        assert(real_end == body_end, "invalid body length")
+        -- Decode xrow.body[DATA] to Lua objects
+        if status == IPROTO_OK_KEY then
+            request.response, real_end, request.errno =
+                method_decoder[request.method](body_rpos, body_end)
+            assert(real_end == body_end, "invalid body length")
+            requests[id] = nil
+            request.id = nil
+        else
+            local msg
+            msg, real_end, request.errno =
+                method_decoder.push(body_rpos, body_end)
+            assert(real_end == body_end, "invalid body length")
+            request.on_push(request.on_push_ctx, msg)
+        end
          request.cond:broadcast()
      end
  
@@ -947,25 +982,40 @@ end
  
  function remote_methods:_request(method, opts, ...)
      local transport = self._transport
-    local buffer = opts and opts.buffer
-    if opts and opts.is_async then
-        return transport.perform_async_request(buffer, method, ...)
-    end
-    local deadline
-    if opts and opts.timeout then
-        -- conn.space:request(, { timeout = timeout })
-        deadline = fiber_clock() + opts.timeout
+    local on_push, on_push_ctx, buffer, deadline
+    -- Extract options, set defaults, check if the request is
+    -- async.
+    if opts then
+        buffer = opts.buffer
+        if opts.is_async then
+            if opts.on_push or opts.on_push_ctx then
+                error('To handle pushes in an async request use future:pairs()')
+            end
+            return transport.perform_async_request(buffer, method, table.insert,
+                                                   {}, ...)
+        end
+        if opts.timeout then
+            -- conn.space:request(, { timeout = timeout })
+            deadline = fiber_clock() + opts.timeout
+        else
+            -- conn:timeout(timeout).space:request()
+            -- @deprecated since 1.7.4
+            deadline = self._deadlines[fiber_self()]
+        end
+        on_push = opts.on_push or on_push_sync_default
+        on_push_ctx = opts.on_push_ctx
      else
-        -- conn:timeout(timeout).space:request()
-        -- @deprecated since 1.7.4
          deadline = self._deadlines[fiber_self()]
+        on_push = on_push_sync_default
      end
+    -- Execute synchronous request.
      local timeout = deadline and max(0, deadline - fiber_clock())
      if self.state ~= 'active' then
          transport.wait_state('active', timeout)
          timeout = deadline and max(0, deadline - fiber_clock())
      end
-    local res, err = transport.perform_request(timeout, buffer, method, ...)
+    local res, err = transport.perform_request(timeout, buffer, method,
+                                               on_push, on_push_ctx, ...)
      if err then
          box.error(err)
      end
@@ -1159,10 +1209,10 @@ function console_methods:eval(line, timeout)
      end
      if self.protocol == 'Binary' then
          local loader = 'return require("console").eval(...)'
-        res, err = pr(timeout, nil, 'eval', loader, {line})
+        res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
      else
          assert(self.protocol == 'Lua console')
-        res, err = pr(timeout, nil, 'inject', line..'$EOF$\n')
+        res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
      end
      if err then
          box.error(err)
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 306271809..c3db93627 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -31,6 +31,7 @@
  #include "session.h"
  #include "lua/utils.h"
  #include "lua/trigger.h"
+#include "lua/msgpack.h"
  
  #include <lua.h>
  #include <lauxlib.h>
@@ -43,6 +44,7 @@
  #include "box/schema.h"
  #include "box/port.h"
  #include "box/lua/console.h"
+#include "small/obuf.h"
  
  static const char *sessionlib_name = "box.session";
  
@@ -371,8 +373,11 @@ struct lua_push_port {
  static const char *
  lua_push_port_dump_plain(struct port *port, uint32_t *size);
  
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf);
+
  static const struct port_vtab lua_push_port_vtab = {
-       .dump_msgpack = NULL,
+       .dump_msgpack = lua_push_port_dump_msgpack,
         /*
          * Dump_16 has no sense, since push appears since 1.10
          * protocol.
@@ -403,6 +408,32 @@ lua_push_port_dump_plain(struct port *port, uint32_t *size)
  	return result;
  }
  
+static void
+obuf_error_cb(void *ctx)
+{
+	*((int *)ctx) = -1;
+}
+
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf)
+{
+	struct lua_push_port *lua_port = (struct lua_push_port *) port;
+	assert(lua_port->vtab == &lua_push_port_vtab);
+	struct mpstream stream;
+	int rc = 0;
+	/*
+	 * Do not use luamp_error to allow a caller to clear the
+	 * obuf, if it already has allocated something (for
+	 * example, iproto push reserves memory for a header).
+	 */
+	mpstream_init(&stream, obuf, obuf_reserve_cb, obuf_alloc_cb,
+		      obuf_error_cb, &rc);
+	luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1);
+	if (rc == 0)
+		mpstream_flush(&stream);
+	return rc;
+}
+
  /**
   * Push a message using a protocol, depending on a session type.
   * @param data Data to push, first argument on a stack.
diff --git a/src/box/xrow.c b/src/box/xrow.c
index b3f81a86f..64f490995 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -377,6 +377,19 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
  	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
  }
  
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+		   uint32_t schema_version)
+{
+	char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+	iproto_header_encode(pos, IPROTO_CHUNK, sync, schema_version,
+			     obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+	struct iproto_body_bin body = iproto_body_bin;
+	body.v_data_len = mp_bswap_u32(1);
+
+	memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
+}
+
  int
  xrow_decode_dml(struct xrow_header *row, struct request *request,
  		uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index b10bf26d5..1bb5f103b 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -411,6 +411,18 @@ int
  iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
  		   uint32_t schema_version);
  
+/**
+ * Write an IPROTO_CHUNK header from a specified position in a
+ * buffer.
+ * @param buf Buffer to write to.
+ * @param svp Position to write from.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ */
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+		   uint32_t schema_version);
+
  /** Write error directly to a socket. */
  void
  iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
diff --git a/src/fiber.h b/src/fiber.h
index 8231bba24..eb89c48cd 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,8 +105,13 @@ enum fiber_key {
  	/** User global privilege and authentication token */
  	FIBER_KEY_USER = 3,
  	FIBER_KEY_MSG = 4,
-	/** Storage for lua stack */
+	/**
+	 * The storage cell number 5 is shared between lua stack
+	 * for fibers created from Lua, and IProto sync for fibers
+	 * created to execute a binary request.
+	 */
  	FIBER_KEY_LUA_STACK = 5,
+	FIBER_KEY_SYNC      = 5,
  	FIBER_KEY_MAX = 6
  };
  
@@ -610,6 +615,13 @@ fiber_get_key(struct fiber *fiber, enum fiber_key key)
  	return fiber->fls[key];
  }
  
+static inline uint64_t
+fiber_sync(struct fiber *fiber)
+{
+	uint64_t *sync = (uint64_t *) fiber_get_key(fiber, FIBER_KEY_SYNC);
+	return sync != NULL ? *sync : 0;
+}
+
  /**
   * Finalizer callback
   * \sa fiber_key_on_gc()
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 1674c27e1..1a9758cd2 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
      return cn:_request('select', opts, space_id, index_id, iterator,
                         offset, limit, key)
  end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
  test_run:cmd("setopt delimiter ''");
  ---
  ...
@@ -2365,7 +2365,7 @@ c.space.test:delete{1}
  --
  -- Break a connection to test reconnect_after.
  --
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
  ---
  ...
  c.state
@@ -2939,7 +2939,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
  future = c:call('long_function', {1, 2, 3}, {is_async = true})
  ---
  ...
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
  ---
  ...
  while not c:is_connected() do fiber.sleep(0.01) end
@@ -3053,7 +3053,7 @@ c:ping()
  -- new attempts to read any data - the connection is closed
  -- already.
  --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', 'long', {}) c._transport.perform_request(nil, nil, 'inject', '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
  ---
  ...
  while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index c34616aec..6a36c812d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
      return cn:_request('select', opts, space_id, index_id, iterator,
                         offset, limit, key)
  end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
  test_run:cmd("setopt delimiter ''");
  
  LISTEN = require('uri').parse(box.cfg.listen)
@@ -965,7 +965,7 @@ c.space.test:delete{1}
  --
  -- Break a connection to test reconnect_after.
  --
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
  c.state
  while not c:is_connected() do fiber.sleep(0.01) end
  c:ping()
@@ -1173,7 +1173,7 @@ finalize_long()
  --
  c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
  future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
  while not c:is_connected() do fiber.sleep(0.01) end
  finalize_long()
  future:wait_result(100)
@@ -1222,7 +1222,7 @@ c:ping()
  -- new attempts to read any data - the connection is closed
  -- already.
  --
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', 'long', {}) c._transport.perform_request(nil, nil, 'inject', '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
  while f:status() ~= 'dead' do fiber.sleep(0.01) end
  c:close()
  
diff --git a/test/box/push.result b/test/box/push.result
index 816f06e00..04cdc474b 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -1,5 +1,8 @@
+test_run = require('test_run').new()
+---
+...
  --
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
  --
  --
  -- Usage.
@@ -12,18 +15,29 @@ box.session.push(1, 2)
  ---
  - error: 'Usage: box.session.push(data)'
  ...
-ok = nil
+fiber = require('fiber')
  ---
  ...
-err = nil
+messages = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
  ---
+- true
  ...
-function do_push() ok, err = box.session.push(1) end
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
  ---
  ...
---
--- Test binary protocol.
---
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
  netbox = require('net.box')
  ---
  ...
@@ -37,27 +51,213 @@ c:ping()
  ---
  - true
  ...
-c:call('do_push')
+c:call('do_pushes', {}, {on_push = table.insert, on_push_ctx = messages})
  ---
+- 300
  ...
-ok, err
+messages
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+    box.session.push('started dml')
+    s:replace{key}
+    box.session.push('continued dml')
+    s:replace{-key}
+    box.session.push('finished dml')
+    return key
+end;
+---
+...
+function do_pushes(val)
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.yield()
+    end
+    return val
+end;
+---
+...
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3, 'dml sends 3 messages')
+        assert(c.messages[1] == 'started dml', 'started')
+        assert(c.messages[2] == 'continued dml', 'continued')
+        assert(c.messages[3] == 'finished dml', 'finished')
+        assert(s:get{c.retval}, 'inserted +')
+        assert(s:get{-c.retval}, 'inserted -')
+    else
+        assert(c.retval, 'something is returned')
+        assert(#c.messages == 5, 'non-dml sends 5 messages')
+        for k, v in pairs(c.messages) do
+            assert(k == v, 'with equal keys and values')
+        end
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+---
+...
+messages = {}
+---
+...
+c:call('push_null', {}, {on_push = table.insert, on_push_ctx = messages})
+---
+...
+messages
+---
+- - null
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = table.insert, on_push_ctx = messages, buffer = ibuf})
  ---
-- null
-- Session 'binary' does not support push()
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 8
+  - 8
+  - 8
+  - 8
+  - 8
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {48: [1]}
+  - {48: [2]}
+  - {48: [3]}
+  - {48: [4]}
+  - {48: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
  ...
  c:close()
  ---
  ...
+s:drop()
+---
+...
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
  ---
  ...
  --
  -- Ensure can not push in background.
  --
-fiber = require('fiber')
+ok = nil
+---
+...
+err = nil
  ---
  ...
-f = fiber.create(do_push)
+f = fiber.create(function() ok, err = box.session.push(100) end)
  ---
  ...
  while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index a59fe0a4c..b0ff218bb 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -1,5 +1,6 @@
+test_run = require('test_run').new()
  --
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
  --
  
  --
@@ -8,28 +9,135 @@
  box.session.push()
  box.session.push(1, 2)
  
-ok = nil
-err = nil
-function do_push() ok, err = box.session.push(1) end
+fiber = require('fiber')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function do_pushes()
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+test_run:cmd("setopt delimiter ''");
  
---
--- Test binary protocol.
---
  netbox = require('net.box')
  box.schema.user.grant('guest', 'read,write,execute', 'universe')
  
  c = netbox.connect(box.cfg.listen)
  c:ping()
-c:call('do_push')
-ok, err
+c:call('do_pushes', {}, {on_push = table.insert, on_push_ctx = messages})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+    box.session.push('started dml')
+    s:replace{key}
+    box.session.push('continued dml')
+    s:replace{-key}
+    box.session.push('finished dml')
+    return key
+end;
+function do_pushes(val)
+    for i = 1, 5 do
+        box.session.push(i)
+        fiber.yield()
+    end
+    return val
+end;
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started},
+                            {on_push = table.insert,
+                             on_push_ctx = catcher.messages})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3, 'dml sends 3 messages')
+        assert(c.messages[1] == 'started dml', 'started')
+        assert(c.messages[2] == 'continued dml', 'continued')
+        assert(c.messages[3] == 'finished dml', 'finished')
+        assert(s:get{c.retval}, 'inserted +')
+        assert(s:get{-c.retval}, 'inserted -')
+    else
+        assert(c.retval, 'something is returned')
+        assert(#c.messages == 5, 'non-dml sends 5 messages')
+        for k, v in pairs(c.messages) do
+            assert(k == v, 'with equal keys and values')
+        end
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+messages = {}
+c:call('push_null', {}, {on_push = table.insert, on_push_ctx = messages})
+messages
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = table.insert, on_push_ctx = messages, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
  c:close()
+s:drop()
  
  box.schema.user.revoke('guest', 'read,write,execute', 'universe')
  
  --
  -- Ensure can not push in background.
  --
-fiber = require('fiber')
-f = fiber.create(do_push)
+ok = nil
+err = nil
+f = fiber.create(function() ok, err = box.session.push(100) end)
  while f:status() ~= 'dead' do fiber.sleep(0.01) end
  ok, err



