From mboxrd@z Thu Jan  1 00:00:00 1970
Received: from smtpng2.m.smailru.net (smtpng2.m.smailru.net [94.100.179.3])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by dev.tarantool.org (Postfix) with ESMTPS id 4B69A4696C3;
	Wed, 8 Apr 2020 02:20:49 +0300 (MSK)
From: Vladislav Shpilevoy
Date: Wed, 8 Apr 2020 01:20:47 +0200
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH v2 1/1] box: export box_session_push to the public C API
List-Id: Tarantool development patches
To: tarantool-patches@dev.tarantool.org, imun@tarantool.org

The API differs from box.session.push(): the sync argument was
removed. It will disappear from the Lua API as well, because it is
simply not needed here. The session argument is omitted too. A
user can't push to a foreign session anyway, and the current
session can be obtained inside box_session_push(). Besides, the
session is not a part of the public C API.

Internally, the dump into iproto is done using obuf_dup(), just
like tuple_to_obuf() does. obuf_alloc() would be a bad choice
here, because it would not be able to split the pushed data into
several obuf chunks, and would cause obuf fragmentation.

The dump into plain text behaves just like a Lua push: it produces
YAML-formatted text or Lua text depending on the output format.
But to turn MessagePack into YAML or Lua text, an intermediate Lua
representation is used, because there are no MessagePack -> YAML
and MessagePack -> Lua text translators yet.

Closes #4734

@TarantoolBot document
Title: box_session_push() C API

There is a new function in the public C API:
```C
int
box_session_push(const char *data, const char *data_end);
```

It takes raw MessagePack, and behaves just like Lua
`box.session.push()`.
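As an illustration of the request above, here is a minimal sketch of how C code might build a small raw MessagePack array by hand and pass it to a push function. The helpers `encode_small_int_array()` and `push_small_ints()` and the `push_fn` typedef are hypothetical names that are not part of this patch. Inside Tarantool the callback would be `box_session_push`; it is taken as a function pointer here only so that the sketch compiles without Tarantool headers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Same signature as box_session_push() declared in this patch. */
typedef int (*push_fn)(const char *data, const char *data_end);

/*
 * Hypothetical helper: encode up to 15 integers in the range 0..127
 * as a MessagePack fixarray of positive fixints. Returns the encoded
 * size, or 0 if the input does not fit these compact encodings or
 * the buffer is too small.
 */
size_t
encode_small_int_array(char *buf, size_t buf_size, const uint8_t *values,
		       size_t count)
{
	assert(buf != NULL && values != NULL);
	if (count > 15 || buf_size < count + 1)
		return 0;
	/* fixarray header: 0x90 | number of elements. */
	buf[0] = (char)(0x90 | count);
	for (size_t i = 0; i < count; i++) {
		if (values[i] > 0x7f)
			return 0;
		/* A positive fixint is stored as the value itself. */
		buf[i + 1] = (char)values[i];
	}
	return count + 1;
}

/*
 * Hypothetical helper: encode the values and hand the
 * [data, data_end) range to the given push function.
 * Returns -1 on a bad argument, otherwise the push result.
 */
int
push_small_ints(push_fn push, const uint8_t *values, size_t count)
{
	char buf[16];
	if (push == NULL)
		return -1;
	size_t size = encode_small_int_array(buf, sizeof(buf), values, count);
	if (size == 0)
		return -1;
	return push(buf, buf + size);
}
```

In a module built against a Tarantool version that includes this patch, a stored procedure could call `push_small_ints(box_session_push, values, 3)`. If the push returns -1, the error would be available through `box_error_last()`, as the header comment in this patch describes.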
---
Branch: http://github.com/tarantool/tarantool/tree/gerold103/gh-4734-export-box-session-push
Issue: https://github.com/tarantool/tarantool/issues/4734

Changes in v2:
- Rebased on Chris' patch for box.session.push() Lua text format;
- Fixed Igor's comments.

@ChangeLog
- box_session_push() new public C API function. It takes a const
  char * MessagePack, and returns it to the client out of order,
  like Lua analogue (box.session.push()) (gh-4734).

 extra/exports          |   1 +
 src/box/box.cc         |  14 +++++
 src/box/box.h          |  14 +++++
 src/box/call.c         |  44 +++++++++++++-
 src/box/lua/console.c  |  75 ++++++++++++++++++++---
 src/box/port.h         |  15 +++++
 test/box/function1.c   |   7 +++
 test/box/push.result   | 133 +++++++++++++++++++++++++++++++++++++++++
 test/box/push.test.lua |  51 ++++++++++++++++
 9 files changed, 344 insertions(+), 10 deletions(-)

diff --git a/extra/exports b/extra/exports
index f71cb7d93..2c19b5445 100644
--- a/extra/exports
+++ b/extra/exports
@@ -220,6 +220,7 @@ box_sequence_next
 box_sequence_current
 box_sequence_set
 box_sequence_reset
+box_session_push
 box_index_iterator
 box_iterator_next
 box_iterator_free
diff --git a/src/box/box.cc b/src/box/box.cc
index 765d64678..15e79df19 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1455,6 +1455,20 @@ box_sequence_reset(uint32_t seq_id)
 	return sequence_data_delete(seq_id);
 }
 
+int
+box_session_push(const char *data, const char *data_end)
+{
+	struct session *session = current_session();
+	if (session == NULL)
+		return -1;
+	struct port_msgpack port;
+	struct port *base = (struct port *) &port;
+	port_msgpack_create(base, data, data_end - data);
+	int rc = session_push(session, session_sync(session), base);
+	port_msgpack_destroy(base);
+	return rc;
+}
+
 static inline void
 box_register_replica(uint32_t id, const struct tt_uuid *uuid)
 {
diff --git a/src/box/box.h b/src/box/box.h
index 044d929d4..c94e500ab 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -457,6 +457,20 @@ box_sequence_set(uint32_t seq_id, int64_t value);
 API_EXPORT int
 box_sequence_reset(uint32_t seq_id);
 
+/**
+ * Push MessagePack data into a session data channel - socket,
+ * console or whatever is behind the session. Note, that
+ * successful push does not guarantee delivery in case it was sent
+ * into the network. Just like with write()/send() system calls.
+ *
+ * \param data begin of MessagePack to push
+ * \param data_end end of MessagePack to push
+ * \retval -1 on error (check box_error_last())
+ * \retval 0 on success
+ */
+API_EXPORT int
+box_session_push(const char *data, const char *data_end);
+
 /** \endcond public */
 
 /**
diff --git a/src/box/call.c b/src/box/call.c
index a46a61c3c..be4f71f59 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -64,17 +64,55 @@ port_msgpack_get_msgpack(struct port *base, uint32_t *size)
 	return port->data;
 }
 
+static int
+port_msgpack_dump_msgpack(struct port *base, struct obuf *out)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->vtab == &port_msgpack_vtab);
+	size_t size = port->data_sz;
+	if (obuf_dup(out, port->data, size) == size)
+		return 0;
+	diag_set(OutOfMemory, size, "obuf_dup", "port->data");
+	return -1;
+}
+
 extern void
 port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat);
 
+extern const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size);
+
+void
+port_msgpack_destroy(struct port *base)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->vtab == &port_msgpack_vtab);
+	free(port->plain);
+}
+
+char *
+port_mspack_set_plain(struct port *base, const char *plain, uint32_t len)
+{
+	struct port_msgpack *port = (struct port_msgpack *) base;
+	assert(port->plain == NULL);
+	port->plain = (char *)malloc(len + 1);
+	if (port->plain == NULL) {
+		diag_set(OutOfMemory, len + 1, "malloc", "port->plain");
+		return NULL;
+	}
+	memcpy(port->plain, plain, len);
+	port->plain[len] = 0;
+	return port->plain;
+}
+
 static const struct port_vtab port_msgpack_vtab = {
-	.dump_msgpack = NULL,
+	.dump_msgpack = port_msgpack_dump_msgpack,
 	.dump_msgpack_16 = NULL,
 	.dump_lua = port_msgpack_dump_lua,
-	.dump_plain = NULL,
+	.dump_plain = port_msgpack_dump_plain,
 	.get_msgpack = port_msgpack_get_msgpack,
 	.get_vdbemem = NULL,
-	.destroy = NULL,
+	.destroy = port_msgpack_destroy,
 };
 
 int
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index bd454c269..396cb87f2 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -37,6 +37,7 @@
 #include "lua/fiber.h"
 #include "fiber.h"
 #include "coio.h"
+#include "lua/msgpack.h"
 #include "lua-yaml/lyaml.h"
 #include
 #include
@@ -390,19 +391,17 @@ console_set_output_format(enum output_format output_format)
 }
 
 /**
- * Dump port lua data with respect to output format:
+ * Dump Lua data to text with respect to output format:
  * YAML document tagged with !push! global tag or Lua string.
- * @param port Port lua.
+ * @param L Lua state.
  * @param[out] size Size of the result.
  *
- * @retval not NULL Tagged YAML document.
+ * @retval not NULL Tagged YAML document or Lua text.
  * @retval NULL Error.
  */
-const char *
-port_lua_dump_plain(struct port *port, uint32_t *size)
+static const char *
+lua_dump_plain(struct lua_State *L, uint32_t *size)
 {
-	struct port_lua *port_lua = (struct port_lua *) port;
-	struct lua_State *L = port_lua->L;
 	enum output_format fmt = console_get_output_format();
 	if (fmt == OUTPUT_FORMAT_YAML) {
 		int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!",
@@ -435,6 +434,68 @@ port_lua_dump_plain(struct port *port, uint32_t *size)
 	return result;
 }
 
+/** Plain text converter for port Lua data. */
+const char *
+port_lua_dump_plain(struct port *base, uint32_t *size)
+{
+	return lua_dump_plain(((struct port_lua *)base)->L, size);
+}
+
+/**
+ * A helper for port_msgpack_dump_plain() to execute it safely
+ * regarding Lua errors.
+ */
+static int
+lua_port_msgpack_dump_plain(struct lua_State *L)
+{
+	char **result = (char **)lua_touserdata(L, lua_upvalueindex(1));
+	struct port_msgpack *port =
+		(struct port_msgpack *)lua_touserdata(L, lua_upvalueindex(2));
+	uint32_t *size = (uint32_t *)lua_touserdata(L, lua_upvalueindex(3));
+	const char *data = port->data;
+	/*
+	 * MessagePack -> Lua object -> YAML/Lua text. The middle
+	 * is not really needed here, but there is no
+	 * MessagePack -> YAML encoder yet. Neither
+	 * MessagePack -> Lua text.
+	 */
+	luamp_decode(L, luaL_msgpack_default, &data);
+	data = lua_dump_plain(L, size);
+	if (data == NULL) {
+		*result = NULL;
+		return 0;
+	}
+	*result = port_mspack_set_plain((struct port *)port, data, *size);
+	return 0;
+ }
+
+/** Plain text converter for raw MessagePack. */
+const char *
+port_msgpack_dump_plain(struct port *base, uint32_t *size)
+{
+	struct lua_State *L = tarantool_L;
+	char *result = NULL;
+	int top = lua_gettop(L);
+	(void) top;
+	lua_pushlightuserdata(L, &result);
+	lua_pushlightuserdata(L, base);
+	lua_pushlightuserdata(L, size);
+	lua_pushcclosure(L, lua_port_msgpack_dump_plain, 3);
+	if (lua_pcall(L, 0, 0, 0) != 0) {
+		/*
+		 * Error string is pushed in case it was a Lua
+		 * error.
+		 */
+		assert(lua_isstring(L, -1));
+		diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1));
+		lua_pop(L, 1);
+		assert(lua_gettop(L) == top);
+		return NULL;
+	}
+	assert(lua_gettop(L) == top);
+	return result;
+}
+
 /**
  * Push a tagged YAML document or a Lua string into a console
  * socket.
diff --git a/src/box/port.h b/src/box/port.h
index 9d3d02b3c..7ef5a2c63 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -86,6 +86,11 @@ struct port_msgpack {
 	const struct port_vtab *vtab;
 	const char *data;
 	uint32_t data_sz;
+	/**
+	 * Buffer for dump_plain() function. It is created during
+	 * dump on demand and is deleted together with the port.
+	 */
+	char *plain;
 };
 
 static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
@@ -95,6 +100,16 @@ static_assert(sizeof(struct port_msgpack) <= sizeof(struct port),
 void
 port_msgpack_create(struct port *port, const char *data, uint32_t data_sz);
 
+/** Destroy a MessagePack port. */
+void
+port_msgpack_destroy(struct port *base);
+
+/**
+ * Set plain text version of data in the given port. It is copied.
+ */
+char *
+port_mspack_set_plain(struct port *base, const char *plain, uint32_t len);
+
 /** Port for storing the result of a Lua CALL/EVAL. */
 struct port_lua {
 	const struct port_vtab *vtab;
diff --git a/test/box/function1.c b/test/box/function1.c
index 87062d6a8..b2ce752a9 100644
--- a/test/box/function1.c
+++ b/test/box/function1.c
@@ -245,3 +245,10 @@ test_sleep(box_function_ctx_t *ctx, const char *args, const char *args_end)
 	fiber_sleep(0);
 	return 0;
 }
+
+int
+test_push(box_function_ctx_t *ctx, const char *args, const char *args_end)
+{
+	(void) ctx;
+	return box_session_push(args, args_end);
+}
diff --git a/test/box/push.result b/test/box/push.result
index aebcb7501..f5d8fe563 100644
--- a/test/box/push.result
+++ b/test/box/push.result
@@ -563,3 +563,136 @@ box.schema.func.drop('do_long_and_push')
 box.session.on_disconnect(nil, on_disconnect)
 ---
 ...
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+---
+...
+old_cpath = package.cpath
+---
+...
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+---
+...
+box.schema.func.create('function1.test_push', {language = 'C'})
+---
+...
+box.schema.user.grant('guest', 'super')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+messages = {}
+---
+...
+c:call('function1.test_push', \
+       {1, 2, 3}, \
+       {on_push = table.insert, \
+        on_push_ctx = messages})
+---
+- []
+...
+messages
+---
+- - [1, 2, 3]
+...
+c:close()
+---
+...
+--
+-- C can push to the console.
+--
+ffi = require('ffi')
+---
+...
+cd = ffi.new('char[3]')
+---
+...
+cd[0] = 65
+---
+...
+cd[1] = 0
+---
+...
+cd[2] = 67
+---
+...
+-- A string having 0 byte inside. Check that it is handled fine.
+s = ffi.string(cd, 3)
+---
+...
+console = require('console')
+---
+...
+fio = require('fio')
+---
+...
+socket = require('socket')
+---
+...
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+---
+...
+_ = fio.unlink(sock_path)
+---
+...
+server = console.listen(sock_path)
+---
+...
+client = socket.tcp_connect('unix/', sock_path)
+---
+...
+_ = client:read({chunk = 128})
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+---
+...
+client:read("\n...\n")
+---
+- '%TAG !push! tag:tarantool.io/push,2018
+
+  --- [1, 2, 3, "A\0C"]
+
+  ...
+
+  '
+...
+_ = client:read("\n...\n")
+---
+...
+-- Lua output format is supported too.
+_ = client:write("\\set output lua\n")
+---
+...
+_ = client:read(";")
+---
+...
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+---
+...
+client:read(";")
+---
+- '-- Push
+
+  {1, 2, 3, "A\0C"};'
+...
+_ = client:read(";")
+---
+...
+client:close()
+---
+- true
+...
+server:close()
+---
+- true
+...
+box.schema.user.revoke('guest', 'super')
+---
+...
+box.schema.func.drop('function1.test_push')
+---
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
index 7ae6f4a86..d2dc31db6 100644
--- a/test/box/push.test.lua
+++ b/test/box/push.test.lua
@@ -264,3 +264,54 @@ chan_push:put(true)
 chan_push:get()
 box.schema.func.drop('do_long_and_push')
 box.session.on_disconnect(nil, on_disconnect)
+
+--
+-- gh-4734: C API for session push.
+--
+build_path = os.getenv("BUILDDIR")
+old_cpath = package.cpath
+package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath
+
+box.schema.func.create('function1.test_push', {language = 'C'})
+box.schema.user.grant('guest', 'super')
+c = netbox.connect(box.cfg.listen)
+messages = {}
+c:call('function1.test_push', \
+       {1, 2, 3}, \
+       {on_push = table.insert, \
+        on_push_ctx = messages})
+messages
+c:close()
+--
+-- C can push to the console.
+--
+ffi = require('ffi')
+cd = ffi.new('char[3]')
+cd[0] = 65
+cd[1] = 0
+cd[2] = 67
+-- A string having 0 byte inside. Check that it is handled fine.
+s = ffi.string(cd, 3)
+
+console = require('console')
+fio = require('fio')
+socket = require('socket')
+sock_path = fio.pathjoin(fio.cwd(), 'console.sock')
+_ = fio.unlink(sock_path)
+server = console.listen(sock_path)
+client = socket.tcp_connect('unix/', sock_path)
+_ = client:read({chunk = 128})
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+client:read("\n...\n")
+_ = client:read("\n...\n")
+-- Lua output format is supported too.
+_ = client:write("\\set output lua\n")
+_ = client:read(";")
+_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n")
+client:read(";")
+_ = client:read(";")
+client:close()
+server:close()
+
+box.schema.user.revoke('guest', 'super')
+box.schema.func.drop('function1.test_push')
-- 
2.21.1 (Apple Git-122.3)