[PATCH 5/5] session: introduce box.session.push

Vladimir Davydov vdavydov.dev at gmail.com
Wed Mar 21 12:10:50 MSK 2018


On Mon, Mar 19, 2018 at 04:34:52PM +0300, Vladislav Shpilevoy wrote:
> Box.session.push() allows to send a message to a client with no
> finishing a main request.
> 
> Tarantool supports two push types: via text protocol and via
> binary protocol. Text protocol push message contains "push:"
> prefix followed by a YAML formatted text, where as a final
> response always has '---' prefix. To catch pushed messages on a
> client side use console.connect() on_push options which takes
> a function with a single argument - message.
> 
> IProto message is encoded just like a regular response, but
> instead of IPROTO_DATA it has IPROTO_PUSH with a single element -
> pushed MessagePack encoded data.
> 
> Text push is trivial - it is just blocking write into a socket
> with a prefix. Binary push is more complex.
> 
> TX thread to notify IProto thread about new data in obuf sends
> a message 'push_msg'. IProto thread, got this message, notifies
> libev about new data, and then sends 'push_msg' back with
> updated write position. TX thread, received the message back,
> updates its version of a write position. If IProto will not send
> a write position, then TX will write to the same obuf again
> and again, because it can not know that IProto already flushed
> another obuf.
> 
> To avoid multiple 'push_msg' in fly between IProto and TX, the
> only one 'push_msg' per connection is used. To deliver pushes,
> appeared when 'push_msg' was in fly, TX thread sets a flag every
> time when sees, that 'push_msg' is sent, and there is a new push.
> When 'push_msg' returns, it checks this flag, and if it is set,
> then the IProto is notified again.
> 
> Closes #2677

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> @@ -70,6 +109,37 @@ struct iproto_session_owner {
>  	char salt[IPROTO_SALT_SIZE];
>  	/** IProto connection. */
>  	struct iproto_connection *connection;
> +	/**
> +	 * Is_push_in_progress is set, when a push_msg is sent to
> +	 * IProto thread, and reset, when the message is returned
> +	 * to TX. If a new push sees, that a push_msg is already
> +	 * sent to IProto, then has_new_pushes is set. After push
> +	 * notification is returned to TX, it checks
> +	 * has_new_pushes. If it is set, then the notification is
> +	 * sent again. This ping-pong continues, until TX stopped
> +	 * pushing. It allows to
> +	 * 1) avoid multiple push_msg from one session in fly,
> +	 * 2) do not block push() until a previous push() is
> +	 *    finished.
> +	 *
> +	 *      IProto                           TX
> +	 * -------------------------------------------------------
> +	 *                                 + [push message]
> +	 *    start socket <--- notification ----
> +	 *       write
> +	 *                                 + [push message]
> +	 *                                 + [push message]
> +	 *                                       ...
> +	 *      end socket
> +	 *        write    ----------------> check for new
> +	 *                                   pushes - found
> +	 *                 <--- notification ---
> +	 *                      ....
> +	 */
> +	bool has_new_pushes;
> +	bool is_push_in_progress;
> +	/** Push notification for IProto thread. */
> +	struct iproto_push_msg push_msg;

Please move this to iproto_connection.

> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> @@ -79,6 +79,14 @@ enum iproto_key {
>  	/* Leave a gap between request keys and response keys */
>  	IPROTO_DATA = 0x30,
>  	IPROTO_ERROR = 0x31,
> +	/**
> +	 * Tarantool supports two push types: binary and text.
> +	 * A text push can be distinguished from a response by a
> +	 * prefix "push:".

I doubt we need to designate text pushes at all. IMO they are useful
only for printing text to the user console. I suggest you disable
the on_push callback if net_box is operating in the 'console' mode and
instead just append pushes to the output, without a prefix.

> +	 * Binary push is encoded using IPROTO_PUSH key in a
> +	 * message body, which replaces IPROTO_DATA.
> +	 */
> +	IPROTO_PUSH = 0x32,

> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> +/**
> + * Search for IPROTO_PUSH key in a MessagePack encoded response
> + * body. It is needed without entire message decoding, when a user
> + * wants to store raw responses and pushes in its own buffer.
> + */
> +static int
> +netbox_body_is_push(struct lua_State *L)
> +{
> +	uint32_t ctypeid;
> +	const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
> +	assert(ctypeid == luaL_ctypeid(L, "char *"));
> +	assert(mp_typeof(*body) == MP_MAP);
> +	lua_pushboolean(L, mp_decode_map(&body) == 1 &&
> +			   mp_typeof(*body) == MP_UINT &&
> +			   mp_decode_uint(&body) == IPROTO_PUSH);
> +	return 1;
> +}
> +

Can't you do this check in net_box.lua, without involving C?

> diff --git a/src/box/lua/session.c b/src/box/lua/session.c
> +/**
> + * Write @a text into @a fd in a blocking mode, ignoring transient
> + * socket errors.
> + * @param fd Console descriptor.
> + * @param text Text to send.
> + * @param len Length of @a text.
> + */
> +static inline int
> +console_do_push(int fd, const char *text, uint32_t len)
> +{
> +	while (len > 0) {
> +		int written = fio_write_silent(fd, text, len);

I don't think that using a blocking function here is acceptable
(AFAICS fio_write_silent() calls the write syscall on session fd).

> +		if (written < 0)
> +			return -1;
> +		assert((uint32_t) written <= len);
> +		len -= written;
> +		text += written;
> +	}
> +	return 0;
> +}
> +
> +static int
> +console_session_owner_push(struct session_owner *owner, uint64_t sync,
> +			   struct port *port)
> +{
> +	/* Console has no sync. */
> +	(void) sync;
> +	assert(owner->vtab == &console_session_owner_vtab);
> +	int fd = console_session_owner_fd(owner);
> +	uint32_t text_len;
> +	const char *text = port_dump_raw(port, &text_len);
> +	if (text == NULL ||
> +	    console_do_push(fd, "push:", strlen("push:")) != 0 ||
> +	    console_do_push(fd, text, text_len) != 0)
> +		return -1;
> +	else
> +		return 0;
> +}

> +/**
> + * Push a message using a protocol, depending on a session type.
> + * @param data Data to push, first argument on a stack.
> + * @param opts Options. Now requires a single possible option -
> + *        sync. Second argument on a stack.
> + */
> +static int
> +lbox_session_push(struct lua_State *L)
> +{
> +	if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
> +usage_error:
> +		return luaL_error(L, "Usage: box.session.push(data, opts)");

I don't think that we should oblige the user to pass the 'sync' value
explicitly - this would be really annoying. I think we should save the
sync somehow (fiber local storage, request?) and pass it implicitly.

> +	}
> +	lua_getfield(L, 2, "sync");
> +	if (! lua_isnumber(L, 3))
> +		goto usage_error;
> +	double lua_sync = lua_tonumber(L, 3);
> +	lua_pop(L, 1);
> +	uint64_t sync = (uint64_t) lua_sync;
> +	if (lua_sync != sync)
> +		goto usage_error;
> +	struct lua_push_port port;
> +	port.vtab = &lua_push_port_vtab;
> +	port.L = L;
> +	/*
> +	 * Pop the opts - they must not be pushed. Leave only data
> +	 * on a stack.
> +	 */
> +	lua_remove(L, 2);
> +	if (session_push(current_session(), sync, (struct port *) &port) != 0) {
> +		return luaT_error(L);
> +	} else {
> +		lua_pushboolean(L, true);
> +		return 1;

What's the point in returning 'true' on success?

> +	}
> +}

> diff --git a/src/box/port.h b/src/box/port.h
> @@ -76,6 +76,11 @@ struct port_vtab {
>  	 * format.
>  	 */
>  	int (*dump_16)(struct port *port, struct obuf *out);
> +	/**
> +	 * Same as dump, but find a memory for an output buffer
> +	 * for itself.
> +	 */
> +	const char *(*dump_raw)(struct port *port, uint32_t *size);

Somehow this doesn't feel right. Maybe we should encode the Lua stack in
msgpack first, and then re-encode it to YAML. Maybe we shouldn't use
the 'port' at all. Maybe I'm being too picky, and we should leave it
as is. Anyway, please think of alternatives.

> diff --git a/src/box/xrow.c b/src/box/xrow.c
> @@ -43,6 +43,9 @@
>  #include "scramble.h"
>  #include "iproto_constants.h"
>  
> +static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
> +	      "encoded IPROTO_BODY keys must fit into one byte");
> +

Why check this now? IPROTO_DATA and IPROTO_PUSH can't accidentally
change, as they are defined in the binary protocol, so there's no point
in this static assertion IMO.

>  int
>  xrow_header_decode(struct xrow_header *header, const char **pos,
>  		   const char *end)
> @@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
>  	uint32_t v_data_len;               /* string length of array size */
>  };
>  
> +static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
> +	      IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
> +
>  static const struct iproto_body_bin iproto_body_bin = {
>  	0x81, IPROTO_DATA, 0xdd, 0
>  };
> @@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
>  	0x81, IPROTO_ERROR, 0xdb, 0
>  };
>  
> +struct PACKED iproto_body_push_bin {
> +	uint8_t m_body;       /* MP_MAP */
> +	uint8_t k_data;       /* IPROTO_PUSH */
> +	uint8_t v_data;       /* 1-size MP_ARRAY */
> +};

Why don't you just reuse iproto_body_bin for this?



More information about the Tarantool-patches mailing list