From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladislav Shpilevoy Subject: [PATCH v2 08/10] session: introduce text box.session.push Date: Fri, 20 Apr 2018 16:24:33 +0300 Message-Id: In-Reply-To: References: In-Reply-To: References: To: tarantool-patches@freelists.org Cc: vdavydov.dev@gmail.com List-ID: box.session.push allows sending intermediate results in the scope of the main request without finalizing it. Messages can be sent over the text and binary protocols. This patch allows sending text pushes. A text push is a YAML document tagged with the '!push!' handle and the 'tag:tarantool.io/push,2018' prefix. YAML tags are a standard way to define the type of a document. A console that receives a push message just prints it to stdout (or sends it to the next console, if it is a remote console too). Part of #2677 --- src/box/lua/console.c | 45 +++++++++++++++++++++++++++++++++++- src/box/lua/console.h | 10 ++++++++ src/box/lua/console.lua | 29 +++++++++++++++++++---- src/box/lua/session.c | 54 ++++++++++++++++++++++++++++++++++++++++++- src/box/port.c | 6 +++++ src/box/port.h | 17 ++++++++++++++ src/fio.c | 2 +- src/fio.h | 2 +- test/app-tap/console.test.lua | 6 +++-- 9 files changed, 160 insertions(+), 11 deletions(-) diff --git a/src/box/lua/console.c b/src/box/lua/console.c index a3bf83cb1..c4886db8c 100644 --- a/src/box/lua/console.c +++ b/src/box/lua/console.c @@ -31,10 +31,12 @@ #include "box/lua/console.h" #include "box/session.h" +#include "box/port.h" #include "lua/utils.h" #include "lua/fiber.h" #include "fiber.h" #include "coio.h" +#include "fio.h" #include "lua-yaml/lyaml.h" #include #include @@ -364,6 +366,47 @@ console_session_fd(struct session *session) return session->meta.fd; } +int +console_encode_push(struct lua_State *L) +{ + return lua_yaml_encode_tagged(L, luaL_yaml_default, "!push!", + "tag:tarantool.io/push,2018"); +} + +/** + * Push a tagged YAML document into a console socket. + * @param session Console session. + * @param port Port with YAML to push. 
+ * + * @retval 0 Success. + * @retval -1 Error. + */ +static int +console_session_push(struct session *session, struct port *port) +{ + assert(session_vtab_registry[session->type].push == + console_session_push); + uint32_t text_len; + const char *text = port_dump_plain(port, &text_len); + if (text == NULL) + return -1; + int fd = session_fd(session); + while (text_len > 0) { + while (coio_wait(fd, COIO_WRITE, + TIMEOUT_INFINITY) != COIO_WRITE); + const struct iovec iov = { + .iov_base = (void *) text, + .iov_len = text_len + }; + ssize_t rc = fio_writev(fd, &iov, 1); + if (rc < 0) + return -1; + text_len -= rc; + text += rc; + } + return 0; +} + void tarantool_lua_console_init(struct lua_State *L) { @@ -400,7 +443,7 @@ tarantool_lua_console_init(struct lua_State *L) */ lua_setfield(L, -2, "formatter"); struct session_vtab console_session_vtab = { - /* .push = */ generic_session_push, + /* .push = */ console_session_push, /* .fd = */ console_session_fd, /* .sync = */ generic_session_sync, }; diff --git a/src/box/lua/console.h b/src/box/lua/console.h index 208b31490..6d1449810 100644 --- a/src/box/lua/console.h +++ b/src/box/lua/console.h @@ -36,6 +36,16 @@ extern "C" { struct lua_State; +/** + * Encode a single value on top of the stack into YAML document + * tagged as push message. + * @param object Any lua object on top of the stack. + * @retval nil, error Error occured. + * @retval not nil Tagged YAML document. + */ +int +console_encode_push(struct lua_State *L); + void tarantool_lua_console_init(struct lua_State *L); diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua index bc4e02bfc..b8ae5ba59 100644 --- a/src/box/lua/console.lua +++ b/src/box/lua/console.lua @@ -11,6 +11,7 @@ local yaml = require('yaml') local net_box = require('net.box') local YAML_TERM = '\n...\n' +local PUSH_TAG_HANDLE = '!push!' local function format(status, ...) 
local err @@ -92,13 +93,27 @@ local text_connection_mt = { -- eval = function(self, text) text = text..'$EOF$\n' - if self:write(text) then + if not self:write(text) then + error(self:set_error()) + end + while true do local rc = self:read() - if rc then + if not rc then + break + end + local handle, prefix = yaml.decode_tag(rc) + assert(handle or not prefix) + if not handle then + -- Can not fail - tags are encoded with no + -- user participation and are correct always. + assert(not prefix) return rc end + if handle == PUSH_TAG_HANDLE and self.print_f then + self.print_f(rc) + end end - error(self:set_error()) + return rc end, -- -- Make the connection be in error state, set error @@ -121,15 +136,18 @@ local text_connection_mt = { -- netbox-like object. -- @param connection Socket to wrap. -- @param url Parsed destination URL. +-- @param print_f Function to print push messages. +-- -- @retval nil, err Error, and err contains an error message. -- @retval not nil Netbox-like object. -- -local function wrap_text_socket(connection, url) +local function wrap_text_socket(connection, url, print_f) local conn = setmetatable({ _socket = connection, state = 'active', host = url.host or 'localhost', port = url.service, + print_f = print_f, }, text_connection_mt) if not conn:write('require("console").delimiter("$EOF$")\n') or not conn:read() then @@ -369,7 +387,8 @@ local function connect(uri, opts) end local remote if greeting.protocol == 'Lua console' then - remote = wrap_text_socket(connection, u) + remote = wrap_text_socket(connection, u, + function(msg) self:print(msg) end) else opts = { connect_timeout = opts.timeout, diff --git a/src/box/lua/session.c b/src/box/lua/session.c index 5fe5f08d4..306271809 100644 --- a/src/box/lua/session.c +++ b/src/box/lua/session.c @@ -41,6 +41,8 @@ #include "box/session.h" #include "box/user.h" #include "box/schema.h" +#include "box/port.h" +#include "box/lua/console.h" static const char *sessionlib_name = "box.session"; @@ -355,6 
+357,52 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event) return 3; } +/** + * Port to push a message from Lua. + */ +struct lua_push_port { + const struct port_vtab *vtab; + /** + * Lua state, containing data to dump on top of the stack. + */ + struct lua_State *L; +}; + +static const char * +lua_push_port_dump_plain(struct port *port, uint32_t *size); + +static const struct port_vtab lua_push_port_vtab = { + .dump_msgpack = NULL, + /* + * Dump_16 has no sense, since push appears since 1.10 + * protocol. + */ + .dump_msgpack_16 = NULL, + .dump_plain = lua_push_port_dump_plain, + .destroy = NULL, +}; + +static const char * +lua_push_port_dump_plain(struct port *port, uint32_t *size) +{ + struct lua_push_port *lua_port = (struct lua_push_port *) port; + assert(lua_port->vtab == &lua_push_port_vtab); + struct lua_State *L = lua_port->L; + int rc = console_encode_push(L); + if (rc == 2) { + assert(lua_isnil(L, -2)); + assert(lua_isstring(L, -1)); + diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1)); + return NULL; + } + assert(rc == 1); + assert(lua_isstring(L, -1)); + size_t len; + const char *result = lua_tolstring(L, -1, &len); + *size = (uint32_t) len; + return result; +} + /** * Push a message using a protocol, depending on a session type. * @param data Data to push, first argument on a stack. 
@@ -367,7 +415,11 @@ lbox_session_push(struct lua_State *L) if (lua_gettop(L) != 1) return luaL_error(L, "Usage: box.session.push(data)"); - if (session_push(current_session(), NULL) != 0) { + struct lua_push_port port; + port.vtab = &lua_push_port_vtab; + port.L = L; + + if (session_push(current_session(), (struct port *) &port) != 0) { lua_pushnil(L); luaT_pusherror(L, box_error_last()); return 2; diff --git a/src/box/port.c b/src/box/port.c index 255eb732c..f9b655840 100644 --- a/src/box/port.c +++ b/src/box/port.c @@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out) return port->vtab->dump_msgpack_16(port, out); } +const char * +port_dump_plain(struct port *port, uint32_t *size) +{ + return port->vtab->dump_plain(port, size); +} + void port_init(void) { diff --git a/src/box/port.h b/src/box/port.h index 1c44b9b00..7fc1b8972 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -76,6 +76,11 @@ struct port_vtab { * 1.6 format. */ int (*dump_msgpack_16)(struct port *port, struct obuf *out); + /** + * Dump a port content as a plain text into a buffer, + * allocated inside. + */ + const char *(*dump_plain)(struct port *port, uint32_t *size); /** * Destroy a port and release associated resources. */ @@ -158,6 +163,18 @@ port_dump_msgpack(struct port *port, struct obuf *out); int port_dump_msgpack_16(struct port *port, struct obuf *out); +/** + * Dump a port content as a plain text into a buffer, + * allocated inside. + * @param port Port with data to dump. + * @param[out] size Length of a result plain text. + * + * @retval nil Error. + * @retval not nil Plain text. 
+ */ +const char * +port_dump_plain(struct port *port, uint32_t *size); + void port_init(void); diff --git a/src/fio.c b/src/fio.c index b79d3d058..b1d9ecf44 100644 --- a/src/fio.c +++ b/src/fio.c @@ -135,7 +135,7 @@ fio_writen(int fd, const void *buf, size_t count) } ssize_t -fio_writev(int fd, struct iovec *iov, int iovcnt) +fio_writev(int fd, const struct iovec *iov, int iovcnt) { assert(iov && iovcnt >= 0); ssize_t nwr; diff --git a/src/fio.h b/src/fio.h index 12749afcb..fb6383c81 100644 --- a/src/fio.h +++ b/src/fio.h @@ -133,7 +133,7 @@ fio_writen(int fd, const void *buf, size_t count); * returns the total number of bytes written, or -1 if error. */ ssize_t -fio_writev(int fd, struct iovec *iov, int iovcnt); +fio_writev(int fd, const struct iovec *iov, int iovcnt); /** * A wrapper around writev, but retries for partial writes diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua index d2e88b55b..b1d7166d4 100755 --- a/test/app-tap/console.test.lua +++ b/test/app-tap/console.test.lua @@ -21,7 +21,7 @@ local EOL = "\n...\n" test = tap.test("console") -test:plan(60) +test:plan(61) -- Start console and connect to it local server = console.listen(CONSOLE_SOCKET) @@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console") -- gh-2677: box.session.push, text protocol support. -- client:write('box.session.push(200)\n') -test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work") +test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n', + "pushed message") +test:is(client:read(EOL), '---\n- true\n...\n', "pushed message") -- Execute some command client:write("1\n") -- 2.15.1 (Apple Git-101)