[tarantool-patches] [PATCH v2 10/10] session: introduce binary box.session.push
Vladislav Shpilevoy
v.shpilevoy at tarantool.org
Fri Apr 20 16:24:26 MSK 2018
Box.session.push() allows to send a message to a client with no
finishing a main request.
Tarantool after this patch supports pushes over binary protocol.
IProto message is encoded using a new header code - IPROTO_CHUNK.
TX thread to notify IProto thread about new data in obuf sends
a message 'push_msg'. IProto thread, got this message, notifies
libev about new data, and then sends 'push_msg' back with
updated write position. TX thread, received the message back,
updates its version of a write position. If IProto would not send
a write position, then TX would write to the same obuf again and
again, because it can not know that IProto already flushed
another obuf.
To avoid multiple 'push_msg' in fly between IProto and TX, the
only one 'push_msg' per connection is used. To deliver pushes,
appeared when 'push_msg' was in fly, TX thread sets a flag every
time when sees, that 'push_msg' is sent, and there is a new push.
When 'push_msg' returns, it checks this flag, and if it is set,
the IProto is notified again.
Closes #2677
---
src/box/iproto.cc | 221 +++++++++++++++++++++++++++++++++++--------
src/box/iproto_constants.h | 3 +
src/box/lua/net_box.lua | 50 ++++++----
src/box/lua/session.c | 33 ++++++-
src/box/xrow.c | 13 +++
src/box/xrow.h | 12 +++
src/fiber.h | 14 ++-
test/box/net.box.result | 4 +-
test/box/net.box.test.lua | 4 +-
test/box/push.result | 231 ++++++++++++++++++++++++++++++++++++++++++---
test/box/push.test.lua | 134 +++++++++++++++++++++++---
11 files changed, 635 insertions(+), 84 deletions(-)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 38baf1b8d..78a551039 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -67,6 +67,41 @@ enum { IPROTO_MSG_MAX = 768 };
enum { IPROTO_SALT_SIZE = 32 };
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+ struct obuf *obuf;
+ struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+ wpos->obuf = out;
+ wpos->svp = obuf_create_svp(out);
+}
+
+/**
+ * Message to notify IProto thread about new data in an output
+ * buffer. Struct iproto_msg is not used here, because push
+ * notification can be much more compact: it does not have
+ * request, ibuf, length, flags ...
+ */
+struct iproto_push_msg {
+ struct cmsg base;
+ /**
+ * Before sending to IProto thread, the wpos is set to a
+ * current position in an output buffer. Before IProto
+ * returns the message to TX, it sets wpos to the last
+ * flushed position (works like iproto_msg.wpos).
+ */
+ struct iproto_wpos wpos;
+};
+
/**
* Network readahead. A signed integer to avoid
* automatic type coercion to an unsigned type.
@@ -111,24 +146,6 @@ iproto_reset_input(struct ibuf *ibuf)
}
}
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
- struct obuf *obuf;
- struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
- wpos->obuf = out;
- wpos->svp = obuf_create_svp(out);
-}
-
/* {{{ iproto_msg - declaration */
/**
@@ -367,6 +384,37 @@ struct iproto_connection
/** Pointer to the current output buffer. */
struct obuf *p_obuf;
} tx;
+ /**
+ * Is_push_in_progress is set, when a push_msg is sent to
+ * IProto thread, and reset, when the message is returned
+ * to TX. If a new push sees, that a push_msg is already
+ * sent to IProto, then has_new_pushes is set. After push
+ * notification is returned to TX, it checks
+ * has_new_pushes. If it is set, then the notification is
+ * sent again. This ping-pong continues, until TX stopped
+ * pushing. It allows to
+ * 1) avoid multiple push_msg from one session in fly,
+ * 2) do not block push() until a previous push() is
+ * finished.
+ *
+ * IProto TX
+ * -------------------------------------------------------
+ * + [push message]
+ * start socket <--- notification ----
+ * write
+ * + [push message]
+ * + [push message]
+ * ...
+ * end socket
+ * write ----------------> check for new
+ * pushes - found
+ * <--- notification ---
+ * ....
+ */
+ bool has_new_pushes;
+ bool is_push_in_progress;
+ /** Push notification for IProto thread. */
+ struct iproto_push_msg push_msg;
/** Authentication salt. */
char salt[IPROTO_SALT_SIZE];
};
@@ -806,6 +854,7 @@ iproto_connection_new(int fd)
{
struct iproto_connection *con = (struct iproto_connection *)
mempool_alloc_xc(&iproto_connection_pool);
+ memset(con, 0, sizeof(*con));
con->input.data = con->output.data = con;
con->loop = loop();
ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
@@ -818,9 +867,6 @@ iproto_connection_new(int fd)
con->tx.p_obuf = &con->obuf[0];
iproto_wpos_create(&con->wpos, con->tx.p_obuf);
iproto_wpos_create(&con->wend, con->tx.p_obuf);
- con->parse_size = 0;
- con->long_poll_requests = 0;
- con->session = NULL;
rlist_create(&con->in_stop_list);
/* It may be very awkward to allocate at close. */
con->disconnect = iproto_msg_new(con);
@@ -1010,9 +1056,9 @@ error:
}
static void
-tx_fiber_init(struct session *session, uint64_t sync)
+tx_fiber_init(struct session *session, uint64_t *sync)
{
- session->meta.sync = sync;
+ session->meta.sync = sync != NULL ? *sync : 0;
/*
* We do not cleanup fiber keys at the end of each request.
* This does not lead to privilege escalation as long as
@@ -1025,6 +1071,7 @@ tx_fiber_init(struct session *session, uint64_t sync)
*/
fiber_set_session(fiber(), session);
fiber_set_user(fiber(), &session->credentials);
+ fiber_set_key(fiber(), FIBER_KEY_SYNC, (void *) sync);
}
/**
@@ -1038,7 +1085,7 @@ tx_process_disconnect(struct cmsg *m)
struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
if (con->session) {
- tx_fiber_init(con->session, 0);
+ tx_fiber_init(con->session, NULL);
if (! rlist_empty(&session_on_disconnect))
session_run_on_disconnect_triggers(con->session);
session_destroy(con->session);
@@ -1108,15 +1155,16 @@ tx_discard_input(struct iproto_msg *msg)
* not, the empty buffer is selected.
* - if neither of the buffers are empty, the function
* does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ * thread.
*/
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
{
- struct iproto_msg *msg = (struct iproto_msg *) m;
- struct iproto_connection *con = msg->connection;
-
struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
- if (msg->wpos.obuf == con->tx.p_obuf) {
+ if (wpos->obuf == con->tx.p_obuf) {
/*
* We got a message advancing the buffer which
* is being appended to. The previous buffer is
@@ -1134,6 +1182,13 @@ tx_accept_msg(struct cmsg *m)
*/
con->tx.p_obuf = prev;
}
+}
+
+static inline struct iproto_msg *
+tx_accept_msg(struct cmsg *m)
+{
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_wpos(msg->connection, &msg->wpos);
return msg;
}
@@ -1179,7 +1234,7 @@ tx_process1(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
- tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_fiber_init(msg->connection->session, &msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1213,7 +1268,7 @@ tx_process_select(struct cmsg *m)
int rc;
struct request *req = &msg->dml;
- tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_fiber_init(msg->connection->session, &msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1263,7 +1318,7 @@ tx_process_call(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
- tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_fiber_init(msg->connection->session, &msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1347,7 +1402,7 @@ tx_process_misc(struct cmsg *m)
struct iproto_connection *con = msg->connection;
struct obuf *out = con->tx.p_obuf;
- tx_fiber_init(con->session, msg->header.sync);
+ tx_fiber_init(con->session, &msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1386,7 +1441,7 @@ tx_process_join_subscribe(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
- tx_fiber_init(con->session, msg->header.sync);
+ tx_fiber_init(con->session, &msg->header.sync);
try {
switch (msg->header.type) {
@@ -1503,7 +1558,7 @@ tx_process_connect(struct cmsg *m)
if (con->session == NULL)
diag_raise();
con->session->meta.conn = con;
- tx_fiber_init(con->session, 0);
+ tx_fiber_init(con->session, NULL);
static __thread char greeting[IPROTO_GREETING_SIZE];
/* TODO: dirty read from tx thread */
struct tt_uuid uuid = INSTANCE_UUID;
@@ -1653,6 +1708,98 @@ iproto_session_sync(struct session *session)
return session->meta.sync;
}
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param conn IProto connection.
+ */
+static void
+tx_begin_push(struct iproto_connection *conn);
+
+/**
+ * Create an event to send push.
+ * @param m IProto push message.
+ */
+static void
+net_push_msg(struct cmsg *m)
+{
+ struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+ struct iproto_connection *conn =
+ container_of(msg, struct iproto_connection, push_msg);
+ conn->wend = msg->wpos;
+ msg->wpos = conn->wpos;
+ if (evio_has_fd(&conn->output) && !ev_is_active(&conn->output))
+ ev_feed_event(conn->loop, &conn->output, EV_WRITE);
+}
+
+/**
+ * After a message notifies IProto thread about pushed data, TX
+ * thread can already have a new push in one of obufs. This
+ * function checks for new pushes and possibly re-sends push
+ * notification to IProto thread.
+ */
+static void
+tx_check_for_new_push(struct cmsg *m)
+{
+ struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+ struct iproto_connection *conn =
+ container_of(msg, struct iproto_connection, push_msg);
+ tx_accept_wpos(conn, &msg->wpos);
+ conn->is_push_in_progress = false;
+ if (conn->has_new_pushes)
+ tx_begin_push(conn);
+}
+
+static const struct cmsg_hop push_route[] = {
+ { net_push_msg, &tx_pipe },
+ { tx_check_for_new_push, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_connection *conn)
+{
+ assert(! conn->is_push_in_progress);
+ cmsg_init((struct cmsg *) &conn->push_msg, push_route);
+ iproto_wpos_create(&conn->push_msg.wpos, conn->tx.p_obuf);
+ conn->has_new_pushes = false;
+ conn->is_push_in_progress = true;
+ cpipe_push(&net_pipe, (struct cmsg *) &conn->push_msg);
+}
+
+/**
+ * Push a message from @a port to a remote client.
+ * @param session IProto session.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval 0 Success, a message is wrote to an output buffer. But
+ * it is not guaranteed, that it will be sent
+ * successfully.
+ */
+static int
+iproto_session_push(struct session *session, struct port *port)
+{
+ struct iproto_connection *conn =
+ (struct iproto_connection *) session->meta.conn;
+ struct obuf_svp svp;
+ if (iproto_prepare_select(conn->tx.p_obuf, &svp) != 0)
+ return -1;
+ if (port_dump_msgpack(port, conn->tx.p_obuf) != 0) {
+ obuf_rollback_to_svp(conn->tx.p_obuf, &svp);
+ return -1;
+ }
+ iproto_reply_chunk(conn->tx.p_obuf, &svp, fiber_sync(fiber()),
+ ::schema_version);
+ if (! conn->is_push_in_progress)
+ tx_begin_push(conn);
+ else
+ conn->has_new_pushes = true;
+ return 0;
+}
+
+/** }}} */
+
/** Initialize the iproto subsystem and start network io thread */
void
iproto_init()
@@ -1666,7 +1813,7 @@ iproto_init()
cpipe_create(&net_pipe, "net");
cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
struct session_vtab iproto_session_vtab = {
- /* .push = */ generic_session_push,
+ /* .push = */ iproto_session_push,
/* .fd = */ iproto_session_fd,
/* .sync = */ iproto_session_sync,
};
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 92534ef6e..67514ca03 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -163,6 +163,9 @@ enum iproto_type {
/** Vinyl row index stored in .run file */
VY_RUN_ROW_INDEX = 102,
+ /** Non-final response type. */
+ IPROTO_CHUNK = 128,
+
/**
* Error codes = (IPROTO_TYPE_ERROR | ER_XXX from errcode.h)
*/
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 4ed2b375d..814358a2e 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -40,6 +40,8 @@ local IPROTO_SCHEMA_VERSION_KEY = 0x05
local IPROTO_DATA_KEY = 0x30
local IPROTO_ERROR_KEY = 0x31
local IPROTO_GREETING_SIZE = 128
+local IPROTO_CHUNK_KEY = 128
+local IPROTO_OK_KEY = 0
-- select errors from box.error
local E_UNKNOWN = box.error.UNKNOWN
@@ -267,7 +269,8 @@ local function create_transport(host, port, user, password, callback,
end
-- REQUEST/RESPONSE --
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ local function perform_request(timeout, buffer, method, on_push,
+ schema_version, ...)
if state ~= 'active' then
return last_errno or E_NO_CONNECTION, last_error
end
@@ -280,11 +283,12 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_codec[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
- local request = table_new(0, 6) -- reserve space for 6 keys
+ local request = table_new(0, 7) -- reserve space for 7 keys
request.client = fiber_self()
request.method = method
request.schema_version = schema_version
request.buffer = buffer
+ request.on_push = on_push
requests[id] = request
repeat
local timeout = max(0, deadline - fiber_clock())
@@ -308,12 +312,12 @@ local function create_transport(host, port, user, password, callback,
if request == nil then -- nobody is waiting for the response
return
end
- requests[id] = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
- if status ~= 0 then
+ if status > IPROTO_CHUNK_KEY then
-- Handle errors
+ requests[id] = nil
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -328,16 +332,27 @@ local function create_transport(host, port, user, password, callback,
local body_len = body_end - body_rpos
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
- request.response = tonumber(body_len)
- wakeup_client(request.client)
- return
+ if status == IPROTO_OK_KEY then
+ request.response = tonumber(body_len)
+ requests[id] = nil
+ wakeup_client(request.client)
+ elseif request.on_push then
+ assert(status == IPROTO_CHUNK_KEY)
+ request.on_push(tonumber(body_len))
+ end
+ else
+ -- Decode xrow.body[DATA] to Lua objects
+ body, body_end_check = decode(body_rpos)
+ assert(body_end == body_end_check, "invalid xrow length")
+ if status == IPROTO_OK_KEY then
+ request.response = body[IPROTO_DATA_KEY]
+ requests[id] = nil
+ wakeup_client(request.client)
+ elseif request.on_push then
+ assert(status == IPROTO_CHUNK_KEY)
+ request.on_push(body[IPROTO_DATA_KEY][1])
+ end
end
-
- -- Decode xrow.body[DATA] to Lua objects
- body, body_end_check = decode(body_rpos)
- assert(body_end == body_end_check, "invalid xrow length")
- request.response = body[IPROTO_DATA_KEY]
- wakeup_client(request.client)
end
local function new_request_id()
@@ -834,7 +849,8 @@ function remote_methods:_request(method, opts, ...)
timeout = deadline and max(0, deadline - fiber_clock())
end
err, res = perform_request(timeout, buffer, method,
- self.schema_version, ...)
+ opts and opts.on_push, self.schema_version,
+ ...)
if not err and buffer ~= nil then
return res -- the length of xrow.body
elseif not err then
@@ -864,7 +880,7 @@ function remote_methods:ping(opts)
timeout = deadline and max(0, deadline - fiber_clock())
or (opts and opts.timeout)
end
- local err = self._transport.perform_request(timeout, nil, 'ping',
+ local err = self._transport.perform_request(timeout, nil, 'ping', nil,
self.schema_version)
return not err or err == E_WRONG_SCHEMA_VERSION
end
@@ -1045,10 +1061,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+ err, res = pr(timeout, nil, 'eval', nil, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+ err, res = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
end
if err then
box.error({code = err, reason = res})
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 306271809..c3db93627 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -31,6 +31,7 @@
#include "session.h"
#include "lua/utils.h"
#include "lua/trigger.h"
+#include "lua/msgpack.h"
#include <lua.h>
#include <lauxlib.h>
@@ -43,6 +44,7 @@
#include "box/schema.h"
#include "box/port.h"
#include "box/lua/console.h"
+#include "small/obuf.h"
static const char *sessionlib_name = "box.session";
@@ -371,8 +373,11 @@ struct lua_push_port {
static const char *
lua_push_port_dump_plain(struct port *port, uint32_t *size);
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf);
+
static const struct port_vtab lua_push_port_vtab = {
- .dump_msgpack = NULL,
+ .dump_msgpack = lua_push_port_dump_msgpack,
/*
* Dump_16 has no sense, since push appears since 1.10
* protocol.
@@ -403,6 +408,32 @@ lua_push_port_dump_plain(struct port *port, uint32_t *size)
return result;
}
+static void
+obuf_error_cb(void *ctx)
+{
+ *((int *)ctx) = -1;
+}
+
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf)
+{
+ struct lua_push_port *lua_port = (struct lua_push_port *) port;
+ assert(lua_port->vtab == &lua_push_port_vtab);
+ struct mpstream stream;
+ int rc = 0;
+ /*
+ * Do not use luamp_error to allow a caller to clear the
+ * obuf, if it already has allocated something (for
+ * example, iproto push reserves memory for a header).
+ */
+ mpstream_init(&stream, obuf, obuf_reserve_cb, obuf_alloc_cb,
+ obuf_error_cb, &rc);
+ luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1);
+ if (rc == 0)
+ mpstream_flush(&stream);
+ return rc;
+}
+
/**
* Push a message using a protocol, depending on a session type.
* @param data Data to push, first argument on a stack.
diff --git a/src/box/xrow.c b/src/box/xrow.c
index f48525645..adb52deeb 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -373,6 +373,19 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
}
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+ uint32_t schema_version)
+{
+ char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+ iproto_header_encode(pos, IPROTO_CHUNK, sync, schema_version,
+ obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+ struct iproto_body_bin body = iproto_body_bin;
+ body.v_data_len = mp_bswap_u32(1);
+
+ memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
+}
+
int
xrow_decode_dml(struct xrow_header *row, struct request *request,
uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index d407d151b..7fe1debbf 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -390,6 +390,18 @@ int
iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
uint32_t schema_version);
+/**
+ * Write an IPROTO_CHUNK header from a specified position in a
+ * buffer.
+ * @param buf Buffer to write to.
+ * @param svp Position to write from.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ */
+void
+iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+ uint32_t schema_version);
+
/** Write error directly to a socket. */
void
iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
diff --git a/src/fiber.h b/src/fiber.h
index 8231bba24..eb89c48cd 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -105,8 +105,13 @@ enum fiber_key {
/** User global privilege and authentication token */
FIBER_KEY_USER = 3,
FIBER_KEY_MSG = 4,
- /** Storage for lua stack */
+ /**
+ * The storage cell number 5 is shared between lua stack
+ * for fibers created from Lua, and IProto sync for fibers
+ * created to execute a binary request.
+ */
FIBER_KEY_LUA_STACK = 5,
+ FIBER_KEY_SYNC = 5,
FIBER_KEY_MAX = 6
};
@@ -610,6 +615,13 @@ fiber_get_key(struct fiber *fiber, enum fiber_key key)
return fiber->fls[key];
}
+static inline uint64_t
+fiber_sync(struct fiber *fiber)
+{
+ uint64_t *sync = (uint64_t *) fiber_get_key(fiber, FIBER_KEY_SYNC);
+ return sync != NULL ? *sync : 0;
+}
+
/**
* Finalizer callback
* \sa fiber_key_on_gc()
diff --git a/test/box/net.box.result b/test/box/net.box.result
index cf7b27f0b..93561dc64 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -2363,7 +2363,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
---
...
c.state
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 576b5cfea..b05e1f0be 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -965,7 +965,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
+_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
diff --git a/test/box/push.result b/test/box/push.result
index 816f06e00..91e2981ed 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -1,5 +1,8 @@
+test_run = require('test_run').new()
+---
+...
--
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
--
--
-- Usage.
@@ -12,18 +15,34 @@ box.session.push(1, 2)
---
- error: 'Usage: box.session.push(data)'
...
-ok = nil
+fiber = require('fiber')
---
...
-err = nil
+messages = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
---
+- true
...
-function do_push() ok, err = box.session.push(1) end
+function on_push(message)
+ table.insert(messages, message)
+end;
---
...
---
--- Test binary protocol.
---
+function do_pushes()
+ for i = 1, 5 do
+ box.session.push(i)
+ fiber.sleep(0.01)
+ end
+ return 300
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
netbox = require('net.box')
---
...
@@ -37,27 +56,213 @@ c:ping()
---
- true
...
-c:call('do_push')
+c:call('do_pushes', {}, {on_push = on_push})
---
+- 300
...
-ok, err
+messages
+---
+- - 1
+ - 2
+ - 3
+ - 4
+ - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+ box.session.push('started dml')
+ s:replace{key}
+ box.session.push('continued dml')
+ s:replace{-key}
+ box.session.push('finished dml')
+ return key
+end;
+---
+...
+function do_pushes(val)
+ for i = 1, 5 do
+ box.session.push(i)
+ fiber.yield()
+ end
+ return val
+end;
+---
+...
+function push_catcher_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = false}
+ catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = true}
+ catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+ fiber.create(dml_push_and_dml_f)
+ fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+ if c.is_dml then
+ assert(#c.messages == 3, 'dml sends 3 messages')
+ assert(c.messages[1] == 'started dml', 'started')
+ assert(c.messages[2] == 'continued dml', 'continued')
+ assert(c.messages[3] == 'finished dml', 'finished')
+ assert(s:get{c.retval}, 'inserted +')
+ assert(s:get{-c.retval}, 'inserted -')
+ else
+ assert(c.retval, 'something is returned')
+ assert(#c.messages == 5, 'non-dml sends 5 messages')
+ for k, v in pairs(c.messages) do
+ assert(k == v, 'with equal keys and values')
+ end
+ end
+end;
---
-- null
-- Session 'binary' does not support push()
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+---
+...
+messages = {}
+---
+...
+c:call('push_null', {}, {on_push = on_push})
+---
+...
+messages
+---
+- - null
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+---
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 8
+ - 8
+ - 8
+ - 8
+ - 8
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {48: [1]}
+ - {48: [2]}
+ - {48: [3]}
+ - {48: [4]}
+ - {48: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
...
c:close()
---
...
+s:drop()
+---
+...
box.schema.user.revoke('guest', 'read,write,execute', 'universe')
---
...
--
-- Ensure can not push in background.
--
-fiber = require('fiber')
+ok = nil
+---
+...
+err = nil
---
...
-f = fiber.create(do_push)
+f = fiber.create(function() ok, err = box.session.push(100) end)
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index a59fe0a4c..e5bd3287f 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -1,5 +1,6 @@
+test_run = require('test_run').new()
--
--- gh-2677: box.session.push.
+-- gh-2677: box.session.push binary protocol tests.
--
--
@@ -8,28 +9,139 @@
box.session.push()
box.session.push(1, 2)
-ok = nil
-err = nil
-function do_push() ok, err = box.session.push(1) end
+fiber = require('fiber')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function on_push(message)
+ table.insert(messages, message)
+end;
+
+function do_pushes()
+ for i = 1, 5 do
+ box.session.push(i)
+ fiber.sleep(0.01)
+ end
+ return 300
+end;
+test_run:cmd("setopt delimiter ''");
---
--- Test binary protocol.
---
netbox = require('net.box')
box.schema.user.grant('guest', 'read,write,execute', 'universe')
c = netbox.connect(box.cfg.listen)
c:ping()
-c:call('do_push')
-ok, err
+c:call('do_pushes', {}, {on_push = on_push})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+ box.session.push('started dml')
+ s:replace{key}
+ box.session.push('continued dml')
+ s:replace{-key}
+ box.session.push('finished dml')
+ return key
+end;
+function do_pushes(val)
+ for i = 1, 5 do
+ box.session.push(i)
+ fiber.yield()
+ end
+ return val
+end;
+function push_catcher_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = false}
+ catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+function dml_push_and_dml_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = true}
+ catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+ fiber.create(dml_push_and_dml_f)
+ fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+ if c.is_dml then
+ assert(#c.messages == 3, 'dml sends 3 messages')
+ assert(c.messages[1] == 'started dml', 'started')
+ assert(c.messages[2] == 'continued dml', 'continued')
+ assert(c.messages[3] == 'finished dml', 'finished')
+ assert(s:get{c.retval}, 'inserted +')
+ assert(s:get{-c.retval}, 'inserted -')
+ else
+ assert(c.retval, 'something is returned')
+ assert(#c.messages == 5, 'non-dml sends 5 messages')
+ for k, v in pairs(c.messages) do
+ assert(k == v, 'with equal keys and values')
+ end
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Ok to push NULL.
+--
+function push_null() box.session.push(box.NULL) end
+messages = {}
+c:call('push_null', {}, {on_push = on_push})
+messages
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
c:close()
+s:drop()
box.schema.user.revoke('guest', 'read,write,execute', 'universe')
--
-- Ensure can not push in background.
--
-fiber = require('fiber')
-f = fiber.create(do_push)
+ok = nil
+err = nil
+f = fiber.create(function() ok, err = box.session.push(100) end)
while f:status() ~= 'dead' do fiber.sleep(0.01) end
ok, err
--
2.15.1 (Apple Git-101)
More information about the Tarantool-patches
mailing list