From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id 5B5B821DE0 for ; Fri, 20 Apr 2018 09:24:38 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id AyoeD6QXb3M4 for ; Fri, 20 Apr 2018 09:24:38 -0400 (EDT) Received: from smtp38.i.mail.ru (smtp38.i.mail.ru [94.100.177.98]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 4378A2190A for ; Fri, 20 Apr 2018 09:24:36 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH v2 10/10] session: introduce binary box.session.push Date: Fri, 20 Apr 2018 16:24:26 +0300 Message-Id: <2e8899477ec65522992dd33d246e21c92ebbe246.1524228894.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com Box.session.push() allows to send a message to a client with no finishing a main request. Tarantool after this patch supports pushes over binary protocol. IProto message is encoded using a new header code - IPROTO_CHUNK. TX thread to notify IProto thread about new data in obuf sends a message 'push_msg'. IProto thread, got this message, notifies libev about new data, and then sends 'push_msg' back with updated write position. TX thread, received the message back, updates its version of a write position. If IProto would not send a write position, then TX would write to the same obuf again and again, because it can not know that IProto already flushed another obuf. To avoid multiple 'push_msg' in fly between IProto and TX, the only one 'push_msg' per connection is used. To deliver pushes, appeared when 'push_msg' was in fly, TX thread sets a flag every time when sees, that 'push_msg' is sent, and there is a new push. When 'push_msg' returns, it checks this flag, and if it is set, the IProto is notified again. Closes #2677 --- src/box/iproto.cc | 221 +++++++++++++++++++++++++++++++++++-------- src/box/iproto_constants.h | 3 + src/box/lua/net_box.lua | 50 ++++++---- src/box/lua/session.c | 33 ++++++- src/box/xrow.c | 13 +++ src/box/xrow.h | 12 +++ src/fiber.h | 14 ++- test/box/net.box.result | 4 +- test/box/net.box.test.lua | 4 +- test/box/push.result | 231 ++++++++++++++++++++++++++++++++++++++++++--- test/box/push.test.lua | 134 +++++++++++++++++++++++--- 11 files changed, 635 insertions(+), 84 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 38baf1b8d..78a551039 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -67,6 +67,41 @@ enum { IPROTO_MSG_MAX = 768 }; enum { IPROTO_SALT_SIZE = 32 }; +/** + * This structure represents a position in the output. + * Since we use rotating buffers to recycle memory, + * it includes not only a position in obuf, but also + * a pointer to obuf the position is for. + */ +struct iproto_wpos { + struct obuf *obuf; + struct obuf_svp svp; +}; + +static void +iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out) +{ + wpos->obuf = out; + wpos->svp = obuf_create_svp(out); +} + +/** + * Message to notify IProto thread about new data in an output + * buffer. Struct iproto_msg is not used here, because push + * notification can be much more compact: it does not have + * request, ibuf, length, flags ... + */ +struct iproto_push_msg { + struct cmsg base; + /** + * Before sending to IProto thread, the wpos is set to a + * current position in an output buffer. Before IProto + * returns the message to TX, it sets wpos to the last + * flushed position (works like iproto_msg.wpos). + */ + struct iproto_wpos wpos; +}; + /** * Network readahead. A signed integer to avoid * automatic type coercion to an unsigned type. @@ -111,24 +146,6 @@ iproto_reset_input(struct ibuf *ibuf) } } -/** - * This structure represents a position in the output. - * Since we use rotating buffers to recycle memory, - * it includes not only a position in obuf, but also - * a pointer to obuf the position is for. - */ -struct iproto_wpos { - struct obuf *obuf; - struct obuf_svp svp; -}; - -static void -iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out) -{ - wpos->obuf = out; - wpos->svp = obuf_create_svp(out); -} - /* {{{ iproto_msg - declaration */ /** @@ -367,6 +384,37 @@ struct iproto_connection /** Pointer to the current output buffer. */ struct obuf *p_obuf; } tx; + /** + * Is_push_in_progress is set, when a push_msg is sent to + * IProto thread, and reset, when the message is returned + * to TX. If a new push sees, that a push_msg is already + * sent to IProto, then has_new_pushes is set. After push + * notification is returned to TX, it checks + * has_new_pushes. If it is set, then the notification is + * sent again. This ping-pong continues, until TX stopped + * pushing. It allows to + * 1) avoid multiple push_msg from one session in fly, + * 2) do not block push() until a previous push() is + * finished. + * + * IProto TX + * ------------------------------------------------------- + * + [push message] + * start socket <--- notification ---- + * write + * + [push message] + * + [push message] + * ... + * end socket + * write ----------------> check for new + * pushes - found + * <--- notification --- + * .... + */ + bool has_new_pushes; + bool is_push_in_progress; + /** Push notification for IProto thread. */ + struct iproto_push_msg push_msg; /** Authentication salt. */ char salt[IPROTO_SALT_SIZE]; }; @@ -806,6 +854,7 @@ iproto_connection_new(int fd) { struct iproto_connection *con = (struct iproto_connection *) mempool_alloc_xc(&iproto_connection_pool); + memset(con, 0, sizeof(*con)); con->input.data = con->output.data = con; con->loop = loop(); ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ); @@ -818,9 +867,6 @@ iproto_connection_new(int fd) con->tx.p_obuf = &con->obuf[0]; iproto_wpos_create(&con->wpos, con->tx.p_obuf); iproto_wpos_create(&con->wend, con->tx.p_obuf); - con->parse_size = 0; - con->long_poll_requests = 0; - con->session = NULL; rlist_create(&con->in_stop_list); /* It may be very awkward to allocate at close. */ con->disconnect = iproto_msg_new(con); @@ -1010,9 +1056,9 @@ error: } static void -tx_fiber_init(struct session *session, uint64_t sync) +tx_fiber_init(struct session *session, uint64_t *sync) { - session->meta.sync = sync; + session->meta.sync = sync != NULL ? *sync : 0; /* * We do not cleanup fiber keys at the end of each request. * This does not lead to privilege escalation as long as @@ -1025,6 +1071,7 @@ tx_fiber_init(struct session *session, uint64_t sync) */ fiber_set_session(fiber(), session); fiber_set_user(fiber(), &session->credentials); + fiber_set_key(fiber(), FIBER_KEY_SYNC, (void *) sync); } /** @@ -1038,7 +1085,7 @@ tx_process_disconnect(struct cmsg *m) struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; if (con->session) { - tx_fiber_init(con->session, 0); + tx_fiber_init(con->session, NULL); if (! rlist_empty(&session_on_disconnect)) session_run_on_disconnect_triggers(con->session); session_destroy(con->session); @@ -1108,15 +1155,16 @@ tx_discard_input(struct iproto_msg *msg) * not, the empty buffer is selected. * - if neither of the buffers are empty, the function * does not rotate the buffer. + * + * @param con IProto connection. + * @param wpos Last flushed write position, received from IProto + * thread. */ -static struct iproto_msg * -tx_accept_msg(struct cmsg *m) +static void +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) { - struct iproto_msg *msg = (struct iproto_msg *) m; - struct iproto_connection *con = msg->connection; - struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf]; - if (msg->wpos.obuf == con->tx.p_obuf) { + if (wpos->obuf == con->tx.p_obuf) { /* * We got a message advancing the buffer which * is being appended to. The previous buffer is @@ -1134,6 +1182,13 @@ tx_accept_msg(struct cmsg *m) */ con->tx.p_obuf = prev; } +} + +static inline struct iproto_msg * +tx_accept_msg(struct cmsg *m) +{ + struct iproto_msg *msg = (struct iproto_msg *) m; + tx_accept_wpos(msg->connection, &msg->wpos); return msg; } @@ -1179,7 +1234,7 @@ tx_process1(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); - tx_fiber_init(msg->connection->session, msg->header.sync); + tx_fiber_init(msg->connection->session, &msg->header.sync); if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1213,7 +1268,7 @@ tx_process_select(struct cmsg *m) int rc; struct request *req = &msg->dml; - tx_fiber_init(msg->connection->session, msg->header.sync); + tx_fiber_init(msg->connection->session, &msg->header.sync); if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1263,7 +1318,7 @@ tx_process_call(struct cmsg *m) { struct iproto_msg *msg = tx_accept_msg(m); - tx_fiber_init(msg->connection->session, msg->header.sync); + tx_fiber_init(msg->connection->session, &msg->header.sync); if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1347,7 +1402,7 @@ tx_process_misc(struct cmsg *m) struct iproto_connection *con = msg->connection; struct obuf *out = con->tx.p_obuf; - tx_fiber_init(con->session, msg->header.sync); + tx_fiber_init(con->session, &msg->header.sync); if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1386,7 +1441,7 @@ tx_process_join_subscribe(struct cmsg *m) struct iproto_msg *msg = tx_accept_msg(m); struct iproto_connection *con = msg->connection; - tx_fiber_init(con->session, msg->header.sync); + tx_fiber_init(con->session, &msg->header.sync); try { switch (msg->header.type) { @@ -1503,7 +1558,7 @@ tx_process_connect(struct cmsg *m) if (con->session == NULL) diag_raise(); con->session->meta.conn = con; - tx_fiber_init(con->session, 0); + tx_fiber_init(con->session, NULL); static __thread char greeting[IPROTO_GREETING_SIZE]; /* TODO: dirty read from tx thread */ struct tt_uuid uuid = INSTANCE_UUID; @@ -1653,6 +1708,98 @@ iproto_session_sync(struct session *session) return session->meta.sync; } +/** {{{ IPROTO_PUSH implementation. */ + +/** + * Send to IProto thread a notification about new pushes. + * @param conn IProto connection. + */ +static void +tx_begin_push(struct iproto_connection *conn); + +/** + * Create an event to send push. + * @param m IProto push message. + */ +static void +net_push_msg(struct cmsg *m) +{ + struct iproto_push_msg *msg = (struct iproto_push_msg *) m; + struct iproto_connection *conn = + container_of(msg, struct iproto_connection, push_msg); + conn->wend = msg->wpos; + msg->wpos = conn->wpos; + if (evio_has_fd(&conn->output) && !ev_is_active(&conn->output)) + ev_feed_event(conn->loop, &conn->output, EV_WRITE); +} + +/** + * After a message notifies IProto thread about pushed data, TX + * thread can already have a new push in one of obufs. This + * function checks for new pushes and possibly re-sends push + * notification to IProto thread. + */ +static void +tx_check_for_new_push(struct cmsg *m) +{ + struct iproto_push_msg *msg = (struct iproto_push_msg *) m; + struct iproto_connection *conn = + container_of(msg, struct iproto_connection, push_msg); + tx_accept_wpos(conn, &msg->wpos); + conn->is_push_in_progress = false; + if (conn->has_new_pushes) + tx_begin_push(conn); +} + +static const struct cmsg_hop push_route[] = { + { net_push_msg, &tx_pipe }, + { tx_check_for_new_push, NULL } +}; + +static void +tx_begin_push(struct iproto_connection *conn) +{ + assert(! conn->is_push_in_progress); + cmsg_init((struct cmsg *) &conn->push_msg, push_route); + iproto_wpos_create(&conn->push_msg.wpos, conn->tx.p_obuf); + conn->has_new_pushes = false; + conn->is_push_in_progress = true; + cpipe_push(&net_pipe, (struct cmsg *) &conn->push_msg); +} + +/** + * Push a message from @a port to a remote client. + * @param session IProto session. + * @param port Port with data to send. + * + * @retval -1 Memory error. + * @retval 0 Success, a message is wrote to an output buffer. But + * it is not guaranteed, that it will be sent + * successfully. + */ +static int +iproto_session_push(struct session *session, struct port *port) +{ + struct iproto_connection *conn = + (struct iproto_connection *) session->meta.conn; + struct obuf_svp svp; + if (iproto_prepare_select(conn->tx.p_obuf, &svp) != 0) + return -1; + if (port_dump_msgpack(port, conn->tx.p_obuf) != 0) { + obuf_rollback_to_svp(conn->tx.p_obuf, &svp); + return -1; + } + iproto_reply_chunk(conn->tx.p_obuf, &svp, fiber_sync(fiber()), + ::schema_version); + if (! conn->is_push_in_progress) + tx_begin_push(conn); + else + conn->has_new_pushes = true; + return 0; +} + +/** }}} */ + /** Initialize the iproto subsystem and start network io thread */ void iproto_init() @@ -1666,7 +1813,7 @@ iproto_init() cpipe_create(&net_pipe, "net"); cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2); struct session_vtab iproto_session_vtab = { - /* .push = */ generic_session_push, + /* .push = */ iproto_session_push, /* .fd = */ iproto_session_fd, /* .sync = */ iproto_session_sync, }; diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 92534ef6e..67514ca03 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -163,6 +163,9 @@ enum iproto_type { /** Vinyl row index stored in .run file */ VY_RUN_ROW_INDEX = 102, + /** Non-final response type. */ + IPROTO_CHUNK = 128, + /** * Error codes = (IPROTO_TYPE_ERROR | ER_XXX from errcode.h) */ diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 4ed2b375d..814358a2e 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -40,6 +40,8 @@ local IPROTO_SCHEMA_VERSION_KEY = 0x05 local IPROTO_DATA_KEY = 0x30 local IPROTO_ERROR_KEY = 0x31 local IPROTO_GREETING_SIZE = 128 +local IPROTO_CHUNK_KEY = 128 +local IPROTO_OK_KEY = 0 -- select errors from box.error local E_UNKNOWN = box.error.UNKNOWN @@ -267,7 +269,8 @@ local function create_transport(host, port, user, password, callback, end -- REQUEST/RESPONSE -- - local function perform_request(timeout, buffer, method, schema_version, ...) + local function perform_request(timeout, buffer, method, on_push, + schema_version, ...) if state ~= 'active' then return last_errno or E_NO_CONNECTION, last_error end @@ -280,11 +283,12 @@ local function create_transport(host, port, user, password, callback, local id = next_request_id method_codec[method](send_buf, id, schema_version, ...) next_request_id = next_id(id) - local request = table_new(0, 6) -- reserve space for 6 keys + local request = table_new(0, 7) -- reserve space for 7 keys request.client = fiber_self() request.method = method request.schema_version = schema_version request.buffer = buffer + request.on_push = on_push requests[id] = request repeat local timeout = max(0, deadline - fiber_clock()) @@ -308,12 +312,12 @@ local function create_transport(host, port, user, password, callback, if request == nil then -- nobody is waiting for the response return end - requests[id] = nil local status = hdr[IPROTO_STATUS_KEY] local body, body_end_check - if status ~= 0 then + if status > IPROTO_CHUNK_KEY then -- Handle errors + requests[id] = nil body, body_end_check = decode(body_rpos) assert(body_end == body_end_check, "invalid xrow length") request.errno = band(status, IPROTO_ERRNO_MASK) @@ -328,16 +332,27 @@ local function create_transport(host, port, user, password, callback, local body_len = body_end - body_rpos local wpos = buffer:alloc(body_len) ffi.copy(wpos, body_rpos, body_len) - request.response = tonumber(body_len) - wakeup_client(request.client) - return + if status == IPROTO_OK_KEY then + request.response = tonumber(body_len) + requests[id] = nil + wakeup_client(request.client) + elseif request.on_push then + assert(status == IPROTO_CHUNK_KEY) + request.on_push(tonumber(body_len)) + end + else + -- Decode xrow.body[DATA] to Lua objects + body, body_end_check = decode(body_rpos) + assert(body_end == body_end_check, "invalid xrow length") + if status == IPROTO_OK_KEY then + request.response = body[IPROTO_DATA_KEY] + requests[id] = nil + wakeup_client(request.client) + elseif request.on_push then + assert(status == IPROTO_CHUNK_KEY) + request.on_push(body[IPROTO_DATA_KEY][1]) + end end - - -- Decode xrow.body[DATA] to Lua objects - body, body_end_check = decode(body_rpos) - assert(body_end == body_end_check, "invalid xrow length") - request.response = body[IPROTO_DATA_KEY] - wakeup_client(request.client) end local function new_request_id() @@ -834,7 +849,8 @@ function remote_methods:_request(method, opts, ...) timeout = deadline and max(0, deadline - fiber_clock()) end err, res = perform_request(timeout, buffer, method, - self.schema_version, ...) + opts and opts.on_push, self.schema_version, + ...) if not err and buffer ~= nil then return res -- the length of xrow.body elseif not err then @@ -864,7 +880,7 @@ function remote_methods:ping(opts) timeout = deadline and max(0, deadline - fiber_clock()) or (opts and opts.timeout) end - local err = self._transport.perform_request(timeout, nil, 'ping', + local err = self._transport.perform_request(timeout, nil, 'ping', nil, self.schema_version) return not err or err == E_WRONG_SCHEMA_VERSION end @@ -1045,10 +1061,10 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' - err, res = pr(timeout, nil, 'eval', nil, loader, {line}) + err, res = pr(timeout, nil, 'eval', nil, nil, loader, {line}) else assert(self.protocol == 'Lua console') - err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n') + err, res = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n') end if err then box.error({code = err, reason = res}) diff --git a/src/box/lua/session.c b/src/box/lua/session.c index 306271809..c3db93627 100644 --- a/src/box/lua/session.c +++ b/src/box/lua/session.c @@ -31,6 +31,7 @@ #include "session.h" #include "lua/utils.h" #include "lua/trigger.h" +#include "lua/msgpack.h" #include #include @@ -43,6 +44,7 @@ #include "box/schema.h" #include "box/port.h" #include "box/lua/console.h" +#include "small/obuf.h" static const char *sessionlib_name = "box.session"; @@ -371,8 +373,11 @@ struct lua_push_port { static const char * lua_push_port_dump_plain(struct port *port, uint32_t *size); +static int +lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf); + static const struct port_vtab lua_push_port_vtab = { - .dump_msgpack = NULL, + .dump_msgpack = lua_push_port_dump_msgpack, /* * Dump_16 has no sense, since push appears since 1.10 * protocol. @@ -403,6 +408,32 @@ lua_push_port_dump_plain(struct port *port, uint32_t *size) return result; } +static void +obuf_error_cb(void *ctx) +{ + *((int *)ctx) = -1; +} + +static int +lua_push_port_dump_msgpack(struct port *port, struct obuf *obuf) +{ + struct lua_push_port *lua_port = (struct lua_push_port *) port; + assert(lua_port->vtab == &lua_push_port_vtab); + struct mpstream stream; + int rc = 0; + /* + * Do not use luamp_error to allow a caller to clear the + * obuf, if it already has allocated something (for + * example, iproto push reserves memory for a header). + */ + mpstream_init(&stream, obuf, obuf_reserve_cb, obuf_alloc_cb, + obuf_error_cb, &rc); + luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1); + if (rc == 0) + mpstream_flush(&stream); + return rc; +} + /** * Push a message using a protocol, depending on a session type. * @param data Data to push, first argument on a stack. diff --git a/src/box/xrow.c b/src/box/xrow.c index f48525645..adb52deeb 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -373,6 +373,19 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body)); } +void +iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, + uint32_t schema_version) +{ + char *pos = (char *) obuf_svp_to_ptr(buf, svp); + iproto_header_encode(pos, IPROTO_CHUNK, sync, schema_version, + obuf_size(buf) - svp->used - IPROTO_HEADER_LEN); + struct iproto_body_bin body = iproto_body_bin; + body.v_data_len = mp_bswap_u32(1); + + memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body)); +} + int xrow_decode_dml(struct xrow_header *row, struct request *request, uint64_t key_map) diff --git a/src/box/xrow.h b/src/box/xrow.h index d407d151b..7fe1debbf 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -390,6 +390,18 @@ int iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version); +/** + * Write an IPROTO_CHUNK header from a specified position in a + * buffer. + * @param buf Buffer to write to. + * @param svp Position to write from. + * @param sync Request sync. + * @param schema_version Actual schema version. + */ +void +iproto_reply_chunk(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, + uint32_t schema_version); + /** Write error directly to a socket. */ void iproto_write_error(int fd, const struct error *e, uint32_t schema_version, diff --git a/src/fiber.h b/src/fiber.h index 8231bba24..eb89c48cd 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -105,8 +105,13 @@ enum fiber_key { /** User global privilege and authentication token */ FIBER_KEY_USER = 3, FIBER_KEY_MSG = 4, - /** Storage for lua stack */ + /** + * The storage cell number 5 is shared between lua stack + * for fibers created from Lua, and IProto sync for fibers + * created to execute a binary request. + */ FIBER_KEY_LUA_STACK = 5, + FIBER_KEY_SYNC = 5, FIBER_KEY_MAX = 6 }; @@ -610,6 +615,13 @@ fiber_get_key(struct fiber *fiber, enum fiber_key key) return fiber->fls[key]; } +static inline uint64_t +fiber_sync(struct fiber *fiber) +{ + uint64_t *sync = (uint64_t *) fiber_get_key(fiber, FIBER_KEY_SYNC); + return sync != NULL ? *sync : 0; +} + /** * Finalizer callback * \sa fiber_key_on_gc() diff --git a/test/box/net.box.result b/test/box/net.box.result index cf7b27f0b..93561dc64 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) return cn:_request('select', opts, space_id, index_id, iterator, offset, limit, key) end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- ... @@ -2363,7 +2363,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') --- ... c.state diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 576b5cfea..b05e1f0be 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) return cn:_request('select', opts, space_id, index_id, iterator, offset, limit, key) end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); LISTEN = require('uri').parse(box.cfg.listen) @@ -965,7 +965,7 @@ c.space.test:delete{1} -- -- Break a connection to test reconnect_after. -- -_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80') +_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') c.state while not c:is_connected() do fiber.sleep(0.01) end c:ping() diff --git a/test/box/push.result b/test/box/push.result index 816f06e00..91e2981ed 100644 --- a/test/box/push.result +++ b/test/box/push.result @@ -1,5 +1,8 @@ +test_run = require('test_run').new() +--- +... -- --- gh-2677: box.session.push. +-- gh-2677: box.session.push binary protocol tests. -- -- -- Usage. @@ -12,18 +15,34 @@ box.session.push(1, 2) --- - error: 'Usage: box.session.push(data)' ... -ok = nil +fiber = require('fiber') --- ... -err = nil +messages = {} +--- +... +test_run:cmd("setopt delimiter ';'") --- +- true ... -function do_push() ok, err = box.session.push(1) end +function on_push(message) + table.insert(messages, message) +end; --- ... --- --- Test binary protocol. --- +function do_pushes() + for i = 1, 5 do + box.session.push(i) + fiber.sleep(0.01) + end + return 300 +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... netbox = require('net.box') --- ... @@ -37,27 +56,213 @@ c:ping() --- - true ... -c:call('do_push') +c:call('do_pushes', {}, {on_push = on_push}) --- +- 300 ... -ok, err +messages +--- +- - 1 + - 2 + - 3 + - 4 + - 5 +... +-- Add a little stress: many pushes with different syncs, from +-- different fibers and DML/DQL requests. +catchers = {} +--- +... +started = 0 +--- +... +finished = 0 +--- +... +s = box.schema.create_space('test', {format = {{'field1', 'integer'}}}) +--- +... +pk = s:create_index('pk') +--- +... +c:reload_schema() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function dml_push_and_dml(key) + box.session.push('started dml') + s:replace{key} + box.session.push('continued dml') + s:replace{-key} + box.session.push('finished dml') + return key +end; +--- +... +function do_pushes(val) + for i = 1, 5 do + box.session.push(i) + fiber.yield() + end + return val +end; +--- +... +function push_catcher_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = false} + catcher.retval = c:call('do_pushes', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +--- +... +function dml_push_and_dml_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = true} + catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +--- +... +-- At first check that a pushed message can be ignored in a binary +-- protocol too. +c:call('do_pushes', {300}); +--- +- 300 +... +-- Then do stress. +for i = 1, 200 do + fiber.create(dml_push_and_dml_f) + fiber.create(push_catcher_f) +end; +--- +... +while finished ~= 400 do fiber.sleep(0.1) end; +--- +... +for _, c in pairs(catchers) do + if c.is_dml then + assert(#c.messages == 3, 'dml sends 3 messages') + assert(c.messages[1] == 'started dml', 'started') + assert(c.messages[2] == 'continued dml', 'continued') + assert(c.messages[3] == 'finished dml', 'finished') + assert(s:get{c.retval}, 'inserted +') + assert(s:get{-c.retval}, 'inserted -') + else + assert(c.retval, 'something is returned') + assert(#c.messages == 5, 'non-dml sends 5 messages') + for k, v in pairs(c.messages) do + assert(k == v, 'with equal keys and values') + end + end +end; --- -- null -- Session 'binary' does not support push() +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +#s:select{} +--- +- 400 +... +-- +-- Ok to push NULL. +-- +function push_null() box.session.push(box.NULL) end +--- +... +messages = {} +--- +... +c:call('push_null', {}, {on_push = on_push}) +--- +... +messages +--- +- - null +... +-- +-- Test binary pushes. +-- +ibuf = require('buffer').ibuf() +--- +... +msgpack = require('msgpack') +--- +... +messages = {} +--- +... +resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf}) +--- +... +resp_len +--- +- 10 +... +messages +--- +- - 8 + - 8 + - 8 + - 8 + - 8 +... +decoded = {} +--- +... +r = nil +--- +... +for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end +--- +... +decoded +--- +- - {48: [1]} + - {48: [2]} + - {48: [3]} + - {48: [4]} + - {48: [5]} +... +r, _ = msgpack.decode_unchecked(ibuf.rpos) +--- +... +r +--- +- {48: [300]} ... c:close() --- ... +s:drop() +--- +... box.schema.user.revoke('guest', 'read,write,execute', 'universe') --- ... -- -- Ensure can not push in background. -- -fiber = require('fiber') +ok = nil +--- +... +err = nil --- ... -f = fiber.create(do_push) +f = fiber.create(function() ok, err = box.session.push(100) end) --- ... while f:status() ~= 'dead' do fiber.sleep(0.01) end diff --git a/test/box/push.test.lua b/test/box/push.test.lua index a59fe0a4c..e5bd3287f 100644 --- a/test/box/push.test.lua +++ b/test/box/push.test.lua @@ -1,5 +1,6 @@ +test_run = require('test_run').new() -- --- gh-2677: box.session.push. +-- gh-2677: box.session.push binary protocol tests. -- -- @@ -8,28 +9,139 @@ box.session.push() box.session.push(1, 2) -ok = nil -err = nil -function do_push() ok, err = box.session.push(1) end +fiber = require('fiber') +messages = {} +test_run:cmd("setopt delimiter ';'") +function on_push(message) + table.insert(messages, message) +end; + +function do_pushes() + for i = 1, 5 do + box.session.push(i) + fiber.sleep(0.01) + end + return 300 +end; +test_run:cmd("setopt delimiter ''"); --- --- Test binary protocol. --- netbox = require('net.box') box.schema.user.grant('guest', 'read,write,execute', 'universe') c = netbox.connect(box.cfg.listen) c:ping() -c:call('do_push') -ok, err +c:call('do_pushes', {}, {on_push = on_push}) +messages + +-- Add a little stress: many pushes with different syncs, from +-- different fibers and DML/DQL requests. + +catchers = {} +started = 0 +finished = 0 +s = box.schema.create_space('test', {format = {{'field1', 'integer'}}}) +pk = s:create_index('pk') +c:reload_schema() +test_run:cmd("setopt delimiter ';'") +function dml_push_and_dml(key) + box.session.push('started dml') + s:replace{key} + box.session.push('continued dml') + s:replace{-key} + box.session.push('finished dml') + return key +end; +function do_pushes(val) + for i = 1, 5 do + box.session.push(i) + fiber.yield() + end + return val +end; +function push_catcher_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = false} + catcher.retval = c:call('do_pushes', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +function dml_push_and_dml_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = true} + catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +-- At first check that a pushed message can be ignored in a binary +-- protocol too. +c:call('do_pushes', {300}); +-- Then do stress. +for i = 1, 200 do + fiber.create(dml_push_and_dml_f) + fiber.create(push_catcher_f) +end; +while finished ~= 400 do fiber.sleep(0.1) end; + +for _, c in pairs(catchers) do + if c.is_dml then + assert(#c.messages == 3, 'dml sends 3 messages') + assert(c.messages[1] == 'started dml', 'started') + assert(c.messages[2] == 'continued dml', 'continued') + assert(c.messages[3] == 'finished dml', 'finished') + assert(s:get{c.retval}, 'inserted +') + assert(s:get{-c.retval}, 'inserted -') + else + assert(c.retval, 'something is returned') + assert(#c.messages == 5, 'non-dml sends 5 messages') + for k, v in pairs(c.messages) do + assert(k == v, 'with equal keys and values') + end + end +end; +test_run:cmd("setopt delimiter ''"); + +#s:select{} + +-- +-- Ok to push NULL. +-- +function push_null() box.session.push(box.NULL) end +messages = {} +c:call('push_null', {}, {on_push = on_push}) +messages + +-- +-- Test binary pushes. +-- +ibuf = require('buffer').ibuf() +msgpack = require('msgpack') +messages = {} +resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf}) +resp_len +messages +decoded = {} +r = nil +for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end +decoded +r, _ = msgpack.decode_unchecked(ibuf.rpos) +r + c:close() +s:drop() box.schema.user.revoke('guest', 'read,write,execute', 'universe') -- -- Ensure can not push in background. -- -fiber = require('fiber') -f = fiber.create(do_push) +ok = nil +err = nil +f = fiber.create(function() ok, err = box.session.push(100) end) while f:status() ~= 'dead' do fiber.sleep(0.01) end ok, err -- 2.15.1 (Apple Git-101)