From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH 5/5] session: introduce box.session.push Date: Mon, 19 Mar 2018 16:34:52 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com, Vladislav Shpilevoy List-ID: box.session.push() allows sending a message to a client without finishing the main request. Tarantool supports two push types: via the text protocol and via the binary protocol. A text protocol push message contains a "push:" prefix followed by YAML formatted text, whereas a final response always has a '---' prefix. To catch pushed messages on the client side, use the console.connect() on_push option, which takes a function with a single argument - the message. An IProto push message is encoded just like a regular response, but instead of IPROTO_DATA it has IPROTO_PUSH with a single element - the pushed MessagePack encoded data. Text push is trivial - it is just a blocking write into a socket with a prefix. Binary push is more complex. To notify the IProto thread about new data in an obuf, the TX thread sends a 'push_msg' message. The IProto thread, having received this message, notifies libev about the new data and then sends 'push_msg' back with an updated write position. The TX thread, having received the message back, updates its version of the write position. If IProto did not send the write position back, TX would write to the same obuf again and again, because it cannot know that IProto has already flushed another obuf. To avoid multiple 'push_msg' messages in flight between IProto and TX, only one 'push_msg' per connection is used. To deliver pushes that appeared while 'push_msg' was in flight, the TX thread sets a flag every time it sees that 'push_msg' has already been sent and there is a new push. When 'push_msg' returns, TX checks this flag, and if it is set, IProto is notified again.
Closes #2677 Signed-off-by: Vladislav Shpilevoy --- src/box/iproto.cc | 235 +++++++++++++++++--- src/box/iproto_constants.c | 3 +- src/box/iproto_constants.h | 8 + src/box/lua/call.c | 1 + src/box/lua/console.c | 2 +- src/box/lua/console.h | 8 + src/box/lua/console.lua | 6 +- src/box/lua/net_box.c | 37 ++++ src/box/lua/net_box.lua | 97 ++++++-- src/box/lua/session.c | 171 +++++++++++++++ src/box/port.c | 7 + src/box/port.h | 15 ++ src/box/session.cc | 14 ++ src/box/session.h | 17 ++ src/box/xrow.c | 40 +++- src/box/xrow.h | 26 ++- src/fio.c | 12 + src/fio.h | 16 ++ test/app-tap/console.test.lua | 9 +- test/box/net.box.result | 2 +- test/box/net.box.test.lua | 2 +- test/box/push.result | 364 +++++++++++++++++++++++++++++++ test/box/push.test.lua | 163 ++++++++++++++ test/replication/before_replace.result | 14 ++ test/replication/before_replace.test.lua | 11 + 25 files changed, 1211 insertions(+), 69 deletions(-) create mode 100644 test/box/push.result create mode 100644 test/box/push.test.lua diff --git a/src/box/iproto.cc b/src/box/iproto.cc index ad4b1a757..4408293c1 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -63,6 +63,45 @@ enum { IPROTO_SALT_SIZE = 32 }; +/** + * This structure represents a position in the output. + * Since we use rotating buffers to recycle memory, + * it includes not only a position in obuf, but also + * a pointer to obuf the position is for. + */ +struct iproto_wpos { + struct obuf *obuf; + struct obuf_svp svp; +}; + +static void +iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out) +{ + wpos->obuf = out; + wpos->svp = obuf_create_svp(out); +} + +/* {{{ IPROTO_PUSH declarations. */ + +/** + * Message to notify IProto thread about new data in an output + * buffer. Struct iproto_msg is not used here, because push + * notification can be much more compact: it does not have + * request, ibuf, length, flags ... 
+ */ +struct iproto_push_msg { + struct cmsg base; + /** + * Before sending to IProto thread, the wpos is set to a + * current position in an output buffer. Before IProto + * returns the message to TX, it sets wpos to the last + * flushed position (works like iproto_msg.wpos). + */ + struct iproto_wpos wpos; + /** IProto connection to push into. */ + struct iproto_connection *connection; +}; + /** Owner of binary IProto sessions. */ struct iproto_session_owner { struct session_owner base; @@ -70,6 +109,37 @@ struct iproto_session_owner { char salt[IPROTO_SALT_SIZE]; /** IProto connection. */ struct iproto_connection *connection; + /** + * Is_push_in_progress is set, when a push_msg is sent to + * IProto thread, and reset, when the message is returned + * to TX. If a new push sees, that a push_msg is already + * sent to IProto, then has_new_pushes is set. After push + * notification is returned to TX, it checks + * has_new_pushes. If it is set, then the notification is + * sent again. This ping-pong continues, until TX stopped + * pushing. It allows to + * 1) avoid multiple push_msg from one session in fly, + * 2) do not block push() until a previous push() is + * finished. + * + * IProto TX + * ------------------------------------------------------- + * + [push message] + * start socket <--- notification ---- + * write + * + [push message] + * + [push message] + * ... + * end socket + * write ----------------> check for new + * pushes - found + * <--- notification --- + * .... + */ + bool has_new_pushes; + bool is_push_in_progress; + /** Push notification for IProto thread. */ + struct iproto_push_msg push_msg; }; static struct session_owner * @@ -78,10 +148,27 @@ iproto_session_owner_dup(struct session_owner *owner); static int iproto_session_owner_fd(const struct session_owner *owner); +/** + * Push a message from @a port to a remote client. + * @param owner IProto session owner. + * @param sync Message sync. 
Must be the same as a request sync to + * be able to detect their tie on a client side. + * @param port Port with data to send. + * + * @retval -1 Memory error. + * @retval 0 Success, a message is wrote to an output buffer. But + * it is not guaranteed, that it will be sent + * successfully. + */ +static int +iproto_session_owner_push(struct session_owner *owner, uint64_t sync, + struct port *port); + static const struct session_owner_vtab iproto_session_owner_vtab = { /* .dup = */ iproto_session_owner_dup, /* .delete = */ (void (*)(struct session_owner *)) free, /* .fd = */ iproto_session_owner_fd, + /* .push = */ iproto_session_owner_push, }; static struct session_owner * @@ -105,6 +192,9 @@ iproto_session_owner_create(struct iproto_session_owner *owner, owner->base.type = SESSION_TYPE_BINARY; owner->base.vtab = &iproto_session_owner_vtab; owner->connection = connection; + owner->has_new_pushes = false; + owner->is_push_in_progress = false; + owner->push_msg.connection = connection; random_bytes(owner->salt, IPROTO_SALT_SIZE); } @@ -117,6 +207,8 @@ iproto_session_salt(struct session *session) return session_owner->salt; } +/** }}} */ + /* The number of iproto messages in flight */ enum { IPROTO_MSG_MAX = 768 }; @@ -164,24 +256,6 @@ iproto_reset_input(struct ibuf *ibuf) } } -/** - * This structure represents a position in the output. - * Since we use rotating buffers to recycle memory, - * it includes not only a position in obuf, but also - * a pointer to obuf the position is for. - */ -struct iproto_wpos { - struct obuf *obuf; - struct obuf_svp svp; -}; - -static void -iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out) -{ - wpos->obuf = out; - wpos->svp = obuf_create_svp(out); -} - /* {{{ iproto_msg - declaration */ /** @@ -1168,15 +1242,16 @@ tx_discard_input(struct iproto_msg *msg) * not, the empty buffer is selected. * - if neither of the buffers are empty, the function * does not rotate the buffer. + * + * @param con IProto connection. 
+ * @param wpos Last flushed write position, received from IProto + * thread. */ -static struct iproto_msg * -tx_accept_msg(struct cmsg *m) +static void +tx_accept_msg(struct iproto_connection *con, struct iproto_wpos *wpos) { - struct iproto_msg *msg = (struct iproto_msg *) m; - struct iproto_connection *con = msg->connection; - struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf]; - if (msg->wpos.obuf == con->tx.p_obuf) { + if (wpos->obuf == con->tx.p_obuf) { /* * We got a message advancing the buffer which * is being appended to. The previous buffer is @@ -1194,7 +1269,6 @@ tx_accept_msg(struct cmsg *m) */ con->tx.p_obuf = prev; } - return msg; } /** @@ -1217,7 +1291,8 @@ tx_reply_error(struct iproto_msg *msg) static void tx_reply_iproto_error(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg = (struct iproto_msg *) m; + tx_accept_msg(msg->connection, &msg->wpos); struct obuf *out = msg->connection->tx.p_obuf; iproto_reply_error(out, diag_last_error(&msg->diag), msg->header.sync, ::schema_version); @@ -1227,8 +1302,8 @@ tx_reply_iproto_error(struct cmsg *m) static void tx_process1(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); - struct obuf *out = msg->connection->tx.p_obuf; + struct iproto_msg *msg = (struct iproto_msg *) m; + tx_accept_msg(msg->connection, &msg->wpos); tx_fiber_init(msg->connection->session, msg->header.sync); if (tx_check_schema(msg->header.schema_version)) @@ -1236,8 +1311,11 @@ tx_process1(struct cmsg *m) struct tuple *tuple; struct obuf_svp svp; - if (box_process1(&msg->dml, &tuple) || - iproto_prepare_select(out, &svp)) + struct obuf *out; + if (box_process1(&msg->dml, &tuple) != 0) + goto error; + out = msg->connection->tx.p_obuf; + if (iproto_prepare_select(out, &svp) != 0) goto error; if (tuple && tuple_to_obuf(tuple, out)) goto error; @@ -1252,8 +1330,9 @@ error: static void tx_process_select(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); - struct obuf *out = 
msg->connection->tx.p_obuf; + struct iproto_msg *msg = (struct iproto_msg *) m; + tx_accept_msg(msg->connection, &msg->wpos); + struct obuf *out; struct obuf_svp svp; struct port port; int count; @@ -1270,6 +1349,7 @@ tx_process_select(struct cmsg *m) req->key, req->key_end, &port); if (rc < 0) goto error; + out = msg->connection->tx.p_obuf; if (iproto_prepare_select(out, &svp) != 0) { port_destroy(&port); goto error; @@ -1305,7 +1385,8 @@ tx_process_call_on_yield(struct trigger *trigger, void *event) static void tx_process_call(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg = (struct iproto_msg *) m; + tx_accept_msg(msg->connection, &msg->wpos); tx_fiber_init(msg->connection->session, msg->header.sync); @@ -1387,7 +1468,8 @@ error: static void tx_process_misc(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg = (struct iproto_msg *) m; + tx_accept_msg(msg->connection, &msg->wpos); struct obuf *out = msg->connection->tx.p_obuf; struct session *session = msg->connection->session; @@ -1428,8 +1510,9 @@ error: static void tx_process_join_subscribe(struct cmsg *m) { - struct iproto_msg *msg = tx_accept_msg(m); + struct iproto_msg *msg = (struct iproto_msg *) m; struct iproto_connection *con = msg->connection; + tx_accept_msg(con, &msg->wpos); tx_fiber_init(con->session, msg->header.sync); @@ -1612,6 +1695,88 @@ static const struct cmsg_hop connect_route[] = { /** }}} */ +/** {{{ IPROTO_PUSH implementation. */ + +/** + * Send to IProto thread a notification about new pushes. + * @param owner IProto session owner. + */ +static void +tx_begin_push(struct iproto_session_owner *owner); + +/** + * Create an event to send push. + * @param m IProto push message. 
+ */ +static void +net_push_msg(struct cmsg *m) +{ + struct iproto_push_msg *msg = (struct iproto_push_msg *) m; + struct iproto_connection *con = msg->connection; + con->wend = msg->wpos; + msg->wpos = con->wpos; + if (evio_has_fd(&con->output) && !ev_is_active(&con->output)) + ev_feed_event(con->loop, &con->output, EV_WRITE); +} + +/** + * After a message notifies IProto thread about pushed data, TX + * thread can already have a new push in one of obufs. This + * function checks for new pushes and possibly re-sends push + * notification to IProto thread. + */ +static void +tx_check_for_new_push(struct cmsg *m) +{ + struct iproto_push_msg *msg = (struct iproto_push_msg *) m; + struct iproto_session_owner *owner = + container_of(msg, struct iproto_session_owner, push_msg); + tx_accept_msg(msg->connection, &msg->wpos); + owner->is_push_in_progress = false; + if (owner->has_new_pushes) + tx_begin_push(owner); +} + +static const struct cmsg_hop push_route[] = { + { net_push_msg, &tx_pipe }, + { tx_check_for_new_push, NULL } +}; + +static void +tx_begin_push(struct iproto_session_owner *owner) +{ + assert(! owner->is_push_in_progress); + cmsg_init((struct cmsg *) &owner->push_msg, push_route); + iproto_wpos_create(&owner->push_msg.wpos, owner->connection->tx.p_obuf); + owner->has_new_pushes = false; + owner->is_push_in_progress = true; + cpipe_push(&net_pipe, (struct cmsg *) &owner->push_msg); +} + +static int +iproto_session_owner_push(struct session_owner *session_owner, uint64_t sync, + struct port *port) +{ + struct iproto_session_owner *owner = + (struct iproto_session_owner *) session_owner; + struct iproto_connection *con = owner->connection; + struct obuf_svp svp; + if (iproto_prepare_push(con->tx.p_obuf, &svp) != 0) + return -1; + if (port_dump(port, con->tx.p_obuf) != 0) { + obuf_rollback_to_svp(con->tx.p_obuf, &svp); + return -1; + } + iproto_reply_push(con->tx.p_obuf, &svp, sync, ::schema_version); + if (! 
owner->is_push_in_progress) + tx_begin_push(owner); + else + owner->has_new_pushes = true; + return 0; +} + +/** }}} */ + /** * Create a connection and start input. */ diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index cd7b1d03b..893275c7b 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -174,7 +174,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { NULL, /* 0x2e */ NULL, /* 0x2f */ "data", /* 0x30 */ - "error" /* 0x31 */ + "error", /* 0x31 */ + "push", /* 0x32 */ }; const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = { diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 951842485..05d262a0b 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -79,6 +79,14 @@ enum iproto_key { /* Leave a gap between request keys and response keys */ IPROTO_DATA = 0x30, IPROTO_ERROR = 0x31, + /** + * Tarantool supports two push types: binary and text. + * A text push can be distinguished from a response by a + * prefix "push:". + * Binary push is encoded using IPROTO_PUSH key in a + * message body, which replaces IPROTO_DATA. 
+ */ + IPROTO_PUSH = 0x32, IPROTO_KEY_MAX }; diff --git a/src/box/lua/call.c b/src/box/lua/call.c index be13812aa..fc70ed430 100644 --- a/src/box/lua/call.c +++ b/src/box/lua/call.c @@ -418,6 +418,7 @@ port_lua_destroy(struct port *base) static const struct port_vtab port_lua_vtab = { .dump = port_lua_dump, .dump_16 = port_lua_dump_16, + .dump_raw = NULL, .destroy = port_lua_destroy, }; diff --git a/src/box/lua/console.c b/src/box/lua/console.c index 450745c90..3bc4b6425 100644 --- a/src/box/lua/console.c +++ b/src/box/lua/console.c @@ -329,7 +329,7 @@ lbox_console_add_history(struct lua_State *L) return 0; } -static int +int lbox_console_format(struct lua_State *L) { int arg_count = lua_gettop(L); diff --git a/src/box/lua/console.h b/src/box/lua/console.h index 208b31490..92a4af035 100644 --- a/src/box/lua/console.h +++ b/src/box/lua/console.h @@ -39,6 +39,14 @@ struct lua_State; void tarantool_lua_console_init(struct lua_State *L); +/** + * Encode Lua object into YAML string. + * @param Lua object to encode on top of a stack. + * @retval Lua string. 
+ */ +int +lbox_console_format(struct lua_State *L); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua index b4199ef85..295bfcbaa 100644 --- a/src/box/lua/console.lua +++ b/src/box/lua/console.lua @@ -59,7 +59,7 @@ end -- -- Evaluate command on remote instance -- -local function remote_eval(self, line) +local function remote_eval(self, line, opts) if not line or self.remote.state ~= 'active' then local err = self.remote.error self.remote:close() @@ -74,7 +74,7 @@ local function remote_eval(self, line) -- -- execute line -- - local ok, res = pcall(self.remote.eval, self.remote, line) + local ok, res = pcall(self.remote.eval, self.remote, line, opts) return ok and res or format(false, res) end @@ -310,7 +310,7 @@ local function connect(uri, opts) -- override methods self.remote = remote - self.eval = remote_eval + self.eval = function(s, l) return remote_eval(s, l, {on_push = opts.on_push}) end self.prompt = string.format("%s:%s", self.remote.host, self.remote.port) self.completion = function (str, pos1, pos2) local c = string.format( diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index db2d2dbb4..9fa7935f7 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -554,6 +554,41 @@ handle_error: return 2; } +/** + * Search for a "push:" prefix in a message, received from a + * server using a text protocol. + */ +static int +netbox_text_is_push(struct lua_State *L) +{ + assert(lua_gettop(L) == 2); + uint32_t ctypeid; + const char *text = *(const char **)luaL_checkcdata(L, 1, &ctypeid); + assert(ctypeid == luaL_ctypeid(L, "char *")); + uint32_t len = (uint32_t) lua_tonumber(L, 2); + uint32_t push_len = strlen("push:"); + lua_pushboolean(L, len >= 5 && memcmp(text, "push:", push_len) == 0); + return 1; +} + +/** + * Search for IPROTO_PUSH key in a MessagePack encoded response + * body. 
It is needed without entire message decoding, when a user + * wants to store raw responses and pushes in its own buffer. + */ +static int +netbox_body_is_push(struct lua_State *L) +{ + uint32_t ctypeid; + const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid); + assert(ctypeid == luaL_ctypeid(L, "char *")); + assert(mp_typeof(*body) == MP_MAP); + lua_pushboolean(L, mp_decode_map(&body) == 1 && + mp_typeof(*body) == MP_UINT && + mp_decode_uint(&body) == IPROTO_PUSH); + return 1; +} + int luaopen_net_box(struct lua_State *L) { @@ -571,6 +606,8 @@ luaopen_net_box(struct lua_State *L) { "encode_auth", netbox_encode_auth }, { "decode_greeting",netbox_decode_greeting }, { "communicate", netbox_communicate }, + { "text_is_push", netbox_text_is_push }, + { "body_is_push", netbox_body_is_push }, { NULL, NULL} }; /* luaL_register_module polutes _G */ diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua index 87c8c548b..7ef6a6a2d 100644 --- a/src/box/lua/net_box.lua +++ b/src/box/lua/net_box.lua @@ -26,6 +26,8 @@ local communicate = internal.communicate local encode_auth = internal.encode_auth local encode_select = internal.encode_select local decode_greeting = internal.decode_greeting +local text_is_push = internal.text_is_push +local body_is_push = internal.body_is_push local sequence_mt = { __serialize = 'sequence' } local TIMEOUT_INFINITY = 500 * 365 * 86400 @@ -38,6 +40,7 @@ local IPROTO_SYNC_KEY = 0x01 local IPROTO_SCHEMA_VERSION_KEY = 0x05 local IPROTO_DATA_KEY = 0x30 local IPROTO_ERROR_KEY = 0x31 +local IPROTO_PUSH_KEY = 0x32 local IPROTO_GREETING_SIZE = 128 -- select errors from box.error @@ -229,7 +232,8 @@ local function create_transport(host, port, user, password, callback) end -- REQUEST/RESPONSE -- - local function perform_request(timeout, buffer, method, schema_version, ...) + local function perform_request(timeout, buffer, method, on_push, + schema_version, ...) 
if state ~= 'active' then return last_errno or E_NO_CONNECTION, last_error end @@ -242,7 +246,7 @@ local function create_transport(host, port, user, password, callback) local id = next_request_id method_codec[method](send_buf, id, schema_version, ...) next_request_id = next_id(id) - local request = table_new(0, 6) -- reserve space for 6 keys + local request = table_new(0, 7) -- reserve space for 7 keys request.client = fiber_self() request.method = method request.schema_version = schema_version @@ -254,6 +258,17 @@ local function create_transport(host, port, user, password, callback) requests[id] = nil return E_TIMEOUT, 'Timeout exceeded' end + if request.messages then + -- Multiple push messages can appear on a single + -- event loop iteration. + local messages = request.messages + request.messages = nil + if on_push then + for _, m in pairs(messages) do + on_push(m) + end + end + end until requests[id] == nil -- i.e. completed (beware spurious wakeups) return request.errno, request.response end @@ -270,12 +285,12 @@ local function create_transport(host, port, user, password, callback) if request == nil then -- nobody is waiting for the response return end - requests[id] = nil local status = hdr[IPROTO_STATUS_KEY] local body, body_end_check if status ~= 0 then -- Handle errors + requests[id] = nil body, body_end_check = decode(body_rpos) assert(body_end == body_end_check, "invalid xrow length") request.errno = band(status, IPROTO_ERRNO_MASK) @@ -290,7 +305,16 @@ local function create_transport(host, port, user, password, callback) local body_len = body_end - body_rpos local wpos = buffer:alloc(body_len) ffi.copy(wpos, body_rpos, body_len) - request.response = tonumber(body_len) + if body_is_push(body_rpos) then + if request.messages then + table.insert(request.messages, tonumber(body_len)) + else + request.messages = {tonumber(body_len)} + end + else + requests[id] = nil + request.response = tonumber(body_len) + end wakeup_client(request.client) return end @@ 
-298,7 +322,18 @@ local function create_transport(host, port, user, password, callback) -- Decode xrow.body[DATA] to Lua objects body, body_end_check = decode(body_rpos) assert(body_end == body_end_check, "invalid xrow length") - request.response = body[IPROTO_DATA_KEY] + if body[IPROTO_PUSH_KEY] then + assert(#body[IPROTO_PUSH_KEY] == 1) + assert(not body[IPROTO_DATA_KEY]) + if request.messages then + table.insert(request.messages, body[IPROTO_PUSH_KEY][1]) + else + request.messages = {body[IPROTO_PUSH_KEY][1]} + end + else + requests[id] = nil + request.response = body[IPROTO_DATA_KEY] + end wakeup_client(request.client) end @@ -345,9 +380,16 @@ local function create_transport(host, port, user, password, callback) if err then return err, delim_pos else - local response = ffi.string(recv_buf.rpos, delim_pos + #delim) + local response + local is_push = text_is_push(recv_buf.rpos, delim_pos + #delim) + if not is_push then + response = ffi.string(recv_buf.rpos, delim_pos + #delim) + else + -- 5 - len of 'push:' prefix of a message. + response = ffi.string(recv_buf.rpos + 5, delim_pos + #delim - 5) + end recv_buf.rpos = recv_buf.rpos + delim_pos + #delim - return nil, response + return nil, response, is_push end end @@ -408,18 +450,26 @@ local function create_transport(host, port, user, password, callback) console_sm = function(rid) local delim = '\n...\n' - local err, response = send_and_recv_console() + local err, response, is_push = send_and_recv_console() if err then return error_sm(err, response) else local request = requests[rid] - if request == nil then -- nobody is waiting for the response - return + if request then + if is_push then + -- In a console mode it is impossible to + -- get multiple pushes on a single event loop + -- iteration. 
+ assert(not request.messages) + request.messages = {response} + else + requests[rid] = nil + request.response = response + rid = next_id(rid) + end + wakeup_client(request.client) + return console_sm(rid) end - requests[rid] = nil - request.response = response - wakeup_client(request.client) - return console_sm(next_id(rid)) end end @@ -761,7 +811,8 @@ function remote_methods:_request(method, opts, ...) timeout = deadline and max(0, deadline - fiber_clock()) end err, res = perform_request(timeout, buffer, method, - self.schema_version, ...) + opts and opts.on_push, self.schema_version, + ...) if not err and buffer ~= nil then return res -- the length of xrow.body elseif not err then @@ -791,7 +842,7 @@ function remote_methods:ping(opts) timeout = deadline and max(0, deadline - fiber_clock()) or (opts and opts.timeout) end - local err = self._transport.perform_request(timeout, nil, 'ping', + local err = self._transport.perform_request(timeout, nil, 'ping', nil, self.schema_version) return not err or err == E_WRONG_SCHEMA_VERSION end @@ -956,8 +1007,16 @@ console_methods.on_disconnect = remote_methods.on_disconnect console_methods.on_connect = remote_methods.on_connect console_methods.is_connected = remote_methods.is_connected console_methods.wait_state = remote_methods.wait_state -function console_methods:eval(line, timeout) +function console_methods:eval(line, opts) check_remote_arg(self, 'eval') + local timeout + local on_push + if type(opts) == 'table' then + timeout = opts.timeout or TIMEOUT_INFINITY + on_push = opts.on_push + else + timeout = opts + end local err, res local transport = self._transport local pr = transport.perform_request @@ -968,10 +1027,10 @@ function console_methods:eval(line, timeout) end if self.protocol == 'Binary' then local loader = 'return require("console").eval(...)' - err, res = pr(timeout, nil, 'eval', nil, loader, {line}) + err, res = pr(timeout, nil, 'eval', on_push, nil, loader, {line}) else assert(self.protocol == 'Lua 
console') - err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n') + err, res = pr(timeout, nil, 'inject', on_push, nil, line..'$EOF$\n') end if err then box.error({code = err, reason = res}) diff --git a/src/box/lua/session.c b/src/box/lua/session.c index af8411068..73cc9da8f 100644 --- a/src/box/lua/session.c +++ b/src/box/lua/session.c @@ -31,6 +31,7 @@ #include "session.h" #include "lua/utils.h" #include "lua/trigger.h" +#include "lua/msgpack.h" #include #include @@ -41,6 +42,10 @@ #include "box/session.h" #include "box/user.h" #include "box/schema.h" +#include "box/port.h" +#include "box/lua/console.h" +#include "fio.h" +#include "small/obuf.h" /** Owner of a console session. */ struct console_session_owner { @@ -55,10 +60,26 @@ console_session_owner_dup(struct session_owner *owner); static int console_session_owner_fd(const struct session_owner *owner); +/** + * Send "push:" prefix + message in a blocking mode, with no + * yields, to a console socket. + * @param owner Console session owner. + * @param sync Sync. It is unused since a text protocol has no + * syncs. + * @param port Port with text to dump. + * + * @retval -1 Memory or IO error. + * @retval 0 Success. + */ +static int +console_session_owner_push(struct session_owner *owner, uint64_t sync, + struct port *port); + static const struct session_owner_vtab console_session_owner_vtab = { /* .dup = */ console_session_owner_dup, /* .delete = */ (void (*)(struct session_owner *)) free, /* .fd = */ console_session_owner_fd, + /* .push = */ console_session_owner_push, }; static struct session_owner * @@ -90,6 +111,45 @@ console_session_owner_create(struct console_session_owner *owner, int fd) owner->fd = fd; } +/** + * Write @a text into @a fd in a blocking mode, ignoring transient + * socket errors. + * @param fd Console descriptor. + * @param text Text to send. + * @param len Length of @a text. 
+ */ +static inline int +console_do_push(int fd, const char *text, uint32_t len) +{ + while (len > 0) { + int written = fio_write_silent(fd, text, len); + if (written < 0) + return -1; + assert((uint32_t) written <= len); + len -= written; + text += written; + } + return 0; +} + +static int +console_session_owner_push(struct session_owner *owner, uint64_t sync, + struct port *port) +{ + /* Console has no sync. */ + (void) sync; + assert(owner->vtab == &console_session_owner_vtab); + int fd = console_session_owner_fd(owner); + uint32_t text_len; + const char *text = port_dump_raw(port, &text_len); + if (text == NULL || + console_do_push(fd, "push:", strlen("push:")) != 0 || + console_do_push(fd, text, text_len) != 0) + return -1; + else + return 0; +} + static const char *sessionlib_name = "box.session"; /* Create session and pin it to fiber */ @@ -418,6 +478,116 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event) return 3; } +/** + * Port to push a message from Lua. + */ +struct lua_push_port { + const struct port_vtab *vtab; + /** + * Lua state, containing data to dump on top of the stack. + */ + struct lua_State *L; +}; + +/** + * Lua push port supports two dump types: usual and raw. Raw dump + * encodes a message as a YAML formatted text, usual dump encodes + * the message as MessagePack right into an output buffer. + */ +static int +lua_push_port_dump_msgpack(struct port *port, struct obuf *out); + +static const char * +lua_push_port_dump_text(struct port *port, uint32_t *size); + +static const struct port_vtab lua_push_port_vtab = { + .dump = lua_push_port_dump_msgpack, + /* + * Dump_16 has no sense, since push appears since 1.10 + * protocol. 
+ */ + .dump_16 = NULL, + .dump_raw = lua_push_port_dump_text, + .destroy = NULL, +}; + +static void +obuf_error_cb(void *ctx) +{ + *((int *)ctx) = -1; +} + +static int +lua_push_port_dump_msgpack(struct port *port, struct obuf *out) +{ + struct lua_push_port *lua_port = (struct lua_push_port *) port; + assert(lua_port->vtab == &lua_push_port_vtab); + struct mpstream stream; + int rc = 0; + /* + * Do not use luamp_error to allow a caller to clear the + * obuf, if it already has allocated something (for + * example, iproto push reserves memory for a header). + */ + mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb, + obuf_error_cb, &rc); + luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1); + if (rc != 0) + return -1; + mpstream_flush(&stream); + return 0; +} + +static const char * +lua_push_port_dump_text(struct port *port, uint32_t *size) +{ + struct lua_push_port *lua_port = (struct lua_push_port *) port; + assert(lua_port->vtab == &lua_push_port_vtab); + lbox_console_format(lua_port->L); + assert(lua_isstring(lua_port->L, -1)); + size_t len; + const char *result = lua_tolstring(lua_port->L, -1, &len); + *size = (uint32_t) len; + return result; +} + +/** + * Push a message using a protocol, depending on a session type. + * @param data Data to push, first argument on a stack. + * @param opts Options. Now requires a single possible option - + * sync. Second argument on a stack. + */ +static int +lbox_session_push(struct lua_State *L) +{ + if (lua_gettop(L) != 2 || !lua_istable(L, 2)) { +usage_error: + return luaL_error(L, "Usage: box.session.push(data, opts)"); + } + lua_getfield(L, 2, "sync"); + if (! lua_isnumber(L, 3)) + goto usage_error; + double lua_sync = lua_tonumber(L, 3); + lua_pop(L, 1); + uint64_t sync = (uint64_t) lua_sync; + if (lua_sync != sync) + goto usage_error; + struct lua_push_port port; + port.vtab = &lua_push_port_vtab; + port.L = L; + /* + * Pop the opts - they must not be pushed. Leave only data + * on a stack. 
+ */ + lua_remove(L, 2); + if (session_push(current_session(), sync, (struct port *) &port) != 0) { + return luaT_error(L); + } else { + lua_pushboolean(L, true); + return 1; + } +} + /** * Sets trigger on_access_denied. * For test purposes only. @@ -489,6 +659,7 @@ box_lua_session_init(struct lua_State *L) {"on_disconnect", lbox_session_on_disconnect}, {"on_auth", lbox_session_on_auth}, {"on_access_denied", lbox_session_on_access_denied}, + {"push", lbox_session_push}, {NULL, NULL} }; luaL_register_module(L, sessionlib_name, sessionlib); diff --git a/src/box/port.c b/src/box/port.c index 03f6be79d..7f3cea5e7 100644 --- a/src/box/port.c +++ b/src/box/port.c @@ -143,6 +143,12 @@ port_dump_16(struct port *port, struct obuf *out) return port->vtab->dump_16(port, out); } +const char * +port_dump_raw(struct port *port, uint32_t *size) +{ + return port->vtab->dump_raw(port, size); +} + void port_init(void) { @@ -159,5 +165,6 @@ port_free(void) const struct port_vtab port_tuple_vtab = { .dump = port_tuple_dump, .dump_16 = port_tuple_dump_16, + .dump_raw = NULL, .destroy = port_tuple_destroy, }; diff --git a/src/box/port.h b/src/box/port.h index 7cf3339b5..d1db74909 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -76,6 +76,11 @@ struct port_vtab { * format. */ int (*dump_16)(struct port *port, struct obuf *out); + /** + * Same as dump, but find a memory for an output buffer + * for itself. + */ + const char *(*dump_raw)(struct port *port, uint32_t *size); /** * Destroy a port and release associated resources. */ @@ -158,6 +163,16 @@ port_dump(struct port *port, struct obuf *out); int port_dump_16(struct port *port, struct obuf *out); +/** + * Same as port_dump(), but find a memory for an output buffer for + * itself. + * @param port Port to dump. + * @param[out] size Size of the data. + * @retval Data. 
+ */ +const char * +port_dump_raw(struct port *port, uint32_t *size); + void port_init(void); diff --git a/src/box/session.cc b/src/box/session.cc index 908ec9c4e..793393fc8 100644 --- a/src/box/session.cc +++ b/src/box/session.cc @@ -63,6 +63,7 @@ static const struct session_owner_vtab generic_session_owner_vtab = { /* .dup = */ generic_session_owner_dup, /* .delete = */ (void (*)(struct session_owner *)) free, /* .fd = */ generic_session_owner_fd, + /* .push = */ generic_session_owner_push, }; static struct session_owner * @@ -95,6 +96,19 @@ session_owner_create(struct session_owner *owner, enum session_type type) owner->vtab = &generic_session_owner_vtab; } +int +generic_session_owner_push(struct session_owner *owner, uint64_t sync, + struct port *port) +{ + (void) owner; + (void) sync; + (void) port; + const char *session = + tt_sprintf("Session '%s'", session_type_strs[owner->type]); + diag_set(ClientError, ER_UNSUPPORTED, session, "push()"); + return -1; +} + static inline uint64_t sid_max() { diff --git a/src/box/session.h b/src/box/session.h index 105dcab17..b5f4955d8 100644 --- a/src/box/session.h +++ b/src/box/session.h @@ -59,6 +59,7 @@ enum session_type { extern const char *session_type_strs[]; struct session_owner_vtab; +struct port; /** * Object to store session type specific data. For example, IProto @@ -78,8 +79,18 @@ struct session_owner_vtab { void (*free)(struct session_owner *); /** Get the descriptor of an owner, if has. Else -1. */ int (*fd)(const struct session_owner *); + /** Push a port data into a session owner's channel. */ + int (*push)(struct session_owner *, uint64_t, struct port *); }; +/** + * In a common case, a session does not support push. This + * function always returns -1 and sets ER_UNSUPPORTED error. 
+ */ +int +generic_session_owner_push(struct session_owner *owner, uint64_t sync, + struct port *port); + static inline struct session_owner * session_owner_dup(struct session_owner *owner) { @@ -142,6 +153,12 @@ session_fd(const struct session *session) return session->owner->vtab->fd(session->owner); } +static inline int +session_push(struct session *session, uint64_t sync, struct port *port) +{ + return session->owner->vtab->push(session->owner, sync, port); +} + /** * Find a session by id. */ diff --git a/src/box/xrow.c b/src/box/xrow.c index f48525645..f3e929f69 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -43,6 +43,9 @@ #include "scramble.h" #include "iproto_constants.h" +static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f, + "encoded IPROTO_BODY keys must fit into one byte"); + int xrow_header_decode(struct xrow_header *header, const char **pos, const char *end) @@ -231,6 +234,9 @@ struct PACKED iproto_body_bin { uint32_t v_data_len; /* string length of array size */ }; +static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN == + IPROTO_SELECT_HEADER_LEN, "size of the prepared select"); + static const struct iproto_body_bin iproto_body_bin = { 0x81, IPROTO_DATA, 0xdd, 0 }; @@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = { 0x81, IPROTO_ERROR, 0xdb, 0 }; +struct PACKED iproto_body_push_bin { + uint8_t m_body; /* MP_MAP */ + uint8_t k_data; /* IPROTO_PUSH */ + uint8_t v_data; /* 1-size MP_ARRAY */ +}; + +static_assert(sizeof(struct iproto_body_push_bin) + IPROTO_HEADER_LEN == + IPROTO_PUSH_HEADER_LEN, "size of the prepared push"); + +static const struct iproto_body_push_bin iproto_push_bin = { + 0x81, IPROTO_PUSH, 0x91 +}; + /** Return a 4-byte numeric error code, with status flags. 
*/ static inline uint32_t iproto_encode_error(uint32_t error) @@ -337,23 +356,21 @@ iproto_write_error(int fd, const struct error *e, uint32_t schema_version, (void) write(fd, e->errmsg, msg_len); } -enum { SVP_SIZE = IPROTO_HEADER_LEN + sizeof(iproto_body_bin) }; - int -iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp) +iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size) { /** * Reserve memory before taking a savepoint. * This ensures that we get a contiguous chunk of memory * and the savepoint is pointing at the beginning of it. */ - void *ptr = obuf_reserve(buf, SVP_SIZE); + void *ptr = obuf_reserve(buf, size); if (ptr == NULL) { - diag_set(OutOfMemory, SVP_SIZE, "obuf", "reserve"); + diag_set(OutOfMemory, size, "obuf_reserve", "ptr"); return -1; } *svp = obuf_create_svp(buf); - ptr = obuf_alloc(buf, SVP_SIZE); + ptr = obuf_alloc(buf, size); assert(ptr != NULL); return 0; } @@ -373,6 +390,17 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body)); } +void +iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, + uint32_t schema_version) +{ + char *pos = (char *) obuf_svp_to_ptr(buf, svp); + iproto_header_encode(pos, IPROTO_OK, sync, schema_version, + obuf_size(buf) - svp->used - IPROTO_HEADER_LEN); + memcpy(pos + IPROTO_HEADER_LEN, &iproto_push_bin, + sizeof(iproto_push_bin)); +} + int xrow_decode_dml(struct xrow_header *row, struct request *request, uint64_t key_map) diff --git a/src/box/xrow.h b/src/box/xrow.h index d407d151b..d2ec394c5 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -50,6 +50,10 @@ enum { XROW_HEADER_LEN_MAX = 40, XROW_BODY_LEN_MAX = 128, IPROTO_HEADER_LEN = 28, + /** 7 = sizeof(iproto_body_bin). */ + IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7, + /** 3 = sizeof(iproto_body_push_bin). 
*/ + IPROTO_PUSH_HEADER_LEN = IPROTO_HEADER_LEN + 3, }; struct xrow_header { @@ -344,8 +348,18 @@ iproto_header_encode(char *data, uint32_t type, uint64_t sync, struct obuf; struct obuf_svp; +/** + * Reserve obuf space for a header, which depends on a response + * size. + */ int -iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp); +iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size); + +static inline int +iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp) +{ + return iproto_prepare_header(buf, svp, IPROTO_SELECT_HEADER_LEN); +} /** * Write select header to a preallocated buffer. @@ -355,6 +369,16 @@ void iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, uint32_t schema_version, uint32_t count); +static inline int +iproto_prepare_push(struct obuf *buf, struct obuf_svp *svp) +{ + return iproto_prepare_header(buf, svp, IPROTO_PUSH_HEADER_LEN); +} + +void +iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, + uint32_t schema_version); + /** * Encode iproto header with IPROTO_OK response code. * @param out Encode to. 
diff --git a/src/fio.c b/src/fio.c index b79d3d058..a813b7760 100644 --- a/src/fio.c +++ b/src/fio.c @@ -134,6 +134,18 @@ fio_writen(int fd, const void *buf, size_t count) return 0; } +int +fio_write_silent(int fd, const void *buf, size_t count) +{ + ssize_t nwr = write(fd, buf, count); + if (nwr >= 0) + return nwr; + if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN) + return 0; + else + return -1; +} + ssize_t fio_writev(int fd, struct iovec *iov, int iovcnt) { diff --git a/src/fio.h b/src/fio.h index 12749afcb..c88e19258 100644 --- a/src/fio.h +++ b/src/fio.h @@ -108,6 +108,22 @@ fio_pread(int fd, void *buf, size_t count, off_t offset); int fio_writen(int fd, const void *buf, size_t count); +/** + * Write the given buffer with no retrying for partial writes, + * and ignore transient write errors: EINTR, EWOULDBLOCK and + * EAGAIN - these errors are not logged, and the function returns + * 0. + * @param fd File descriptor to write to. + * @param buf Buffer to write. + * @param count Size of @a buf. + * + * @retval -1 Non-transient error. + * @retval 0 Nothing to write, or a transient error. + * @retval > 0 Written byte count. + */ +int +fio_write_silent(int fd, const void *buf, size_t count); + /** * A simple wrapper around writev(). * Re-tries write in case of EINTR. diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua index 48d28bd6d..c91576dc2 100755 --- a/test/app-tap/console.test.lua +++ b/test/app-tap/console.test.lua @@ -21,7 +21,7 @@ local EOL = "\n...\n" test = tap.test("console") -test:plan(59) +test:plan(61) -- Start console and connect to it local server = console.listen(CONSOLE_SOCKET) @@ -31,6 +31,13 @@ local handshake = client:read{chunk = 128} test:ok(string.match(handshake, '^Tarantool .*console') ~= nil, 'Handshake') test:ok(client ~= nil, "connect to console") +-- +-- gh-2677: box.session.push, text protocol support. 
+-- +client:write('box.session.push(200, {sync = 0})\n') +test:is(client:read(EOL):find('push:'), 1, "read pushed value") +test:is(client:read(EOL):find('true'), 7, "read box.session.push result") + -- Execute some command client:write("1\n") test:is(yaml.decode(client:read(EOL))[1], 1, "eval") diff --git a/test/box/net.box.result b/test/box/net.box.result index 46d85b327..ecf86af0d 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) return cn:_request('select', opts, space_id, index_id, iterator, offset, limit, key) end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); --- ... diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index 87e26f84c..2c19ee479 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts) return cn:_request('select', opts, space_id, index_id, iterator, offset, limit, key) end -function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end +function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end test_run:cmd("setopt delimiter ''"); LISTEN = require('uri').parse(box.cfg.listen) diff --git a/test/box/push.result b/test/box/push.result new file mode 100644 index 000000000..747503f59 --- /dev/null +++ b/test/box/push.result @@ -0,0 +1,364 @@ +-- +-- gh-2677: box.session.push. +-- +-- +-- Usage. +-- +box.session.push() +--- +- error: 'Usage: box.session.push(data, opts)' +... +box.session.push(100) +--- +- error: 'Usage: box.session.push(data, opts)' +... +box.session.push(100, {sync = -200}) +--- +- error: 'Usage: box.session.push(data, opts)' +... +-- +-- Test text protocol. 
+-- +test_run = require('test_run').new() +--- +... +console = require('console') +--- +... +netbox = require('net.box') +--- +... +fiber = require('fiber') +--- +... +s = console.listen(3434) +--- +... +c = netbox.connect(3434, {console = true}) +--- +... +c:eval('100') +--- +- '--- + + - 100 + + ... + +' +... +messages = {} +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function on_push(message) + table.insert(messages, message) +end; +--- +... +function do_pushes() + local sync = box.session.sync() + for i = 1, 5 do + box.session.push(i, {sync = sync}) + fiber.sleep(0.01) + end + return 300 +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Ensure a pushed message can ignored on a client. +c:eval('do_pushes()') +--- +- '--- + + - 300 + + ... + +' +... +-- Now start catching pushes. +c:eval('do_pushes()', {on_push = on_push}) +--- +- '--- + + - 300 + + ... + +' +... +messages +--- +- - '--- + + - 1 + + ... + +' + - '--- + + - 2 + + ... + +' + - '--- + + - 3 + + ... + +' + - '--- + + - 4 + + ... + +' + - '--- + + - 5 + + ... + +' +... +c:close() +--- +... +s:close() +--- +- true +... +-- +-- Test binary protocol. +-- +box.schema.user.grant('guest','read,write,execute','universe') +--- +... +c = netbox.connect(box.cfg.listen) +--- +... +c:ping() +--- +- true +... +messages = {} +--- +... +c:call('do_pushes', {}, {on_push = on_push}) +--- +- 300 +... +messages +--- +- - 1 + - 2 + - 3 + - 4 + - 5 +... +-- Add a little stress: many pushes with different syncs, from +-- different fibers and DML/DQL requests. +catchers = {} +--- +... +started = 0 +--- +... +finished = 0 +--- +... +s = box.schema.create_space('test', {format = {{'field1', 'integer'}}}) +--- +... +pk = s:create_index('pk') +--- +... +c:reload_schema() +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... 
+function dml_push_and_dml(key) + local sync = box.session.sync() + box.session.push('started dml', {sync = sync}) + s:replace{key} + box.session.push('continued dml', {sync = sync}) + s:replace{-key} + box.session.push('finished dml', {sync = sync}) + return key +end; +--- +... +function do_pushes(val) + local sync = box.session.sync() + for i = 1, 5 do + box.session.push(i, {sync = sync}) + fiber.yield() + end + return val +end; +--- +... +function push_catcher_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = false} + catcher.retval = c:call('do_pushes', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +--- +... +function dml_push_and_dml_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = true} + catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +--- +... +-- At first check that a pushed message can be ignored in a binary +-- protocol too. +c:call('do_pushes', {300}); +--- +- 300 +... +-- Then do stress. +for i = 1, 200 do + fiber.create(dml_push_and_dml_f) + fiber.create(push_catcher_f) +end; +--- +... +while finished ~= 400 do fiber.sleep(0.1) end; +--- +... +for _, c in pairs(catchers) do + if c.is_dml then + assert(#c.messages == 3) + assert(c.messages[1] == 'started dml') + assert(c.messages[2] == 'continued dml') + assert(c.messages[3] == 'finished dml') + assert(s:get{c.retval}) + assert(s:get{-c.retval}) + else + assert(c.retval) + assert(#c.messages == 5) + for k, v in pairs(c.messages) do + assert(k == v) + end + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +#s:select{} +--- +- 400 +... +-- +-- Test binary pushes. +-- +ibuf = require('buffer').ibuf() +--- +... 
+msgpack = require('msgpack') +--- +... +messages = {} +--- +... +resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf}) +--- +... +resp_len +--- +- 10 +... +messages +--- +- - 4 + - 4 + - 4 + - 4 + - 4 +... +decoded = {} +--- +... +r = nil +--- +... +for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end +--- +... +decoded +--- +- - {50: [1]} + - {50: [2]} + - {50: [3]} + - {50: [4]} + - {50: [5]} +... +r, _ = msgpack.decode_unchecked(ibuf.rpos) +--- +... +r +--- +- {48: [300]} +... +c:close() +--- +... +s:drop() +--- +... +box.schema.user.revoke('guest', 'read,write,execute', 'universe') +--- +... +-- +-- Ensure can not push in background. +-- +ok = nil +--- +... +err = nil +--- +... +function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end +--- +... +f = fiber.create(back_push_f) +--- +... +while f:status() ~= 'dead' do fiber.sleep(0.01) end +--- +... +ok, err +--- +- false +- Session 'background' does not support push() +... diff --git a/test/box/push.test.lua b/test/box/push.test.lua new file mode 100644 index 000000000..02d571876 --- /dev/null +++ b/test/box/push.test.lua @@ -0,0 +1,163 @@ +-- +-- gh-2677: box.session.push. +-- + +-- +-- Usage. +-- +box.session.push() +box.session.push(100) +box.session.push(100, {sync = -200}) + +-- +-- Test text protocol. +-- +test_run = require('test_run').new() +console = require('console') +netbox = require('net.box') +fiber = require('fiber') +s = console.listen(3434) +c = netbox.connect(3434, {console = true}) +c:eval('100') +messages = {} +test_run:cmd("setopt delimiter ';'") +function on_push(message) + table.insert(messages, message) +end; + +function do_pushes() + local sync = box.session.sync() + for i = 1, 5 do + box.session.push(i, {sync = sync}) + fiber.sleep(0.01) + end + return 300 +end; +test_run:cmd("setopt delimiter ''"); +-- Ensure a pushed message can ignored on a client. 
+c:eval('do_pushes()') +-- Now start catching pushes. +c:eval('do_pushes()', {on_push = on_push}) +messages + +c:close() +s:close() + +-- +-- Test binary protocol. +-- +box.schema.user.grant('guest','read,write,execute','universe') + +c = netbox.connect(box.cfg.listen) +c:ping() +messages = {} +c:call('do_pushes', {}, {on_push = on_push}) +messages + +-- Add a little stress: many pushes with different syncs, from +-- different fibers and DML/DQL requests. + +catchers = {} +started = 0 +finished = 0 +s = box.schema.create_space('test', {format = {{'field1', 'integer'}}}) +pk = s:create_index('pk') +c:reload_schema() +test_run:cmd("setopt delimiter ';'") +function dml_push_and_dml(key) + local sync = box.session.sync() + box.session.push('started dml', {sync = sync}) + s:replace{key} + box.session.push('continued dml', {sync = sync}) + s:replace{-key} + box.session.push('finished dml', {sync = sync}) + return key +end; +function do_pushes(val) + local sync = box.session.sync() + for i = 1, 5 do + box.session.push(i, {sync = sync}) + fiber.yield() + end + return val +end; +function push_catcher_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = false} + catcher.retval = c:call('do_pushes', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +function dml_push_and_dml_f() + fiber.yield() + started = started + 1 + local catcher = {messages = {}, retval = nil, is_dml = true} + catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message) + table.insert(catcher.messages, message) + end}) + table.insert(catchers, catcher) + finished = finished + 1 +end; +-- At first check that a pushed message can be ignored in a binary +-- protocol too. +c:call('do_pushes', {300}); +-- Then do stress. 
+for i = 1, 200 do + fiber.create(dml_push_and_dml_f) + fiber.create(push_catcher_f) +end; +while finished ~= 400 do fiber.sleep(0.1) end; + +for _, c in pairs(catchers) do + if c.is_dml then + assert(#c.messages == 3) + assert(c.messages[1] == 'started dml') + assert(c.messages[2] == 'continued dml') + assert(c.messages[3] == 'finished dml') + assert(s:get{c.retval}) + assert(s:get{-c.retval}) + else + assert(c.retval) + assert(#c.messages == 5) + for k, v in pairs(c.messages) do + assert(k == v) + end + end +end; +test_run:cmd("setopt delimiter ''"); + +#s:select{} + +-- +-- Test binary pushes. +-- +ibuf = require('buffer').ibuf() +msgpack = require('msgpack') +messages = {} +resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf}) +resp_len +messages +decoded = {} +r = nil +for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end +decoded +r, _ = msgpack.decode_unchecked(ibuf.rpos) +r + +c:close() +s:drop() + +box.schema.user.revoke('guest', 'read,write,execute', 'universe') + +-- +-- Ensure can not push in background. +-- +ok = nil +err = nil +function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end +f = fiber.create(back_push_f) +while f:status() ~= 'dead' do fiber.sleep(0.01) end +ok, err diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result index d561b4813..7b0f2993f 100644 --- a/test/replication/before_replace.result +++ b/test/replication/before_replace.result @@ -49,7 +49,17 @@ test_run:cmd("switch autobootstrap3"); --- - true ... +-- +-- gh-2677 - test that an applier can not push() messages. Applier +-- session is available in Lua, so the test is here instead of +-- box/push.test.lua. 
+-- +push_ok = nil +push_err = nil _ = box.space.test:before_replace(function(old, new) + if box.session.type() == 'applier' and not push_err then + push_ok, push_err = pcall(box.session.push, 100, {sync = 100}) + end if old ~= nil and new ~= nil then return new[2] > old[2] and new or old end @@ -187,6 +197,10 @@ box.space.test:select() - [9, 90] - [10, 100] ... +push_err +--- +- Session 'applier' does not support push() +... test_run:cmd('restart server autobootstrap3') box.space.test:select() --- diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua index 2c6912d06..04e5a2172 100644 --- a/test/replication/before_replace.test.lua +++ b/test/replication/before_replace.test.lua @@ -26,7 +26,17 @@ _ = box.space.test:before_replace(function(old, new) end end); test_run:cmd("switch autobootstrap3"); +-- +-- gh-2677 - test that an applier can not push() messages. Applier +-- session is available in Lua, so the test is here instead of +-- box/push.test.lua. +-- +push_ok = nil +push_err = nil _ = box.space.test:before_replace(function(old, new) + if box.session.type() == 'applier' and not push_err then + push_ok, push_err = pcall(box.session.push, 100, {sync = 100}) + end if old ~= nil and new ~= nil then return new[2] > old[2] and new or old end @@ -62,6 +72,7 @@ test_run:cmd('restart server autobootstrap2') box.space.test:select() test_run:cmd("switch autobootstrap3") box.space.test:select() +push_err test_run:cmd('restart server autobootstrap3') box.space.test:select() -- 2.14.3 (Apple Git-98)