From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: Re: [tarantool-patches] Re: [PATCH v2 08/10] session: introduce text box.session.push References: <20180510192754.GH30593@atlas> Message-ID: <1ed89043-c832-a27f-593a-f230984831f6@tarantool.org> Date: Thu, 24 May 2018 23:50:43 +0300 MIME-Version: 1.0 In-Reply-To: <20180510192754.GH30593@atlas> Content-Type: text/plain; charset=utf-8; format=flowed Content-Language: en-US Content-Transfer-Encoding: 7bit To: tarantool-patches@freelists.org, Konstantin Osipov Cc: vdavydov.dev@gmail.com List-ID: On 10/05/2018 22:27, Konstantin Osipov wrote: > * Vladislav Shpilevoy [18/04/20 16:31]: >> +/** >> + * Push a tagged YAML document into a console socket. >> + * @param session Console session. >> + * @param port Port with YAML to push. >> + * >> + * @retval 0 Success. >> + * @retval -1 Error. >> + */ >> +static int >> +console_session_push(struct session *session, struct port *port) >> +{ >> + assert(session_vtab_registry[session->type].push == >> + console_session_push); >> + uint32_t text_len; >> + const char *text = port_dump_plain(port, &text_len); >> + if (text == NULL) >> + return -1; >> + int fd = session_fd(session); >> + while (text_len > 0) { >> + while (coio_wait(fd, COIO_WRITE, >> + TIMEOUT_INFINITY) != COIO_WRITE); > > Nitpick: the socket is ready in 99% of cases, there is no reason > to call coio_wait() unless you get EINTR. > Second, why the choice of fio_writev() rather than write() or > send? We have discussed that verbally, but I duplicate it here for the record. I used fio_writev since it does not return -1 on EINTR, but it appeared to return -1 on EWOULDBLOCK/EAGAIN, so I was wrong - I can not use it here, you are right. > > What is wrong with coio_writev or similar? Because console socket has no ev_io object. You asked me investigate if lua socket has encapsulated ev_io - it does not. Lua socket always uses file descriptor only, and does coio_wait(int fd). So I can not pass it to session meta directly from socket.lua with no big refactoring of Lua socket. If you really want it, I can open a separate issue for that, label 'good first issue' and assign a student. It is a simple but big task, that does not block push. I refactored console push to write into a socket, and only then do coio_wait if necessary. diff --git a/src/box/lua/console.c b/src/box/lua/console.c index 65a2192da..5ded99c98 100644 --- a/src/box/lua/console.c +++ b/src/box/lua/console.c @@ -394,17 +394,20 @@ console_session_push(struct session *session, struct port *port) return -1; int fd = session_fd(session); while (text_len > 0) { - while (coio_wait(fd, COIO_WRITE, - TIMEOUT_INFINITY) != COIO_WRITE); - const struct iovec iov = { - .iov_base = (void *) text, - .iov_len = text_len - }; - ssize_t rc = fio_writev(fd, &iov, 1); - if (rc < 0) - return -1; - text_len -= rc; - text += rc; + ssize_t rc = write(fd, text, text_len); + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + while (coio_wait(fd, COIO_WRITE, + TIMEOUT_INFINITY) != + COIO_WRITE); + } else if (errno != EINTR) { + diag_set(SocketError, fd, strerror(errno)); + return -1; + } + } else { + text_len -= (uint32_t) rc; + text += rc; + } } return 0; } > >> + const struct iovec iov = { >> + .iov_base = (void *) text, >> + .iov_len = text_len >> + }; >> + ssize_t rc = fio_writev(fd, &iov, 1); >> + if (rc < 0) >> + return -1; >> + text_len -= rc; >> + text += rc; >> + } >> + return 0; >> +} >> @@ -92,13 +93,27 @@ local text_connection_mt = { >> -- >> eval = function(self, text) >> text = text..'$EOF$\n' >> - if self:write(text) then >> + if not self:write(text) then >> + error(self:set_error()) >> + end >> + while true do >> local rc = self:read() >> - if rc then >> + if not rc then >> + break >> + end >> + local handle, prefix = yaml.decode_tag(rc) >> + assert(handle or not prefix) >> + if not handle then >> + -- Can not fail - tags are encoded with no >> + -- user participation and are correct always. >> + assert(not prefix) > > In Lua, asserts take CPU time. Please don't use them unless in a > test. diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua index b8ae5ba59..574882188 100644 --- a/src/box/lua/console.lua +++ b/src/box/lua/console.lua @@ -102,11 +102,9 @@ local text_connection_mt = { break end local handle, prefix = yaml.decode_tag(rc) - assert(handle or not prefix) if not handle then -- Can not fail - tags are encoded with no -- user participation and are correct always. - assert(not prefix) return rc end if handle == PUSH_TAG_HANDLE and self.print_f then > >> return rc >> end >> + if handle == PUSH_TAG_HANDLE and self.print_f then >> + self.print_f(rc) >> + end >> end >> - error(self:set_error()) >> + return rc >> end, >> -- >> -- Make the connection be in error state, set error >> @@ -121,15 +136,18 @@ local text_connection_mt = { >> -- netbox-like object. >> -- @param connection Socket to wrap. >> -- @param url Parsed destination URL. >> +-- @param print_f Function to print push messages. >> +-- >> -- @retval nil, err Error, and err contains an error message. >> -- @retval not nil Netbox-like object. >> -- >> -local function wrap_text_socket(connection, url) >> +local function wrap_text_socket(connection, url, print_f) >> local conn = setmetatable({ >> _socket = connection, >> state = 'active', >> host = url.host or 'localhost', >> port = url.service, >> + print_f = print_f, >> }, text_connection_mt) >> if not conn:write('require("console").delimiter("$EOF$")\n') or >> not conn:read() then >> @@ -369,7 +387,8 @@ local function connect(uri, opts) >> end >> local remote >> if greeting.protocol == 'Lua console' then >> - remote = wrap_text_socket(connection, u) >> + remote = wrap_text_socket(connection, u, >> + function(msg) self:print(msg) end) >> else >> opts = { >> connect_timeout = opts.timeout, >> diff --git a/src/box/lua/session.c b/src/box/lua/session.c >> index 5fe5f08d4..306271809 100644 >> --- a/src/box/lua/session.c >> +++ b/src/box/lua/session.c >> @@ -367,7 +415,11 @@ lbox_session_push(struct lua_State *L) >> if (lua_gettop(L) != 1) >> return luaL_error(L, "Usage: box.session.push(data)"); >> >> - if (session_push(current_session(), NULL) != 0) { >> + struct lua_push_port port; >> + port.vtab = &lua_push_port_vtab; > > Why do you need a separate port? I need a separate port, because origin struct port does not have struct lua_State *L. I need the lua stack in console push to get the object, encode it into YAML and send to a socket. > > And why do you need to create a port for every push? Can'st you > reuse the same port as is used for the Lua call itself? Lua call does not reuse port. The port is created on each call in iproto.cc on stack, exactly like here. I can not understand why do you think that creating an object on the stack and setting a pair of its fields makes noticeable performance input. Especially taking into account that Lua does most of work here. > >> + port.L = L; >> + >> + if (session_push(current_session(), (struct port *) &port) != 0) { >> lua_pushnil(L); >> luaT_pusherror(L, box_error_last()); >> return 2; >> diff --git a/src/box/port.c b/src/box/port.c >> index 255eb732c..f9b655840 100644 > The patch is below: ============================================================================= commit 9c626685ce752c03029f49fc8b8298fe6a2e2f9f Author: Vladislav Shpilevoy Date: Thu Apr 19 19:50:25 2018 +0300 session: introduce text box.session.push box.session.push allows to send some intermediate results in the scope of main request with no finalizing it. Messages can be sent over text and binary protocol. This patch allows to send text pushes. Text push is a YAML document tagged with '!push!' handle and 'tag:tarantool.io/push,2018' prefix. YAML tags is a standard way to define a type of the document. Console received push message just prints it to the stdout (or sends to a next console, if it is remote console too). Part of #2677 diff --git a/src/box/lua/console.c b/src/box/lua/console.c index 7e17fa30a..5ded99c98 100644 --- a/src/box/lua/console.c +++ b/src/box/lua/console.c @@ -31,10 +31,12 @@ #include "box/lua/console.h" #include "box/session.h" +#include "box/port.h" #include "lua/utils.h" #include "lua/fiber.h" #include "fiber.h" #include "coio.h" #include "lua-yaml/lyaml.h" #include #include @@ -366,6 +368,50 @@ console_session_fd(struct session *session) return session->meta.fd; } +int +console_encode_push(struct lua_State *L) +{ + return lua_yaml_encode_tagged(L, luaL_yaml_default, "!push!", + "tag:tarantool.io/push,2018"); +} + +/** + * Push a tagged YAML document into a console socket. + * @param session Console session. + * @param port Port with YAML to push. + * + * @retval 0 Success. + * @retval -1 Error. + */ +static int +console_session_push(struct session *session, struct port *port) +{ + assert(session_vtab_registry[session->type].push == + console_session_push); + uint32_t text_len; + const char *text = port_dump_plain(port, &text_len); + if (text == NULL) + return -1; + int fd = session_fd(session); + while (text_len > 0) { + ssize_t rc = write(fd, text, text_len); + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + while (coio_wait(fd, COIO_WRITE, + TIMEOUT_INFINITY) != + COIO_WRITE); + } else if (errno != EINTR) { + diag_set(SocketError, fd, strerror(errno)); + return -1; + } + } else { + text_len -= (uint32_t) rc; + text += rc; + } + } + return 0; +} + void tarantool_lua_console_init(struct lua_State *L) { @@ -400,7 +446,7 @@ tarantool_lua_console_init(struct lua_State *L) */ lua_setfield(L, -2, "formatter"); struct session_vtab console_session_vtab = { - /* .push = */ generic_session_push, + /* .push = */ console_session_push, /* .fd = */ console_session_fd, /* .sync = */ generic_session_sync, }; diff --git a/src/box/lua/console.h b/src/box/lua/console.h index 208b31490..6d1449810 100644 --- a/src/box/lua/console.h +++ b/src/box/lua/console.h @@ -36,6 +36,16 @@ extern "C" { struct lua_State; +/** + * Encode a single value on top of the stack into YAML document + * tagged as push message. + * @param object Any lua object on top of the stack. + * @retval nil, error Error occured. + * @retval not nil Tagged YAML document. + */ +int +console_encode_push(struct lua_State *L); + void tarantool_lua_console_init(struct lua_State *L); diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua index bc4e02bfc..574882188 100644 --- a/src/box/lua/console.lua +++ b/src/box/lua/console.lua @@ -11,6 +11,7 @@ local yaml = require('yaml') local net_box = require('net.box') local YAML_TERM = '\n...\n' +local PUSH_TAG_HANDLE = '!push!' local function format(status, ...) local err @@ -92,13 +93,25 @@ local text_connection_mt = { -- eval = function(self, text) text = text..'$EOF$\n' - if self:write(text) then + if not self:write(text) then + error(self:set_error()) + end + while true do local rc = self:read() - if rc then + if not rc then + break + end + local handle, prefix = yaml.decode_tag(rc) + if not handle then + -- Can not fail - tags are encoded with no + -- user participation and are correct always. return rc end + if handle == PUSH_TAG_HANDLE and self.print_f then + self.print_f(rc) + end end - error(self:set_error()) + return rc end, -- -- Make the connection be in error state, set error @@ -121,15 +134,18 @@ local text_connection_mt = { -- netbox-like object. -- @param connection Socket to wrap. -- @param url Parsed destination URL. +-- @param print_f Function to print push messages. +-- -- @retval nil, err Error, and err contains an error message. -- @retval not nil Netbox-like object. -- -local function wrap_text_socket(connection, url) +local function wrap_text_socket(connection, url, print_f) local conn = setmetatable({ _socket = connection, state = 'active', host = url.host or 'localhost', port = url.service, + print_f = print_f, }, text_connection_mt) if not conn:write('require("console").delimiter("$EOF$")\n') or not conn:read() then @@ -369,7 +385,8 @@ local function connect(uri, opts) end local remote if greeting.protocol == 'Lua console' then - remote = wrap_text_socket(connection, u) + remote = wrap_text_socket(connection, u, + function(msg) self:print(msg) end) else opts = { connect_timeout = opts.timeout, diff --git a/src/box/lua/session.c b/src/box/lua/session.c index 05010c4c3..ade85491f 100644 --- a/src/box/lua/session.c +++ b/src/box/lua/session.c @@ -41,6 +41,8 @@ #include "box/session.h" #include "box/user.h" #include "box/schema.h" +#include "box/port.h" +#include "box/lua/console.h" static const char *sessionlib_name = "box.session"; @@ -355,6 +357,52 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event) return 3; } +/** + * Port to push a message from Lua. + */ +struct lua_push_port { + const struct port_vtab *vtab; + /** + * Lua state, containing data to dump on top of the stack. + */ + struct lua_State *L; +}; + +static const char * +lua_push_port_dump_plain(struct port *port, uint32_t *size); + +static const struct port_vtab lua_push_port_vtab = { + .dump_msgpack = NULL, + /* + * Dump_16 has no sense, since push appears since 1.10 + * protocol. + */ + .dump_msgpack_16 = NULL, + .dump_plain = lua_push_port_dump_plain, + .destroy = NULL, +}; + +static const char * +lua_push_port_dump_plain(struct port *port, uint32_t *size) +{ + struct lua_push_port *lua_port = (struct lua_push_port *) port; + assert(lua_port->vtab == &lua_push_port_vtab); + struct lua_State *L = lua_port->L; + int rc = console_encode_push(L); + if (rc == 2) { + assert(lua_isnil(L, -2)); + assert(lua_isstring(L, -1)); + diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1)); + return NULL; + } + assert(rc == 1); + assert(lua_isstring(L, -1)); + size_t len; + const char *result = lua_tolstring(L, -1, &len); + *size = (uint32_t) len; + return result; +} + /** * Push a message using a protocol, depending on a session type. * @param L Lua state. First argument on the stack is data to @@ -368,7 +416,11 @@ lbox_session_push(struct lua_State *L) if (lua_gettop(L) != 1) return luaL_error(L, "Usage: box.session.push(data)"); - if (session_push(current_session(), NULL) != 0) { + struct lua_push_port port; + port.vtab = &lua_push_port_vtab; + port.L = L; + + if (session_push(current_session(), (struct port *) &port) != 0) { lua_pushnil(L); luaT_pusherror(L, box_error_last()); return 2; diff --git a/src/box/port.c b/src/box/port.c index 255eb732c..f9b655840 100644 --- a/src/box/port.c +++ b/src/box/port.c @@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out) return port->vtab->dump_msgpack_16(port, out); } +const char * +port_dump_plain(struct port *port, uint32_t *size) +{ + return port->vtab->dump_plain(port, size); +} + void port_init(void) { diff --git a/src/box/port.h b/src/box/port.h index 1c44b9b00..7fc1b8972 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -76,6 +76,11 @@ struct port_vtab { * 1.6 format. */ int (*dump_msgpack_16)(struct port *port, struct obuf *out); + /** + * Dump a port content as a plain text into a buffer, + * allocated inside. + */ + const char *(*dump_plain)(struct port *port, uint32_t *size); /** * Destroy a port and release associated resources. */ @@ -158,6 +163,18 @@ port_dump_msgpack(struct port *port, struct obuf *out); int port_dump_msgpack_16(struct port *port, struct obuf *out); +/** + * Dump a port content as a plain text into a buffer, + * allocated inside. + * @param port Port with data to dump. + * @param[out] size Length of a result plain text. + * + * @retval nil Error. + * @retval not nil Plain text. + */ +const char * +port_dump_plain(struct port *port, uint32_t *size); + void port_init(void); diff --git a/src/diag.h b/src/diag.h index dc6c132d5..4fcbab5c3 100644 --- a/src/diag.h +++ b/src/diag.h @@ -249,6 +249,9 @@ struct error * BuildSystemError(const char *file, unsigned line, const char *format, ...); struct error * BuildXlogError(const char *file, unsigned line, const char *format, ...); +struct error * +BuildSocketError(const char *file, unsigned line, int fd, const char *format, + ...); struct index_def; diff --git a/src/sio.cc b/src/sio.cc index c906a97a8..8d71f0382 100644 --- a/src/sio.cc +++ b/src/sio.cc @@ -67,6 +67,22 @@ SocketError::SocketError(const char *file, unsigned line, int fd, errno = save_errno; } +struct error * +BuildSocketError(const char *file, unsigned line, int fd, const char *format, + ...) +{ + try { + SocketError *e = new SocketError(file, line, fd, ""); + va_list ap; + va_start(ap, format); + error_vformat_msg(e, format, ap); + va_end(ap); + return e; + } catch (OutOfMemory *e) { + return e; + } +} + /** Pretty print socket name and peer (for exceptions) */ const char * sio_socketname(int fd) diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua index a5b3061a9..237b3d002 100755 --- a/test/app-tap/console.test.lua +++ b/test/app-tap/console.test.lua @@ -21,7 +21,7 @@ local EOL = "\n...\n" test = tap.test("console") -test:plan(60) +test:plan(61) -- Start console and connect to it local server = console.listen(CONSOLE_SOCKET) @@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console") -- gh-2677: box.session.push, text protocol support. -- client:write('box.session.push(200)\n') -test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work") +test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n', + "pushed message") +test:is(client:read(EOL), '---\n- true\n...\n', "pushed message") -- Execute some command client:write("1\n")