From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from localhost (localhost [127.0.0.1]) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTP id BDD262925D for ; Fri, 1 Jun 2018 16:56:02 -0400 (EDT) Received: from turing.freelists.org ([127.0.0.1]) by localhost (turing.freelists.org [127.0.0.1]) (amavisd-new, port 10024) with ESMTP id Z83ZLf0z037G for ; Fri, 1 Jun 2018 16:56:02 -0400 (EDT) Received: from smtp40.i.mail.ru (smtp40.i.mail.ru [94.100.177.100]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by turing.freelists.org (Avenir Technologies Mail Multiplex) with ESMTPS id 45FD929250 for ; Fri, 1 Jun 2018 16:56:02 -0400 (EDT) From: Vladislav Shpilevoy Subject: [tarantool-patches] [PATCH v3 1/4] session: introduce text box.session.push Date: Fri, 1 Jun 2018 23:55:55 +0300 Message-Id: <2b80df4920e268a7d3cdd92e076578230b5f4163.1527886471.git.v.shpilevoy@tarantool.org> In-Reply-To: References: In-Reply-To: References: Sender: tarantool-patches-bounce@freelists.org Errors-to: tarantool-patches-bounce@freelists.org Reply-To: tarantool-patches@freelists.org List-help: List-unsubscribe: List-software: Ecartis version 1.0.0 List-Id: tarantool-patches List-subscribe: List-owner: List-post: List-archive: To: tarantool-patches@freelists.org Cc: kostja@tarantool.org box.session.push allows to send some intermediate results in the scope of main request with no finalizing it. Messages can be sent over text and binary protocol. This patch allows to send text pushes. Text push is a YAML document tagged with '!push!' handle and 'tag:tarantool.io/push,2018' prefix. YAML tags is a standard way to define a type of the document. Console received push message just prints it to the stdout (or sends to a next console, if it is remote console too). Part of #2677 --- src/box/lua/call.c | 4 +++ src/box/lua/console.c | 74 ++++++++++++++++++++++++++++++++++++++++++- src/box/lua/console.lua | 27 +++++++++++++--- src/box/lua/session.c | 6 ++-- src/box/port.c | 7 ++++ src/box/port.h | 17 ++++++++++ src/diag.h | 3 ++ src/sio.cc | 16 ++++++++++ test/app-tap/console.test.lua | 6 ++-- 9 files changed, 150 insertions(+), 10 deletions(-) diff --git a/src/box/lua/call.c b/src/box/lua/call.c index 831583d26..e682820df 100644 --- a/src/box/lua/call.c +++ b/src/box/lua/call.c @@ -412,9 +412,13 @@ port_lua_destroy(struct port *base) luaL_unref(tarantool_L, LUA_REGISTRYINDEX, port->ref); } +extern const char * +port_lua_dump_plain(struct port *port, uint32_t *size); + static const struct port_vtab port_lua_vtab = { .dump_msgpack = port_lua_dump, .dump_msgpack_16 = port_lua_dump_16, + .dump_plain = port_lua_dump_plain, .destroy = port_lua_destroy, }; diff --git a/src/box/lua/console.c b/src/box/lua/console.c index fea96bc05..49065a53c 100644 --- a/src/box/lua/console.c +++ b/src/box/lua/console.c @@ -31,6 +31,8 @@ #include "box/lua/console.h" #include "box/session.h" +#include "box/port.h" +#include "box/error.h" #include "lua/utils.h" #include "lua/fiber.h" #include "fiber.h" @@ -366,6 +368,76 @@ console_session_fd(struct session *session) return session->meta.fd; } +/** + * Dump port lua data as a YAML document tagged with !push! global + * tag. + * @param port Port lua. + * @param[out] size Size of the result. + * + * @retval not NULL Tagged YAML document. + * @retval NULL Error. + */ +const char * +port_lua_dump_plain(struct port *port, uint32_t *size) +{ + struct port_lua *port_lua = (struct port_lua *) port; + struct lua_State *L = port_lua->L; + int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!", + "tag:tarantool.io/push,2018"); + if (rc == 2) { + /* + * Nil and error object are pushed onto the stack. + */ + assert(lua_isnil(L, -2)); + assert(lua_isstring(L, -1)); + diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1)); + return NULL; + } + assert(rc == 1); + assert(lua_isstring(L, -1)); + size_t len; + const char *result = lua_tolstring(L, -1, &len); + *size = (uint32_t) len; + return result; +} + +/** + * Push a tagged YAML document into a console socket. + * @param session Console session. + * @param port Port with YAML to push. + * + * @retval 0 Success. + * @retval -1 Error. + */ +static int +console_session_push(struct session *session, struct port *port) +{ + assert(session_vtab_registry[session->type].push == + console_session_push); + uint32_t text_len; + const char *text = port_dump_plain(port, &text_len); + if (text == NULL) + return -1; + int fd = session_fd(session); + while (text_len > 0) { + ssize_t rc = write(fd, text, text_len); + if (rc < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + while (coio_wait(fd, COIO_WRITE, + TIMEOUT_INFINITY) != + COIO_WRITE); + } else if (errno != EINTR) { + diag_set(SocketError, fd, strerror(errno)); + return -1; + } + } else { + text_len -= (uint32_t) rc; + text += rc; + } + } + return 0; +} + void tarantool_lua_console_init(struct lua_State *L) { @@ -400,7 +472,7 @@ tarantool_lua_console_init(struct lua_State *L) */ lua_setfield(L, -2, "formatter"); struct session_vtab console_session_vtab = { - /* .push = */ generic_session_push, + /* .push = */ console_session_push, /* .fd = */ console_session_fd, /* .sync = */ generic_session_sync, }; diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua index bc4e02bfc..6271b416b 100644 --- a/src/box/lua/console.lua +++ b/src/box/lua/console.lua @@ -11,6 +11,7 @@ local yaml = require('yaml') local net_box = require('net.box') local YAML_TERM = '\n...\n' +local PUSH_TAG_HANDLE = '!push!' local function format(status, ...) local err @@ -92,13 +93,25 @@ local text_connection_mt = { -- eval = function(self, text) text = text..'$EOF$\n' - if self:write(text) then + if not self:write(text) then + error(self:set_error()) + end + while true do local rc = self:read() - if rc then + if not rc then + break + end + local handle, prefix = yaml.decode(rc, {tag_only = true}) + if not handle then + -- Can not fail - tags are encoded with no + -- user participation and are correct always. return rc end + if handle == PUSH_TAG_HANDLE and self.print_f then + self.print_f(rc) + end end - error(self:set_error()) + return rc end, -- -- Make the connection be in error state, set error @@ -121,15 +134,18 @@ local text_connection_mt = { -- netbox-like object. -- @param connection Socket to wrap. -- @param url Parsed destination URL. +-- @param print_f Function to print push messages. +-- -- @retval nil, err Error, and err contains an error message. -- @retval not nil Netbox-like object. -- -local function wrap_text_socket(connection, url) +local function wrap_text_socket(connection, url, print_f) local conn = setmetatable({ _socket = connection, state = 'active', host = url.host or 'localhost', port = url.service, + print_f = print_f, }, text_connection_mt) if not conn:write('require("console").delimiter("$EOF$")\n') or not conn:read() then @@ -369,7 +385,8 @@ local function connect(uri, opts) end local remote if greeting.protocol == 'Lua console' then - remote = wrap_text_socket(connection, u) + remote = wrap_text_socket(connection, u, + function(msg) self:print(msg) end) else opts = { connect_timeout = opts.timeout, diff --git a/src/box/lua/session.c b/src/box/lua/session.c index 05010c4c3..627f62f59 100644 --- a/src/box/lua/session.c +++ b/src/box/lua/session.c @@ -41,6 +41,7 @@ #include "box/session.h" #include "box/user.h" #include "box/schema.h" +#include "box/port.h" static const char *sessionlib_name = "box.session"; @@ -367,8 +368,9 @@ lbox_session_push(struct lua_State *L) { if (lua_gettop(L) != 1) return luaL_error(L, "Usage: box.session.push(data)"); - - if (session_push(current_session(), NULL) != 0) { + struct port port; + port_lua_create(&port, L); + if (session_push(current_session(), &port) != 0) { lua_pushnil(L); luaT_pusherror(L, box_error_last()); return 2; diff --git a/src/box/port.c b/src/box/port.c index 255eb732c..03ca323d7 100644 --- a/src/box/port.c +++ b/src/box/port.c @@ -143,6 +143,12 @@ port_dump_msgpack_16(struct port *port, struct obuf *out) return port->vtab->dump_msgpack_16(port, out); } +const char * +port_dump_plain(struct port *port, uint32_t *size) +{ + return port->vtab->dump_plain(port, size); +} + void port_init(void) { @@ -159,5 +165,6 @@ port_free(void) const struct port_vtab port_tuple_vtab = { .dump_msgpack = port_tuple_dump_msgpack, .dump_msgpack_16 = port_tuple_dump_msgpack_16, + .dump_plain = NULL, .destroy = port_tuple_destroy, }; diff --git a/src/box/port.h b/src/box/port.h index 6f1e5f364..882bb3791 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -77,6 +77,11 @@ struct port_vtab { * 1.6 format. */ int (*dump_msgpack_16)(struct port *port, struct obuf *out); + /** + * Dump a port content as a plain text into a buffer, + * allocated inside. + */ + const char *(*dump_plain)(struct port *port, uint32_t *size); /** * Destroy a port and release associated resources. */ @@ -179,6 +184,18 @@ port_dump_msgpack(struct port *port, struct obuf *out); int port_dump_msgpack_16(struct port *port, struct obuf *out); +/** + * Dump a port content as a plain text into a buffer, + * allocated inside. + * @param port Port with data to dump. + * @param[out] size Length of a result plain text. + * + * @retval nil Error. + * @retval not nil Plain text. + */ +const char * +port_dump_plain(struct port *port, uint32_t *size); + void port_init(void); diff --git a/src/diag.h b/src/diag.h index bd5a539b0..24b6383b6 100644 --- a/src/diag.h +++ b/src/diag.h @@ -251,6 +251,9 @@ struct error * BuildXlogError(const char *file, unsigned line, const char *format, ...); struct error * BuildCollationError(const char *file, unsigned line, const char *format, ...); +struct error * +BuildSocketError(const char *file, unsigned line, int fd, const char *format, + ...); struct index_def; diff --git a/src/sio.cc b/src/sio.cc index c906a97a8..8d71f0382 100644 --- a/src/sio.cc +++ b/src/sio.cc @@ -67,6 +67,22 @@ SocketError::SocketError(const char *file, unsigned line, int fd, errno = save_errno; } +struct error * +BuildSocketError(const char *file, unsigned line, int fd, const char *format, + ...) +{ + try { + SocketError *e = new SocketError(file, line, fd, ""); + va_list ap; + va_start(ap, format); + error_vformat_msg(e, format, ap); + va_end(ap); + return e; + } catch (OutOfMemory *e) { + return e; + } +} + /** Pretty print socket name and peer (for exceptions) */ const char * sio_socketname(int fd) diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua index a5b3061a9..237b3d002 100755 --- a/test/app-tap/console.test.lua +++ b/test/app-tap/console.test.lua @@ -21,7 +21,7 @@ local EOL = "\n...\n" test = tap.test("console") -test:plan(60) +test:plan(61) -- Start console and connect to it local server = console.listen(CONSOLE_SOCKET) @@ -35,7 +35,9 @@ test:ok(client ~= nil, "connect to console") -- gh-2677: box.session.push, text protocol support. -- client:write('box.session.push(200)\n') -test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work") +test:is(client:read(EOL), '%TAG !push! tag:tarantool.io/push,2018\n--- 200\n...\n', + "pushed message") +test:is(client:read(EOL), '---\n- true\n...\n', "pushed message") -- Execute some command client:write("1\n") -- 2.15.1 (Apple Git-101)