* [PATCH 0/5] session: introduce box.session.push
@ 2018-03-19 13:34 Vladislav Shpilevoy
2018-03-19 13:34 ` [PATCH 1/5] session: forbid creation from Lua binary and applier sessions Vladislav Shpilevoy
` (5 more replies)
0 siblings, 6 replies; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
Branch: http://github.com/tarantool/tarantool/tree/gh-2677-box-session-push
Issue: https://github.com/tarantool/tarantool/issues/2677
Vladislav Shpilevoy (5):
session: forbid creation from Lua binary and applier sessions
lua: port console yaml formatting to C
Remove empty function declaration
session: introduce session_owner
session: introduce box.session.push
src/box/applier.cc | 4 +-
src/box/authentication.cc | 4 +-
src/box/authentication.h | 3 +-
src/box/box.cc | 4 +-
src/box/box.h | 2 +-
src/box/iproto.cc | 321 +++++++++++++++++++++++----
src/box/iproto_constants.c | 3 +-
src/box/iproto_constants.h | 8 +
src/box/lua/call.c | 1 +
src/box/lua/console.c | 42 ++++
src/box/lua/console.h | 8 +
src/box/lua/console.lua | 40 +---
src/box/lua/net_box.c | 37 ++++
src/box/lua/net_box.lua | 97 ++++++--
src/box/lua/session.c | 252 ++++++++++++++++++++-
src/box/port.c | 7 +
src/box/port.h | 15 ++
src/box/session.cc | 86 +++++++-
src/box/session.h | 106 +++++++--
src/box/vinyl.c | 3 +-
src/box/xrow.c | 40 +++-
src/box/xrow.h | 26 ++-
src/fio.c | 12 +
src/fio.h | 16 ++
src/lua/socket.h | 2 -
test/app-tap/console.test.lua | 9 +-
test/box-tap/session.test.lua | 12 +-
test/box/net.box.result | 2 +-
test/box/net.box.test.lua | 2 +-
test/box/push.result | 364 +++++++++++++++++++++++++++++++
test/box/push.test.lua | 163 ++++++++++++++
test/replication/before_replace.result | 14 ++
test/replication/before_replace.test.lua | 11 +
third_party/lua-yaml/lyaml.cc | 9 +-
third_party/lua-yaml/lyaml.h | 3 +
35 files changed, 1576 insertions(+), 152 deletions(-)
create mode 100644 test/box/push.result
create mode 100644 test/box/push.test.lua
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* [PATCH 1/5] session: forbid creation from Lua binary and applier sessions
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-20 13:20 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 2/5] lua: port console yaml formatting to C Vladislav Shpilevoy
` (4 subsequent siblings)
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
Lua has no access to applier or binary sockets, and these session
types must be forbidden.
And after #2667 applier, binary, console and background session
owners will be incapsulated inside corresponding modules.
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/lua/session.c | 10 ++++++++--
test/box-tap/session.test.lua | 12 ++++++++++--
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index ad1c6cc25..d8e91bf1f 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -48,6 +48,13 @@ static const char *sessionlib_name = "box.session";
static int
lbox_session_create(struct lua_State *L)
{
+ enum session_type type =
+ STR2ENUM(session_type, luaL_optstring(L, 2, "console"));
+ if (type != SESSION_TYPE_CONSOLE && type != SESSION_TYPE_REPL &&
+ type != SESSION_TYPE_BACKGROUND) {
+ return luaL_error(L, "Can not start non-console or non-REPL "\
+ "session from Lua");
+ }
struct session *session = fiber_get_session(fiber());
if (session == NULL) {
int fd = luaL_optinteger(L, 1, -1);
@@ -56,8 +63,7 @@ lbox_session_create(struct lua_State *L)
return luaT_error(L);
}
/* If a session already exists, simply reset its type */
- session->type = STR2ENUM(session_type, luaL_optstring(L, 2, "console"));
-
+ session->type = type;
lua_pushnumber(L, session->id);
return 1;
}
diff --git a/test/box-tap/session.test.lua b/test/box-tap/session.test.lua
index 6fddced3c..ff952fd45 100755
--- a/test/box-tap/session.test.lua
+++ b/test/box-tap/session.test.lua
@@ -15,14 +15,22 @@ session = box.session
space = box.schema.space.create('tweedledum')
index = space:create_index('primary', { type = 'hash' })
-test:plan(53)
+test:plan(55)
+
+--
+-- Check that can start from Lua only either console or REPL.
+--
+local ok, err = pcall(box.internal.session.create, 100, "binary")
+test:is(err, "Can not start non-console or non-REPL session from Lua", "bad session type")
+ok, err = pcall(box.internal.session.create, 100, "applier")
+test:is(err, "Can not start non-console or non-REPL session from Lua", "bad session type")
---
--- Check that Tarantool creates ADMIN session for #! script
---
test:ok(session.exists(session.id()), "session is created")
test:isnil(session.peer(session.id()), "session.peer")
-local ok, err = pcall(session.exists)
+ok, err = pcall(session.exists)
test:is(err, "session.exists(sid): bad arguments", "exists bad args #1")
ok, err = pcall(session.exists, 1, 2, 3)
test:is(err, "session.exists(sid): bad arguments", "exists bad args #2")
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* [PATCH 2/5] lua: port console yaml formatting to C
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
2018-03-19 13:34 ` [PATCH 1/5] session: forbid creation from Lua binary and applier sessions Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-20 17:51 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 3/5] Remove empty function declaration Vladislav Shpilevoy
` (3 subsequent siblings)
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
Box.session.push() will be implemented in C lbox_session_push()
function, which will use port to encapsulate different session
types (binary, text) push() logic. And push() must be able to
either encode an argument into message pack, or format it as a
string using yaml. This formatting can not be done in Lua before
push() call, since it breaks push() virtualization.
Needed for #2677
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/lua/console.c | 42 ++++++++++++++++++++++++++++++++++++++++++
src/box/lua/console.lua | 34 ++++++++--------------------------
third_party/lua-yaml/lyaml.cc | 9 +++------
third_party/lua-yaml/lyaml.h | 3 +++
4 files changed, 56 insertions(+), 32 deletions(-)
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index d27d7ecac..450745c90 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -32,6 +32,7 @@
#include "box/lua/console.h"
#include "lua/utils.h"
#include "lua/fiber.h"
+#include "lua-yaml/lyaml.h"
#include "fiber.h"
#include "coio.h"
#include <lua.h>
@@ -328,6 +329,32 @@ lbox_console_add_history(struct lua_State *L)
return 0;
}
+static int
+lbox_console_format(struct lua_State *L)
+{
+ int arg_count = lua_gettop(L);
+ if (arg_count == 0) {
+ lua_pushstring(L, "---\n...\n");
+ return 1;
+ }
+ lua_getfield(L, LUA_REGISTRYINDEX, "_LOADED");
+ lua_getfield(L, -1, "console");
+ lua_getfield(L, -1, "formatter");
+ lua_getfield(L, -1, "encode");
+ lua_createtable(L, arg_count, 0);
+ for (int i = 0; i < arg_count; ++i) {
+ if (lua_isnil(L, i + 1))
+ lua_getfield(L, -3, "NULL");
+ else
+ lua_pushvalue(L, i + 1);
+ lua_rawseti(L, -2, i + 1);
+ }
+ lua_call(L, 1, 1);
+ lua_insert(L, -4);
+ lua_pop(L, 3);
+ return 1;
+}
+
void
tarantool_lua_console_init(struct lua_State *L)
{
@@ -336,6 +363,7 @@ tarantool_lua_console_init(struct lua_State *L)
{"save_history", lbox_console_save_history},
{"add_history", lbox_console_add_history},
{"completion_handler", lbox_console_completion_handler},
+ {"format", lbox_console_format},
{NULL, NULL}
};
luaL_register_module(L, "console", consolelib);
@@ -344,6 +372,20 @@ tarantool_lua_console_init(struct lua_State *L)
lua_getfield(L, -1, "completion_handler");
lua_pushcclosure(L, lbox_console_readline, 1);
lua_setfield(L, -2, "readline");
+
+ lua_yaml_new_formatter(L);
+ lua_getfield(L, -1, "cfg");
+ lua_createtable(L, 0, 4);
+ lua_pushboolean(L, true);
+ lua_setfield(L, -2, "encode_invalid_numbers");
+ lua_pushboolean(L, true);
+ lua_setfield(L, -2, "encode_load_metatables");
+ lua_pushboolean(L, true);
+ lua_setfield(L, -2, "encode_use_tostring");
+ lua_pushboolean(L, true);
+ lua_setfield(L, -2, "encode_invalid_as_nil");
+ lua_call(L, 1, 0);
+ lua_setfield(L, -2, "formatter");
}
/*
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index d49cf42be..b4199ef85 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -9,34 +9,11 @@ local errno = require('errno')
local urilib = require('uri')
local yaml = require('yaml')
--- admin formatter must be able to encode any Lua variable
-local formatter = yaml.new()
-formatter.cfg{
- encode_invalid_numbers = true;
- encode_load_metatables = true;
- encode_use_tostring = true;
- encode_invalid_as_nil = true;
-}
-
local function format(status, ...)
- -- When storing a nil in a Lua table, there is no way to
- -- distinguish nil value from no value. This is a trick to
- -- make sure yaml converter correctly
- local function wrapnull(v)
- return v == nil and formatter.NULL or v
- end
local err
if status then
- local count = select('#', ...)
- if count == 0 then
- return "---\n...\n"
- end
- local res = {}
- for i=1,count,1 do
- table.insert(res, wrapnull(select(i, ...)))
- end
-- serializer can raise an exception
- status, err = pcall(formatter.encode, res)
+ status, err = pcall(internal.format, ...)
if status then
return err
else
@@ -44,9 +21,12 @@ local function format(status, ...)
tostring(err)
end
else
- err = wrapnull(...)
+ err = ...
+ if err == nil then
+ err = box.NULL
+ end
end
- return formatter.encode({{error = err }})
+ return internal.format({ error = err })
end
--
@@ -395,4 +375,6 @@ package.loaded['console'] = {
on_start = on_start;
on_client_disconnect = on_client_disconnect;
completion_handler = internal.completion_handler;
+ formatter = internal.formatter;
+ format = internal.format;
}
diff --git a/third_party/lua-yaml/lyaml.cc b/third_party/lua-yaml/lyaml.cc
index 4d875fab4..25d29e01c 100644
--- a/third_party/lua-yaml/lyaml.cc
+++ b/third_party/lua-yaml/lyaml.cc
@@ -679,18 +679,15 @@ static int l_dump(lua_State *L) {
return 1;
}
-static int
-l_new(lua_State *L);
-
static const luaL_Reg yamllib[] = {
{ "encode", l_dump },
{ "decode", l_load },
- { "new", l_new },
+ { "new", lua_yaml_new_formatter },
{ NULL, NULL}
};
-static int
-l_new(lua_State *L)
+int
+lua_yaml_new_formatter(lua_State *L)
{
struct luaL_serializer *s = luaL_newserializer(L, NULL, yamllib);
s->has_compact = 1;
diff --git a/third_party/lua-yaml/lyaml.h b/third_party/lua-yaml/lyaml.h
index 650a2d747..9f9989f0c 100644
--- a/third_party/lua-yaml/lyaml.h
+++ b/third_party/lua-yaml/lyaml.h
@@ -10,6 +10,9 @@ extern "C" {
LUALIB_API int
luaopen_yaml(lua_State *L);
+int
+lua_yaml_new_formatter(lua_State *L);
+
#ifdef __cplusplus
}
#endif
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* [PATCH 3/5] Remove empty function declaration
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
2018-03-19 13:34 ` [PATCH 1/5] session: forbid creation from Lua binary and applier sessions Vladislav Shpilevoy
2018-03-19 13:34 ` [PATCH 2/5] lua: port console yaml formatting to C Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-20 17:55 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 4/5] session: introduce session_owner Vladislav Shpilevoy
` (2 subsequent siblings)
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/lua/socket.h | 2 --
1 file changed, 2 deletions(-)
diff --git a/src/lua/socket.h b/src/lua/socket.h
index f669b5ada..ec4e79d87 100644
--- a/src/lua/socket.h
+++ b/src/lua/socket.h
@@ -45,8 +45,6 @@ lbox_socket_local_resolve(const char *host, const char *port,
struct sockaddr *addr, socklen_t *socklen);
int
lbox_socket_nonblock(int fh, int mode);
-int bsdsocket_sendto(int fh, const char *host, const char *port,
- const void *octets, size_t len, int flags);
#if defined(__cplusplus)
} /* extern "C" */
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* [PATCH 4/5] session: introduce session_owner
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
` (2 preceding siblings ...)
2018-03-19 13:34 ` [PATCH 3/5] Remove empty function declaration Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-20 18:29 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 5/5] session: introduce box.session.push Vladislav Shpilevoy
2018-03-19 13:41 ` [tarantool-patches] [PATCH 0/5] " v.shpilevoy
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
Session owner stores a session type specific data. For example,
IProto session has authentication salt, console session has
file descriptor.
For #2677 session owner of IProto and console will have push()
virtual function to do box.session.push, which implementation
depends on a session type.
Needed for #2677
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/applier.cc | 4 ++-
src/box/authentication.cc | 4 +--
src/box/authentication.h | 3 +-
src/box/box.cc | 4 +--
src/box/box.h | 2 +-
src/box/iproto.cc | 86 +++++++++++++++++++++++++++++++++++++++------
src/box/lua/session.c | 73 +++++++++++++++++++++++++++++++++-----
src/box/session.cc | 72 +++++++++++++++++++++++++++++++++-----
src/box/session.h | 89 ++++++++++++++++++++++++++++++++++++++++-------
src/box/vinyl.c | 3 +-
10 files changed, 293 insertions(+), 47 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6bfe5a99a..581139509 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -533,7 +533,9 @@ applier_f(va_list ap)
* Set correct session type for use in on_replace()
* triggers.
*/
- current_session()->type = SESSION_TYPE_APPLIER;
+ struct session_owner applier_owner;
+ session_owner_create(&applier_owner, SESSION_TYPE_APPLIER);
+ session_set_owner(current_session(), &applier_owner);
/* Re-connect loop */
while (!fiber_is_cancelled()) {
diff --git a/src/box/authentication.cc b/src/box/authentication.cc
index fef549c55..811974cb9 100644
--- a/src/box/authentication.cc
+++ b/src/box/authentication.cc
@@ -37,7 +37,7 @@
static char zero_hash[SCRAMBLE_SIZE];
void
-authenticate(const char *user_name, uint32_t len,
+authenticate(const char *user_name, uint32_t len, const char *salt,
const char *tuple)
{
struct user *user = user_find_by_name_xc(user_name, len);
@@ -84,7 +84,7 @@ authenticate(const char *user_name, uint32_t len,
"invalid scramble size");
}
- if (scramble_check(scramble, session->salt, user->def->hash2)) {
+ if (scramble_check(scramble, salt, user->def->hash2)) {
auth_res.is_authenticated = false;
if (session_run_on_auth_triggers(&auth_res) != 0)
diag_raise();
diff --git a/src/box/authentication.h b/src/box/authentication.h
index e91fe0a0e..9935e3548 100644
--- a/src/box/authentication.h
+++ b/src/box/authentication.h
@@ -45,6 +45,7 @@ struct on_auth_trigger_ctx {
void
-authenticate(const char *user_name, uint32_t len, const char *tuple);
+authenticate(const char *user_name, uint32_t len, const char *salt,
+ const char *tuple);
#endif /* INCLUDES_TARANTOOL_BOX_AUTHENTICATION_H */
diff --git a/src/box/box.cc b/src/box/box.cc
index cb3199624..9fb85c04f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1233,7 +1233,7 @@ box_on_join(const tt_uuid *instance_uuid)
}
void
-box_process_auth(struct auth_request *request)
+box_process_auth(struct auth_request *request, const char *salt)
{
rmean_collect(rmean_box, IPROTO_AUTH, 1);
@@ -1243,7 +1243,7 @@ box_process_auth(struct auth_request *request)
const char *user = request->user_name;
uint32_t len = mp_decode_strl(&user);
- authenticate(user, len, request->scramble);
+ authenticate(user, len, salt, request->scramble);
}
void
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..84899cc13 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -150,7 +150,7 @@ box_reset_stat(void);
} /* extern "C" */
void
-box_process_auth(struct auth_request *request);
+box_process_auth(struct auth_request *request, const char *salt);
void
box_process_join(struct ev_io *io, struct xrow_header *header);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index a75127a90..ad4b1a757 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -59,6 +59,63 @@
#include "replication.h" /* instance_uuid */
#include "iproto_constants.h"
#include "rmean.h"
+#include "random.h"
+
+enum { IPROTO_SALT_SIZE = 32 };
+
+/** Owner of binary IProto sessions. */
+struct iproto_session_owner {
+ struct session_owner base;
+ /** Authentication salt. */
+ char salt[IPROTO_SALT_SIZE];
+ /** IProto connection. */
+ struct iproto_connection *connection;
+};
+
+static struct session_owner *
+iproto_session_owner_dup(struct session_owner *owner);
+
+static int
+iproto_session_owner_fd(const struct session_owner *owner);
+
+static const struct session_owner_vtab iproto_session_owner_vtab = {
+ /* .dup = */ iproto_session_owner_dup,
+ /* .delete = */ (void (*)(struct session_owner *)) free,
+ /* .fd = */ iproto_session_owner_fd,
+};
+
+static struct session_owner *
+iproto_session_owner_dup(struct session_owner *owner)
+{
+ assert(owner->vtab == &iproto_session_owner_vtab);
+ size_t size = sizeof(struct iproto_session_owner);
+ struct session_owner *dup = (struct session_owner *) malloc(size);
+ if (dup == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "iproto_session_owner");
+ return NULL;
+ }
+ memcpy(dup, owner, size);
+ return dup;
+}
+
+static inline void
+iproto_session_owner_create(struct iproto_session_owner *owner,
+ struct iproto_connection *connection)
+{
+ owner->base.type = SESSION_TYPE_BINARY;
+ owner->base.vtab = &iproto_session_owner_vtab;
+ owner->connection = connection;
+ random_bytes(owner->salt, IPROTO_SALT_SIZE);
+}
+
+static inline char *
+iproto_session_salt(struct session *session)
+{
+ struct iproto_session_owner *session_owner =
+ (struct iproto_session_owner *) session->owner;
+ assert(session_owner->base.vtab == &iproto_session_owner_vtab);
+ return session_owner->salt;
+}
/* The number of iproto messages in flight */
enum { IPROTO_MSG_MAX = 768 };
@@ -368,6 +425,15 @@ struct iproto_connection
static struct mempool iproto_connection_pool;
static RLIST_HEAD(stopped_connections);
+static int
+iproto_session_owner_fd(const struct session_owner *owner)
+{
+ assert(owner->vtab == &iproto_session_owner_vtab);
+ const struct iproto_session_owner *session_owner =
+ ((const struct iproto_session_owner *) owner);
+ return session_owner->connection->input.fd;
+}
+
/**
* Return true if we have not enough spare messages
* in the message pool. Disconnect messages are
@@ -1033,11 +1099,6 @@ tx_process_disconnect(struct cmsg *m)
struct iproto_connection *con = msg->connection;
if (con->session) {
tx_fiber_init(con->session, 0);
- /*
- * The socket is already closed in iproto thread,
- * prevent box.session.peer() from using it.
- */
- con->session->fd = -1;
if (! rlist_empty(&session_on_disconnect))
session_run_on_disconnect_triggers(con->session);
session_destroy(con->session);
@@ -1328,8 +1389,9 @@ tx_process_misc(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out = msg->connection->tx.p_obuf;
+ struct session *session = msg->connection->session;
- tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_fiber_init(session, msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1337,7 +1399,8 @@ tx_process_misc(struct cmsg *m)
try {
switch (msg->header.type) {
case IPROTO_AUTH:
- box_process_auth(&msg->auth);
+ box_process_auth(&msg->auth,
+ iproto_session_salt(session));
iproto_reply_ok_xc(out, msg->header.sync,
::schema_version);
break;
@@ -1481,15 +1544,18 @@ tx_process_connect(struct cmsg *m)
struct iproto_connection *con = msg->connection;
struct obuf *out = msg->connection->tx.p_obuf;
try { /* connect. */
- con->session = session_create(con->input.fd, SESSION_TYPE_BINARY);
+ struct iproto_session_owner owner;
+ iproto_session_owner_create(&owner, con);
+ con->session = session_create((struct session_owner *) &owner);
if (con->session == NULL)
diag_raise();
tx_fiber_init(con->session, 0);
static __thread char greeting[IPROTO_GREETING_SIZE];
/* TODO: dirty read from tx thread */
struct tt_uuid uuid = INSTANCE_UUID;
- greeting_encode(greeting, tarantool_version_id(),
- &uuid, con->session->salt, SESSION_SEED_SIZE);
+ greeting_encode(greeting, tarantool_version_id(), &uuid,
+ iproto_session_salt(con->session),
+ IPROTO_SALT_SIZE);
obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
if (! rlist_empty(&session_on_connect)) {
if (session_run_on_connect_triggers(con->session) != 0)
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index d8e91bf1f..af8411068 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -42,6 +42,54 @@
#include "box/user.h"
#include "box/schema.h"
+/** Owner of a console session. */
+struct console_session_owner {
+ struct session_owner base;
+ /** Console socket descriptor. Expects text data. */
+ int fd;
+};
+
+static struct session_owner *
+console_session_owner_dup(struct session_owner *owner);
+
+static int
+console_session_owner_fd(const struct session_owner *owner);
+
+static const struct session_owner_vtab console_session_owner_vtab = {
+ /* .dup = */ console_session_owner_dup,
+ /* .delete = */ (void (*)(struct session_owner *)) free,
+ /* .fd = */ console_session_owner_fd,
+};
+
+static struct session_owner *
+console_session_owner_dup(struct session_owner *owner)
+{
+ assert(owner->vtab == &console_session_owner_vtab);
+ size_t size = sizeof(struct console_session_owner);
+ struct session_owner *dup = (struct session_owner *) malloc(size);
+ if (dup == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "console_session_owner");
+ return NULL;
+ }
+ memcpy(dup, owner, size);
+ return dup;
+}
+
+static int
+console_session_owner_fd(const struct session_owner *owner)
+{
+ assert(owner->vtab == &console_session_owner_vtab);
+ return ((const struct console_session_owner *) owner)->fd;
+}
+
+static inline void
+console_session_owner_create(struct console_session_owner *owner, int fd)
+{
+ owner->base.type = SESSION_TYPE_CONSOLE;
+ owner->base.vtab = &console_session_owner_vtab;
+ owner->fd = fd;
+}
+
static const char *sessionlib_name = "box.session";
/* Create session and pin it to fiber */
@@ -56,14 +104,24 @@ lbox_session_create(struct lua_State *L)
"session from Lua");
}
struct session *session = fiber_get_session(fiber());
+ int fd = luaL_optinteger(L, 1, -1);
if (session == NULL) {
- int fd = luaL_optinteger(L, 1, -1);
- session = session_create_on_demand(fd);
+ struct session_owner owner;
+ session_owner_create(&owner, type);
+ session = session_create_on_demand(&owner);
if (session == NULL)
return luaT_error(L);
}
- /* If a session already exists, simply reset its type */
- session->type = type;
+ /* If a session already exists, simply reset its owner. */
+ if (type == SESSION_TYPE_CONSOLE) {
+ struct console_session_owner owner;
+ console_session_owner_create(&owner, fd);
+ session_set_owner(session, (struct session_owner *) &owner);
+ } else {
+ struct session_owner owner;
+ session_owner_create(&owner, type);
+ session_set_owner(session, &owner);
+ }
lua_pushnumber(L, session->id);
return 1;
}
@@ -90,7 +148,7 @@ lbox_session_id(struct lua_State *L)
static int
lbox_session_type(struct lua_State *L)
{
- lua_pushstring(L, session_type_strs[current_session()->type]);
+ lua_pushstring(L, session_type_strs[session_type(current_session())]);
return 1;
}
@@ -237,7 +295,7 @@ lbox_session_fd(struct lua_State *L)
struct session *session = session_find(sid);
if (session == NULL)
luaL_error(L, "session.fd(): session does not exist");
- lua_pushinteger(L, session->fd);
+ lua_pushinteger(L, session_fd(session));
return 1;
}
@@ -251,7 +309,6 @@ lbox_session_peer(struct lua_State *L)
if (lua_gettop(L) > 1)
luaL_error(L, "session.peer(sid): bad arguments");
- int fd;
struct session *session;
if (lua_gettop(L) == 1)
session = session_find(luaL_checkint(L, 1));
@@ -259,7 +316,7 @@ lbox_session_peer(struct lua_State *L)
session = current_session();
if (session == NULL)
luaL_error(L, "session.peer(): session does not exist");
- fd = session->fd;
+ int fd = session_fd(session);
if (fd < 0) {
lua_pushnil(L); /* no associated peer */
return 1;
diff --git a/src/box/session.cc b/src/box/session.cc
index 0b0c5ae44..908ec9c4e 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -33,7 +33,6 @@
#include "memory.h"
#include "assoc.h"
#include "trigger.h"
-#include "random.h"
#include "user.h"
#include "error.h"
@@ -54,6 +53,48 @@ RLIST_HEAD(session_on_connect);
RLIST_HEAD(session_on_disconnect);
RLIST_HEAD(session_on_auth);
+static struct session_owner *
+generic_session_owner_dup(struct session_owner *owner);
+
+static int
+generic_session_owner_fd(const struct session_owner *owner);
+
+static const struct session_owner_vtab generic_session_owner_vtab = {
+ /* .dup = */ generic_session_owner_dup,
+ /* .delete = */ (void (*)(struct session_owner *)) free,
+ /* .fd = */ generic_session_owner_fd,
+};
+
+static struct session_owner *
+generic_session_owner_dup(struct session_owner *owner)
+{
+ assert(owner->vtab == &generic_session_owner_vtab);
+ struct session_owner *dup =
+ (struct session_owner *) malloc(sizeof(*dup));
+ if (dup == NULL) {
+ diag_set(OutOfMemory, sizeof(*dup), "malloc",
+ "default_session_owner");
+ return NULL;
+ }
+ memcpy(dup, owner, sizeof(*dup));
+ return dup;
+}
+
+static int
+generic_session_owner_fd(const struct session_owner *owner)
+{
+ assert(owner->vtab == &generic_session_owner_vtab);
+ (void) owner;
+ return -1;
+}
+
+void
+session_owner_create(struct session_owner *owner, enum session_type type)
+{
+ owner->type = type;
+ owner->vtab = &generic_session_owner_vtab;
+}
+
static inline uint64_t
sid_max()
{
@@ -80,7 +121,7 @@ session_on_stop(struct trigger *trigger, void * /* event */)
}
struct session *
-session_create(int fd, enum session_type type)
+session_create(struct session_owner *owner)
{
struct session *session =
(struct session *) mempool_alloc(&session_pool);
@@ -90,14 +131,15 @@ session_create(int fd, enum session_type type)
return NULL;
}
session->id = sid_max();
- session->fd = fd;
+ session->owner = session_owner_dup(owner);
+ if (session->owner == NULL) {
+ mempool_free(&session_pool, session);
+ return NULL;
+ }
session->sync = 0;
- session->type = type;
/* For on_connect triggers. */
credentials_init(&session->credentials, guest_user->auth_token,
guest_user->def->uid);
- if (fd >= 0)
- random_bytes(session->salt, SESSION_SEED_SIZE);
struct mh_i64ptr_node_t node;
node.key = session->id;
node.val = session;
@@ -105,6 +147,7 @@ session_create(int fd, enum session_type type)
mh_int_t k = mh_i64ptr_put(session_registry, &node, NULL, NULL);
if (k == mh_end(session_registry)) {
+ session_owner_delete(owner);
mempool_free(&session_pool, session);
diag_set(OutOfMemory, 0, "session hash", "new session");
return NULL;
@@ -112,13 +155,25 @@ session_create(int fd, enum session_type type)
return session;
}
+int
+session_set_owner(struct session *session, struct session_owner *new_owner)
+{
+ struct session_owner *dup = session_owner_dup(new_owner);
+ if (dup == NULL)
+ return -1;
+ if (session->owner != NULL)
+ session_owner_delete(session->owner);
+ session->owner = dup;
+ return 0;
+}
+
struct session *
-session_create_on_demand(int fd)
+session_create_on_demand(struct session_owner *owner)
{
assert(fiber_get_session(fiber()) == NULL);
/* Create session on demand */
- struct session *s = session_create(fd, SESSION_TYPE_BACKGROUND);
+ struct session *s = session_create(owner);
if (s == NULL)
return NULL;
s->fiber_on_stop = {
@@ -186,6 +241,7 @@ session_destroy(struct session *session)
{
struct mh_i64ptr_node_t node = { session->id, NULL };
mh_i64ptr_remove(session_registry, &node, NULL);
+ session_owner_delete(session->owner);
mempool_free(&session_pool, session);
}
diff --git a/src/box/session.h b/src/box/session.h
index 4f9235ea8..105dcab17 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -47,8 +47,6 @@ session_init();
void
session_free();
-enum { SESSION_SEED_SIZE = 32, SESSION_DELIM_SIZE = 16 };
-
enum session_type {
SESSION_TYPE_BACKGROUND = 0,
SESSION_TYPE_BINARY,
@@ -60,6 +58,49 @@ enum session_type {
extern const char *session_type_strs[];
+struct session_owner_vtab;
+
+/**
+ * Object to store session type specific data. For example, IProto
+ * stores iproto_connection, console stores file descriptor.
+ */
+struct session_owner {
+ /** Session type. */
+ enum session_type type;
+ /** Virtual session owner methods. */
+ const struct session_owner_vtab *vtab;
+};
+
+struct session_owner_vtab {
+ /** Allocate a duplicate of an owner. */
+ struct session_owner *(*dup)(struct session_owner *);
+ /** Destroy an owner, and free its memory. */
+ void (*free)(struct session_owner *);
+ /** Get the descriptor of an owner, if has. Else -1. */
+ int (*fd)(const struct session_owner *);
+};
+
+static inline struct session_owner *
+session_owner_dup(struct session_owner *owner)
+{
+ return owner->vtab->dup(owner);
+}
+
+static inline void
+session_owner_delete(struct session_owner *owner)
+{
+ owner->vtab->free(owner);
+}
+
+/**
+ * Initialize a session owner with @a type and with default
+ * virtual methods.
+ * @param owner Session owner to initialize. Is copied inside.
+ * @param type Session type.
+ */
+void
+session_owner_create(struct session_owner *owner, enum session_type type);
+
/**
* Abstraction of a single user session:
* for now, only provides accounting of established
@@ -72,10 +113,8 @@ extern const char *session_type_strs[];
struct session {
/** Session id. */
uint64_t id;
- /** File descriptor - socket of the connected peer.
- * Only if the session has a peer.
- */
- int fd;
+ /** Session owner with type specific data. */
+ struct session_owner *owner;
/**
* For iproto requests, we set this field
* to the value of packet sync. Since the
@@ -85,15 +124,24 @@ struct session {
* the first yield.
*/
uint64_t sync;
- enum session_type type;
- /** Authentication salt. */
- char salt[SESSION_SEED_SIZE];
/** Session user id and global grants */
struct credentials credentials;
/** Trigger for fiber on_stop to cleanup created on-demand session */
struct trigger fiber_on_stop;
};
+static inline enum session_type
+session_type(const struct session *session)
+{
+ return session->owner->type;
+}
+
+static inline int
+session_fd(const struct session *session)
+{
+ return session->owner->vtab->fd(session->owner);
+}
+
/**
* Find a session by id.
*/
@@ -154,7 +202,7 @@ extern struct credentials admin_credentials;
* trigger to destroy it when this fiber ends.
*/
struct session *
-session_create_on_demand(int fd);
+session_create_on_demand(struct session_owner *owner);
/*
* When creating a new fiber, the database (box)
@@ -171,7 +219,9 @@ current_session()
{
struct session *session = fiber_get_session(fiber());
if (session == NULL) {
- session = session_create_on_demand(-1);
+ struct session_owner owner;
+ session_owner_create(&owner, SESSION_TYPE_BACKGROUND);
+ session = session_create_on_demand(&owner);
if (session == NULL)
diag_raise();
}
@@ -191,7 +241,9 @@ effective_user()
(struct credentials *) fiber_get_key(fiber(),
FIBER_KEY_USER);
if (u == NULL) {
- session_create_on_demand(-1);
+ struct session_owner owner;
+ session_owner_create(&owner, SESSION_TYPE_BACKGROUND);
+ session_create_on_demand(&owner);
u = (struct credentials *) fiber_get_key(fiber(),
FIBER_KEY_USER);
}
@@ -216,7 +268,18 @@ session_storage_cleanup(int sid);
* trigger fails or runs out of resources.
*/
struct session *
-session_create(int fd, enum session_type type);
+session_create(struct session_owner *owner);
+
+/**
+ * Set new owner of a session.
+ * @param session Session to change owner.
+ * @param new_owner New session owner. Is duplicated inside.
+ *
+ * @retval -1 Memory error.
+ * @retval 0 Success.
+ */
+int
+session_set_owner(struct session *session, struct session_owner *new_owner);
/**
* Destroy a session.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e0c30757c..7ae12f597 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2453,7 +2453,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
* available for the admin to track the lag so let the applier
* wait as long as necessary for memory dump to complete.
*/
- double timeout = (current_session()->type != SESSION_TYPE_APPLIER ?
+ double timeout = (session_type(current_session()) !=
+ SESSION_TYPE_APPLIER ?
env->timeout : TIMEOUT_INFINITY);
/*
* Reserve quota needed by the transaction before allocating
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* [PATCH 5/5] session: introduce box.session.push
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
` (3 preceding siblings ...)
2018-03-19 13:34 ` [PATCH 4/5] session: introduce session_owner Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-21 9:10 ` Vladimir Davydov
2018-03-19 13:41 ` [tarantool-patches] [PATCH 0/5] " v.shpilevoy
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
Box.session.push() allows to send a message to a client with no
finishing a main request.
Tarantool supports two push types: via text protocol and via
binary protocol. Text protocol push message contains "push:"
prefix followed by a YAML formatted text, where as a final
response always has '---' prefix. To catch pushed messages on a
client side use console.connect() on_push options which takes
a function with a single argument - message.
IProto message is encoded just like a regular response, but
instead of IPROTO_DATA it has IPROTO_PUSH with a single element -
pushed MessagePack encoded data.
Text push is trivial - it is just blocking write into a socket
with a prefix. Binary push is more complex.
TX thread to notify IProto thread about new data in obuf sends
a message 'push_msg'. IProto thread, got this message, notifies
libev about new data, and then sends 'push_msg' back with
updated write position. TX thread, received the message back,
updates its version of a write position. If IProto will not send
a write position, then TX will write to the same obuf again
and again, because it can not know that IProto already flushed
another obuf.
To avoid multiple 'push_msg' in fly between IProto and TX, the
only one 'push_msg' per connection is used. To deliver pushes,
appeared when 'push_msg' was in fly, TX thread sets a flag every
time when sees, that 'push_msg' is sent, and there is a new push.
When 'push_msg' returns, it checks this flag, and if it is set,
then the IProto is notified again.
Closes #2677
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/iproto.cc | 235 +++++++++++++++++---
src/box/iproto_constants.c | 3 +-
src/box/iproto_constants.h | 8 +
src/box/lua/call.c | 1 +
src/box/lua/console.c | 2 +-
src/box/lua/console.h | 8 +
src/box/lua/console.lua | 6 +-
src/box/lua/net_box.c | 37 ++++
src/box/lua/net_box.lua | 97 ++++++--
src/box/lua/session.c | 171 +++++++++++++++
src/box/port.c | 7 +
src/box/port.h | 15 ++
src/box/session.cc | 14 ++
src/box/session.h | 17 ++
src/box/xrow.c | 40 +++-
src/box/xrow.h | 26 ++-
src/fio.c | 12 +
src/fio.h | 16 ++
test/app-tap/console.test.lua | 9 +-
test/box/net.box.result | 2 +-
test/box/net.box.test.lua | 2 +-
test/box/push.result | 364 +++++++++++++++++++++++++++++++
test/box/push.test.lua | 163 ++++++++++++++
test/replication/before_replace.result | 14 ++
test/replication/before_replace.test.lua | 11 +
25 files changed, 1211 insertions(+), 69 deletions(-)
create mode 100644 test/box/push.result
create mode 100644 test/box/push.test.lua
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index ad4b1a757..4408293c1 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,6 +63,45 @@
enum { IPROTO_SALT_SIZE = 32 };
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+ struct obuf *obuf;
+ struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+ wpos->obuf = out;
+ wpos->svp = obuf_create_svp(out);
+}
+
+/* {{{ IPROTO_PUSH declarations. */
+
+/**
+ * Message to notify IProto thread about new data in an output
+ * buffer. Struct iproto_msg is not used here, because push
+ * notification can be much more compact: it does not have
+ * request, ibuf, length, flags ...
+ */
+struct iproto_push_msg {
+ struct cmsg base;
+ /**
+ * Before sending to IProto thread, the wpos is set to a
+ * current position in an output buffer. Before IProto
+ * returns the message to TX, it sets wpos to the last
+ * flushed position (works like iproto_msg.wpos).
+ */
+ struct iproto_wpos wpos;
+ /** IProto connection to push into. */
+ struct iproto_connection *connection;
+};
+
/** Owner of binary IProto sessions. */
struct iproto_session_owner {
struct session_owner base;
@@ -70,6 +109,37 @@ struct iproto_session_owner {
char salt[IPROTO_SALT_SIZE];
/** IProto connection. */
struct iproto_connection *connection;
+ /**
+ * Is_push_in_progress is set, when a push_msg is sent to
+ * IProto thread, and reset, when the message is returned
+ * to TX. If a new push sees, that a push_msg is already
+ * sent to IProto, then has_new_pushes is set. After push
+ * notification is returned to TX, it checks
+ * has_new_pushes. If it is set, then the notification is
+ * sent again. This ping-pong continues, until TX stopped
+ * pushing. It allows to
+ * 1) avoid multiple push_msg from one session in fly,
+ * 2) do not block push() until a previous push() is
+ * finished.
+ *
+ * IProto TX
+ * -------------------------------------------------------
+ * + [push message]
+ * start socket <--- notification ----
+ * write
+ * + [push message]
+ * + [push message]
+ * ...
+ * end socket
+ * write ----------------> check for new
+ * pushes - found
+ * <--- notification ---
+ * ....
+ */
+ bool has_new_pushes;
+ bool is_push_in_progress;
+ /** Push notification for IProto thread. */
+ struct iproto_push_msg push_msg;
};
static struct session_owner *
@@ -78,10 +148,27 @@ iproto_session_owner_dup(struct session_owner *owner);
static int
iproto_session_owner_fd(const struct session_owner *owner);
+/**
+ * Push a message from @a port to a remote client.
+ * @param owner IProto session owner.
+ * @param sync Message sync. Must be the same as a request sync to
+ * be able to detect their tie on a client side.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval 0 Success, a message is wrote to an output buffer. But
+ * it is not guaranteed, that it will be sent
+ * successfully.
+ */
+static int
+iproto_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port);
+
static const struct session_owner_vtab iproto_session_owner_vtab = {
/* .dup = */ iproto_session_owner_dup,
/* .delete = */ (void (*)(struct session_owner *)) free,
/* .fd = */ iproto_session_owner_fd,
+ /* .push = */ iproto_session_owner_push,
};
static struct session_owner *
@@ -105,6 +192,9 @@ iproto_session_owner_create(struct iproto_session_owner *owner,
owner->base.type = SESSION_TYPE_BINARY;
owner->base.vtab = &iproto_session_owner_vtab;
owner->connection = connection;
+ owner->has_new_pushes = false;
+ owner->is_push_in_progress = false;
+ owner->push_msg.connection = connection;
random_bytes(owner->salt, IPROTO_SALT_SIZE);
}
@@ -117,6 +207,8 @@ iproto_session_salt(struct session *session)
return session_owner->salt;
}
+/** }}} */
+
/* The number of iproto messages in flight */
enum { IPROTO_MSG_MAX = 768 };
@@ -164,24 +256,6 @@ iproto_reset_input(struct ibuf *ibuf)
}
}
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
- struct obuf *obuf;
- struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
- wpos->obuf = out;
- wpos->svp = obuf_create_svp(out);
-}
-
/* {{{ iproto_msg - declaration */
/**
@@ -1168,15 +1242,16 @@ tx_discard_input(struct iproto_msg *msg)
* not, the empty buffer is selected.
* - if neither of the buffers are empty, the function
* does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ * thread.
*/
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_msg(struct iproto_connection *con, struct iproto_wpos *wpos)
{
- struct iproto_msg *msg = (struct iproto_msg *) m;
- struct iproto_connection *con = msg->connection;
-
struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
- if (msg->wpos.obuf == con->tx.p_obuf) {
+ if (wpos->obuf == con->tx.p_obuf) {
/*
* We got a message advancing the buffer which
* is being appended to. The previous buffer is
@@ -1194,7 +1269,6 @@ tx_accept_msg(struct cmsg *m)
*/
con->tx.p_obuf = prev;
}
- return msg;
}
/**
@@ -1217,7 +1291,8 @@ tx_reply_error(struct iproto_msg *msg)
static void
tx_reply_iproto_error(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&msg->diag),
msg->header.sync, ::schema_version);
@@ -1227,8 +1302,8 @@ tx_reply_iproto_error(struct cmsg *m)
static void
tx_process1(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
- struct obuf *out = msg->connection->tx.p_obuf;
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
tx_fiber_init(msg->connection->session, msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
@@ -1236,8 +1311,11 @@ tx_process1(struct cmsg *m)
struct tuple *tuple;
struct obuf_svp svp;
- if (box_process1(&msg->dml, &tuple) ||
- iproto_prepare_select(out, &svp))
+ struct obuf *out;
+ if (box_process1(&msg->dml, &tuple) != 0)
+ goto error;
+ out = msg->connection->tx.p_obuf;
+ if (iproto_prepare_select(out, &svp) != 0)
goto error;
if (tuple && tuple_to_obuf(tuple, out))
goto error;
@@ -1252,8 +1330,9 @@ error:
static void
tx_process_select(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
- struct obuf *out = msg->connection->tx.p_obuf;
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
+ struct obuf *out;
struct obuf_svp svp;
struct port port;
int count;
@@ -1270,6 +1349,7 @@ tx_process_select(struct cmsg *m)
req->key, req->key_end, &port);
if (rc < 0)
goto error;
+ out = msg->connection->tx.p_obuf;
if (iproto_prepare_select(out, &svp) != 0) {
port_destroy(&port);
goto error;
@@ -1305,7 +1385,8 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
static void
tx_process_call(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1387,7 +1468,8 @@ error:
static void
tx_process_misc(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
struct obuf *out = msg->connection->tx.p_obuf;
struct session *session = msg->connection->session;
@@ -1428,8 +1510,9 @@ error:
static void
tx_process_join_subscribe(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
+ tx_accept_msg(con, &msg->wpos);
tx_fiber_init(con->session, msg->header.sync);
@@ -1612,6 +1695,88 @@ static const struct cmsg_hop connect_route[] = {
/** }}} */
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param owner IProto session owner.
+ */
+static void
+tx_begin_push(struct iproto_session_owner *owner);
+
+/**
+ * Create an event to send push.
+ * @param m IProto push message.
+ */
+static void
+net_push_msg(struct cmsg *m)
+{
+ struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+ struct iproto_connection *con = msg->connection;
+ con->wend = msg->wpos;
+ msg->wpos = con->wpos;
+ if (evio_has_fd(&con->output) && !ev_is_active(&con->output))
+ ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * After a message notifies IProto thread about pushed data, TX
+ * thread can already have a new push in one of obufs. This
+ * function checks for new pushes and possibly re-sends push
+ * notification to IProto thread.
+ */
+static void
+tx_check_for_new_push(struct cmsg *m)
+{
+ struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+ struct iproto_session_owner *owner =
+ container_of(msg, struct iproto_session_owner, push_msg);
+ tx_accept_msg(msg->connection, &msg->wpos);
+ owner->is_push_in_progress = false;
+ if (owner->has_new_pushes)
+ tx_begin_push(owner);
+}
+
+static const struct cmsg_hop push_route[] = {
+ { net_push_msg, &tx_pipe },
+ { tx_check_for_new_push, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_session_owner *owner)
+{
+ assert(! owner->is_push_in_progress);
+ cmsg_init((struct cmsg *) &owner->push_msg, push_route);
+ iproto_wpos_create(&owner->push_msg.wpos, owner->connection->tx.p_obuf);
+ owner->has_new_pushes = false;
+ owner->is_push_in_progress = true;
+ cpipe_push(&net_pipe, (struct cmsg *) &owner->push_msg);
+}
+
+static int
+iproto_session_owner_push(struct session_owner *session_owner, uint64_t sync,
+ struct port *port)
+{
+ struct iproto_session_owner *owner =
+ (struct iproto_session_owner *) session_owner;
+ struct iproto_connection *con = owner->connection;
+ struct obuf_svp svp;
+ if (iproto_prepare_push(con->tx.p_obuf, &svp) != 0)
+ return -1;
+ if (port_dump(port, con->tx.p_obuf) != 0) {
+ obuf_rollback_to_svp(con->tx.p_obuf, &svp);
+ return -1;
+ }
+ iproto_reply_push(con->tx.p_obuf, &svp, sync, ::schema_version);
+ if (! owner->is_push_in_progress)
+ tx_begin_push(owner);
+ else
+ owner->has_new_pushes = true;
+ return 0;
+}
+
+/** }}} */
+
/**
* Create a connection and start input.
*/
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index cd7b1d03b..893275c7b 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -174,7 +174,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
NULL, /* 0x2e */
NULL, /* 0x2f */
"data", /* 0x30 */
- "error" /* 0x31 */
+ "error", /* 0x31 */
+ "push", /* 0x32 */
};
const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = {
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 951842485..05d262a0b 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -79,6 +79,14 @@ enum iproto_key {
/* Leave a gap between request keys and response keys */
IPROTO_DATA = 0x30,
IPROTO_ERROR = 0x31,
+ /**
+ * Tarantool supports two push types: binary and text.
+ * A text push can be distinguished from a response by a
+ * prefix "push:".
+ * Binary push is encoded using IPROTO_PUSH key in a
+ * message body, which replaces IPROTO_DATA.
+ */
+ IPROTO_PUSH = 0x32,
IPROTO_KEY_MAX
};
diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index be13812aa..fc70ed430 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -418,6 +418,7 @@ port_lua_destroy(struct port *base)
static const struct port_vtab port_lua_vtab = {
.dump = port_lua_dump,
.dump_16 = port_lua_dump_16,
+ .dump_raw = NULL,
.destroy = port_lua_destroy,
};
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 450745c90..3bc4b6425 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -329,7 +329,7 @@ lbox_console_add_history(struct lua_State *L)
return 0;
}
-static int
+int
lbox_console_format(struct lua_State *L)
{
int arg_count = lua_gettop(L);
diff --git a/src/box/lua/console.h b/src/box/lua/console.h
index 208b31490..92a4af035 100644
--- a/src/box/lua/console.h
+++ b/src/box/lua/console.h
@@ -39,6 +39,14 @@ struct lua_State;
void
tarantool_lua_console_init(struct lua_State *L);
+/**
+ * Encode Lua object into YAML string.
+ * @param Lua object to encode on top of a stack.
+ * @retval Lua string.
+ */
+int
+lbox_console_format(struct lua_State *L);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index b4199ef85..295bfcbaa 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -59,7 +59,7 @@ end
--
-- Evaluate command on remote instance
--
-local function remote_eval(self, line)
+local function remote_eval(self, line, opts)
if not line or self.remote.state ~= 'active' then
local err = self.remote.error
self.remote:close()
@@ -74,7 +74,7 @@ local function remote_eval(self, line)
--
-- execute line
--
- local ok, res = pcall(self.remote.eval, self.remote, line)
+ local ok, res = pcall(self.remote.eval, self.remote, line, opts)
return ok and res or format(false, res)
end
@@ -310,7 +310,7 @@ local function connect(uri, opts)
-- override methods
self.remote = remote
- self.eval = remote_eval
+ self.eval = function(s, l) return remote_eval(s, l, {on_push = opts.on_push}) end
self.prompt = string.format("%s:%s", self.remote.host, self.remote.port)
self.completion = function (str, pos1, pos2)
local c = string.format(
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index db2d2dbb4..9fa7935f7 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -554,6 +554,41 @@ handle_error:
return 2;
}
+/**
+ * Search for a "push:" prefix in a message, received from a
+ * server using a text protocol.
+ */
+static int
+netbox_text_is_push(struct lua_State *L)
+{
+ assert(lua_gettop(L) == 2);
+ uint32_t ctypeid;
+ const char *text = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == luaL_ctypeid(L, "char *"));
+ uint32_t len = (uint32_t) lua_tonumber(L, 2);
+ uint32_t push_len = strlen("push:");
+ lua_pushboolean(L, len >= 5 && memcmp(text, "push:", push_len) == 0);
+ return 1;
+}
+
+/**
+ * Search for IPROTO_PUSH key in a MessagePack encoded response
+ * body. It is needed without entire message decoding, when a user
+ * wants to store raw responses and pushes in its own buffer.
+ */
+static int
+netbox_body_is_push(struct lua_State *L)
+{
+ uint32_t ctypeid;
+ const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == luaL_ctypeid(L, "char *"));
+ assert(mp_typeof(*body) == MP_MAP);
+ lua_pushboolean(L, mp_decode_map(&body) == 1 &&
+ mp_typeof(*body) == MP_UINT &&
+ mp_decode_uint(&body) == IPROTO_PUSH);
+ return 1;
+}
+
int
luaopen_net_box(struct lua_State *L)
{
@@ -571,6 +606,8 @@ luaopen_net_box(struct lua_State *L)
{ "encode_auth", netbox_encode_auth },
{ "decode_greeting",netbox_decode_greeting },
{ "communicate", netbox_communicate },
+ { "text_is_push", netbox_text_is_push },
+ { "body_is_push", netbox_body_is_push },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 87c8c548b..7ef6a6a2d 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,8 @@ local communicate = internal.communicate
local encode_auth = internal.encode_auth
local encode_select = internal.encode_select
local decode_greeting = internal.decode_greeting
+local text_is_push = internal.text_is_push
+local body_is_push = internal.body_is_push
local sequence_mt = { __serialize = 'sequence' }
local TIMEOUT_INFINITY = 500 * 365 * 86400
@@ -38,6 +40,7 @@ local IPROTO_SYNC_KEY = 0x01
local IPROTO_SCHEMA_VERSION_KEY = 0x05
local IPROTO_DATA_KEY = 0x30
local IPROTO_ERROR_KEY = 0x31
+local IPROTO_PUSH_KEY = 0x32
local IPROTO_GREETING_SIZE = 128
-- select errors from box.error
@@ -229,7 +232,8 @@ local function create_transport(host, port, user, password, callback)
end
-- REQUEST/RESPONSE --
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ local function perform_request(timeout, buffer, method, on_push,
+ schema_version, ...)
if state ~= 'active' then
return last_errno or E_NO_CONNECTION, last_error
end
@@ -242,7 +246,7 @@ local function create_transport(host, port, user, password, callback)
local id = next_request_id
method_codec[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
- local request = table_new(0, 6) -- reserve space for 6 keys
+ local request = table_new(0, 7) -- reserve space for 7 keys
request.client = fiber_self()
request.method = method
request.schema_version = schema_version
@@ -254,6 +258,17 @@ local function create_transport(host, port, user, password, callback)
requests[id] = nil
return E_TIMEOUT, 'Timeout exceeded'
end
+ if request.messages then
+ -- Multiple push messages can appear on a single
+ -- event loop iteration.
+ local messages = request.messages
+ request.messages = nil
+ if on_push then
+ for _, m in pairs(messages) do
+ on_push(m)
+ end
+ end
+ end
until requests[id] == nil -- i.e. completed (beware spurious wakeups)
return request.errno, request.response
end
@@ -270,12 +285,12 @@ local function create_transport(host, port, user, password, callback)
if request == nil then -- nobody is waiting for the response
return
end
- requests[id] = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
if status ~= 0 then
-- Handle errors
+ requests[id] = nil
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -290,7 +305,16 @@ local function create_transport(host, port, user, password, callback)
local body_len = body_end - body_rpos
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
- request.response = tonumber(body_len)
+ if body_is_push(body_rpos) then
+ if request.messages then
+ table.insert(request.messages, tonumber(body_len))
+ else
+ request.messages = {tonumber(body_len)}
+ end
+ else
+ requests[id] = nil
+ request.response = tonumber(body_len)
+ end
wakeup_client(request.client)
return
end
@@ -298,7 +322,18 @@ local function create_transport(host, port, user, password, callback)
-- Decode xrow.body[DATA] to Lua objects
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
- request.response = body[IPROTO_DATA_KEY]
+ if body[IPROTO_PUSH_KEY] then
+ assert(#body[IPROTO_PUSH_KEY] == 1)
+ assert(not body[IPROTO_DATA_KEY])
+ if request.messages then
+ table.insert(request.messages, body[IPROTO_PUSH_KEY][1])
+ else
+ request.messages = {body[IPROTO_PUSH_KEY][1]}
+ end
+ else
+ requests[id] = nil
+ request.response = body[IPROTO_DATA_KEY]
+ end
wakeup_client(request.client)
end
@@ -345,9 +380,16 @@ local function create_transport(host, port, user, password, callback)
if err then
return err, delim_pos
else
- local response = ffi.string(recv_buf.rpos, delim_pos + #delim)
+ local response
+ local is_push = text_is_push(recv_buf.rpos, delim_pos + #delim)
+ if not is_push then
+ response = ffi.string(recv_buf.rpos, delim_pos + #delim)
+ else
+ -- 5 - len of 'push:' prefix of a message.
+ response = ffi.string(recv_buf.rpos + 5, delim_pos + #delim - 5)
+ end
recv_buf.rpos = recv_buf.rpos + delim_pos + #delim
- return nil, response
+ return nil, response, is_push
end
end
@@ -408,18 +450,26 @@ local function create_transport(host, port, user, password, callback)
console_sm = function(rid)
local delim = '\n...\n'
- local err, response = send_and_recv_console()
+ local err, response, is_push = send_and_recv_console()
if err then
return error_sm(err, response)
else
local request = requests[rid]
- if request == nil then -- nobody is waiting for the response
- return
+ if request then
+ if is_push then
+ -- In a console mode it is impossible to
+ -- get multiple pushes on a single event loop
+ -- iteration.
+ assert(not request.messages)
+ request.messages = {response}
+ else
+ requests[rid] = nil
+ request.response = response
+ rid = next_id(rid)
+ end
+ wakeup_client(request.client)
+ return console_sm(rid)
end
- requests[rid] = nil
- request.response = response
- wakeup_client(request.client)
- return console_sm(next_id(rid))
end
end
@@ -761,7 +811,8 @@ function remote_methods:_request(method, opts, ...)
timeout = deadline and max(0, deadline - fiber_clock())
end
err, res = perform_request(timeout, buffer, method,
- self.schema_version, ...)
+ opts and opts.on_push, self.schema_version,
+ ...)
if not err and buffer ~= nil then
return res -- the length of xrow.body
elseif not err then
@@ -791,7 +842,7 @@ function remote_methods:ping(opts)
timeout = deadline and max(0, deadline - fiber_clock())
or (opts and opts.timeout)
end
- local err = self._transport.perform_request(timeout, nil, 'ping',
+ local err = self._transport.perform_request(timeout, nil, 'ping', nil,
self.schema_version)
return not err or err == E_WRONG_SCHEMA_VERSION
end
@@ -956,8 +1007,16 @@ console_methods.on_disconnect = remote_methods.on_disconnect
console_methods.on_connect = remote_methods.on_connect
console_methods.is_connected = remote_methods.is_connected
console_methods.wait_state = remote_methods.wait_state
-function console_methods:eval(line, timeout)
+function console_methods:eval(line, opts)
check_remote_arg(self, 'eval')
+ local timeout
+ local on_push
+ if type(opts) == 'table' then
+ timeout = opts.timeout or TIMEOUT_INFINITY
+ on_push = opts.on_push
+ else
+ timeout = opts
+ end
local err, res
local transport = self._transport
local pr = transport.perform_request
@@ -968,10 +1027,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+ err, res = pr(timeout, nil, 'eval', on_push, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+ err, res = pr(timeout, nil, 'inject', on_push, nil, line..'$EOF$\n')
end
if err then
box.error({code = err, reason = res})
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index af8411068..73cc9da8f 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -31,6 +31,7 @@
#include "session.h"
#include "lua/utils.h"
#include "lua/trigger.h"
+#include "lua/msgpack.h"
#include <lua.h>
#include <lauxlib.h>
@@ -41,6 +42,10 @@
#include "box/session.h"
#include "box/user.h"
#include "box/schema.h"
+#include "box/port.h"
+#include "box/lua/console.h"
+#include "fio.h"
+#include "small/obuf.h"
/** Owner of a console session. */
struct console_session_owner {
@@ -55,10 +60,26 @@ console_session_owner_dup(struct session_owner *owner);
static int
console_session_owner_fd(const struct session_owner *owner);
+/**
+ * Send "push:" prefix + message in a blocking mode, with no
+ * yields, to a console socket.
+ * @param owner Console session owner.
+ * @param sync Sync. It is unused since a text protocol has no
+ * syncs.
+ * @param port Port with text to dump.
+ *
+ * @retval -1 Memory or IO error.
+ * @retval 0 Success.
+ */
+static int
+console_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port);
+
static const struct session_owner_vtab console_session_owner_vtab = {
/* .dup = */ console_session_owner_dup,
/* .delete = */ (void (*)(struct session_owner *)) free,
/* .fd = */ console_session_owner_fd,
+ /* .push = */ console_session_owner_push,
};
static struct session_owner *
@@ -90,6 +111,45 @@ console_session_owner_create(struct console_session_owner *owner, int fd)
owner->fd = fd;
}
+/**
+ * Write @a text into @a fd in a blocking mode, ignoring transient
+ * socket errors.
+ * @param fd Console descriptor.
+ * @param text Text to send.
+ * @param len Length of @a text.
+ */
+static inline int
+console_do_push(int fd, const char *text, uint32_t len)
+{
+ while (len > 0) {
+ int written = fio_write_silent(fd, text, len);
+ if (written < 0)
+ return -1;
+ assert((uint32_t) written <= len);
+ len -= written;
+ text += written;
+ }
+ return 0;
+}
+
+static int
+console_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port)
+{
+ /* Console has no sync. */
+ (void) sync;
+ assert(owner->vtab == &console_session_owner_vtab);
+ int fd = console_session_owner_fd(owner);
+ uint32_t text_len;
+ const char *text = port_dump_raw(port, &text_len);
+ if (text == NULL ||
+ console_do_push(fd, "push:", strlen("push:")) != 0 ||
+ console_do_push(fd, text, text_len) != 0)
+ return -1;
+ else
+ return 0;
+}
+
static const char *sessionlib_name = "box.session";
/* Create session and pin it to fiber */
@@ -418,6 +478,116 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
return 3;
}
+/**
+ * Port to push a message from Lua.
+ */
+struct lua_push_port {
+ const struct port_vtab *vtab;
+ /**
+ * Lua state, containing data to dump on top of the stack.
+ */
+ struct lua_State *L;
+};
+
+/**
+ * Lua push port supports two dump types: usual and raw. Raw dump
+ * encodes a message as a YAML formatted text, usual dump encodes
+ * the message as MessagePack right into an output buffer.
+ */
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *out);
+
+static const char *
+lua_push_port_dump_text(struct port *port, uint32_t *size);
+
+static const struct port_vtab lua_push_port_vtab = {
+ .dump = lua_push_port_dump_msgpack,
+ /*
+ * Dump_16 has no sense, since push appears since 1.10
+ * protocol.
+ */
+ .dump_16 = NULL,
+ .dump_raw = lua_push_port_dump_text,
+ .destroy = NULL,
+};
+
+static void
+obuf_error_cb(void *ctx)
+{
+ *((int *)ctx) = -1;
+}
+
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *out)
+{
+ struct lua_push_port *lua_port = (struct lua_push_port *) port;
+ assert(lua_port->vtab == &lua_push_port_vtab);
+ struct mpstream stream;
+ int rc = 0;
+ /*
+ * Do not use luamp_error to allow a caller to clear the
+ * obuf, if it already has allocated something (for
+ * example, iproto push reserves memory for a header).
+ */
+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ obuf_error_cb, &rc);
+ luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1);
+ if (rc != 0)
+ return -1;
+ mpstream_flush(&stream);
+ return 0;
+}
+
+static const char *
+lua_push_port_dump_text(struct port *port, uint32_t *size)
+{
+ struct lua_push_port *lua_port = (struct lua_push_port *) port;
+ assert(lua_port->vtab == &lua_push_port_vtab);
+ lbox_console_format(lua_port->L);
+ assert(lua_isstring(lua_port->L, -1));
+ size_t len;
+ const char *result = lua_tolstring(lua_port->L, -1, &len);
+ *size = (uint32_t) len;
+ return result;
+}
+
+/**
+ * Push a message using a protocol, depending on a session type.
+ * @param data Data to push, first argument on a stack.
+ * @param opts Options. Now requires a single possible option -
+ * sync. Second argument on a stack.
+ */
+static int
+lbox_session_push(struct lua_State *L)
+{
+ if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
+usage_error:
+ return luaL_error(L, "Usage: box.session.push(data, opts)");
+ }
+ lua_getfield(L, 2, "sync");
+ if (! lua_isnumber(L, 3))
+ goto usage_error;
+ double lua_sync = lua_tonumber(L, 3);
+ lua_pop(L, 1);
+ uint64_t sync = (uint64_t) lua_sync;
+ if (lua_sync != sync)
+ goto usage_error;
+ struct lua_push_port port;
+ port.vtab = &lua_push_port_vtab;
+ port.L = L;
+ /*
+ * Pop the opts - they must not be pushed. Leave only data
+ * on a stack.
+ */
+ lua_remove(L, 2);
+ if (session_push(current_session(), sync, (struct port *) &port) != 0) {
+ return luaT_error(L);
+ } else {
+ lua_pushboolean(L, true);
+ return 1;
+ }
+}
+
/**
* Sets trigger on_access_denied.
* For test purposes only.
@@ -489,6 +659,7 @@ box_lua_session_init(struct lua_State *L)
{"on_disconnect", lbox_session_on_disconnect},
{"on_auth", lbox_session_on_auth},
{"on_access_denied", lbox_session_on_access_denied},
+ {"push", lbox_session_push},
{NULL, NULL}
};
luaL_register_module(L, sessionlib_name, sessionlib);
diff --git a/src/box/port.c b/src/box/port.c
index 03f6be79d..7f3cea5e7 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_16(struct port *port, struct obuf *out)
return port->vtab->dump_16(port, out);
}
+const char *
+port_dump_raw(struct port *port, uint32_t *size)
+{
+ return port->vtab->dump_raw(port, size);
+}
+
void
port_init(void)
{
@@ -159,5 +165,6 @@ port_free(void)
const struct port_vtab port_tuple_vtab = {
.dump = port_tuple_dump,
.dump_16 = port_tuple_dump_16,
+ .dump_raw = NULL,
.destroy = port_tuple_destroy,
};
diff --git a/src/box/port.h b/src/box/port.h
index 7cf3339b5..d1db74909 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -76,6 +76,11 @@ struct port_vtab {
* format.
*/
int (*dump_16)(struct port *port, struct obuf *out);
+ /**
+ * Same as dump, but find a memory for an output buffer
+ * for itself.
+ */
+ const char *(*dump_raw)(struct port *port, uint32_t *size);
/**
* Destroy a port and release associated resources.
*/
@@ -158,6 +163,16 @@ port_dump(struct port *port, struct obuf *out);
int
port_dump_16(struct port *port, struct obuf *out);
+/**
+ * Same as port_dump(), but find a memory for an output buffer for
+ * itself.
+ * @param port Port to dump.
+ * @param[out] size Size of the data.
+ * @retval Data.
+ */
+const char *
+port_dump_raw(struct port *port, uint32_t *size);
+
void
port_init(void);
diff --git a/src/box/session.cc b/src/box/session.cc
index 908ec9c4e..793393fc8 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -63,6 +63,7 @@ static const struct session_owner_vtab generic_session_owner_vtab = {
/* .dup = */ generic_session_owner_dup,
/* .delete = */ (void (*)(struct session_owner *)) free,
/* .fd = */ generic_session_owner_fd,
+ /* .push = */ generic_session_owner_push,
};
static struct session_owner *
@@ -95,6 +96,19 @@ session_owner_create(struct session_owner *owner, enum session_type type)
owner->vtab = &generic_session_owner_vtab;
}
+int
+generic_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port)
+{
+ (void) owner;
+ (void) sync;
+ (void) port;
+ const char *session =
+ tt_sprintf("Session '%s'", session_type_strs[owner->type]);
+ diag_set(ClientError, ER_UNSUPPORTED, session, "push()");
+ return -1;
+}
+
static inline uint64_t
sid_max()
{
diff --git a/src/box/session.h b/src/box/session.h
index 105dcab17..b5f4955d8 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -59,6 +59,7 @@ enum session_type {
extern const char *session_type_strs[];
struct session_owner_vtab;
+struct port;
/**
* Object to store session type specific data. For example, IProto
@@ -78,8 +79,18 @@ struct session_owner_vtab {
void (*free)(struct session_owner *);
/** Get the descriptor of an owner, if has. Else -1. */
int (*fd)(const struct session_owner *);
+ /** Push a port data into a session owner's channel. */
+ int (*push)(struct session_owner *, uint64_t, struct port *);
};
+/**
+ * In a common case, a session does not support push. This
+ * function always returns -1 and sets ER_UNSUPPORTED error.
+ */
+int
+generic_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port);
+
static inline struct session_owner *
session_owner_dup(struct session_owner *owner)
{
@@ -142,6 +153,12 @@ session_fd(const struct session *session)
return session->owner->vtab->fd(session->owner);
}
+static inline int
+session_push(struct session *session, uint64_t sync, struct port *port)
+{
+ return session->owner->vtab->push(session->owner, sync, port);
+}
+
/**
* Find a session by id.
*/
diff --git a/src/box/xrow.c b/src/box/xrow.c
index f48525645..f3e929f69 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,6 +43,9 @@
#include "scramble.h"
#include "iproto_constants.h"
+static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
+ "encoded IPROTO_BODY keys must fit into one byte");
+
int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end)
@@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
uint32_t v_data_len; /* string length of array size */
};
+static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
+ IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
+
static const struct iproto_body_bin iproto_body_bin = {
0x81, IPROTO_DATA, 0xdd, 0
};
@@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
0x81, IPROTO_ERROR, 0xdb, 0
};
+struct PACKED iproto_body_push_bin {
+ uint8_t m_body; /* MP_MAP */
+ uint8_t k_data; /* IPROTO_PUSH */
+ uint8_t v_data; /* 1-size MP_ARRAY */
+};
+
+static_assert(sizeof(struct iproto_body_push_bin) + IPROTO_HEADER_LEN ==
+ IPROTO_PUSH_HEADER_LEN, "size of the prepared push");
+
+static const struct iproto_body_push_bin iproto_push_bin = {
+ 0x81, IPROTO_PUSH, 0x91
+};
+
/** Return a 4-byte numeric error code, with status flags. */
static inline uint32_t
iproto_encode_error(uint32_t error)
@@ -337,23 +356,21 @@ iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
(void) write(fd, e->errmsg, msg_len);
}
-enum { SVP_SIZE = IPROTO_HEADER_LEN + sizeof(iproto_body_bin) };
-
int
-iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
+iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size)
{
/**
* Reserve memory before taking a savepoint.
* This ensures that we get a contiguous chunk of memory
* and the savepoint is pointing at the beginning of it.
*/
- void *ptr = obuf_reserve(buf, SVP_SIZE);
+ void *ptr = obuf_reserve(buf, size);
if (ptr == NULL) {
- diag_set(OutOfMemory, SVP_SIZE, "obuf", "reserve");
+ diag_set(OutOfMemory, size, "obuf_reserve", "ptr");
return -1;
}
*svp = obuf_create_svp(buf);
- ptr = obuf_alloc(buf, SVP_SIZE);
+ ptr = obuf_alloc(buf, size);
assert(ptr != NULL);
return 0;
}
@@ -373,6 +390,17 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
}
+void
+iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+ uint32_t schema_version)
+{
+ char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+ iproto_header_encode(pos, IPROTO_OK, sync, schema_version,
+ obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+ memcpy(pos + IPROTO_HEADER_LEN, &iproto_push_bin,
+ sizeof(iproto_push_bin));
+}
+
int
xrow_decode_dml(struct xrow_header *row, struct request *request,
uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index d407d151b..d2ec394c5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -50,6 +50,10 @@ enum {
XROW_HEADER_LEN_MAX = 40,
XROW_BODY_LEN_MAX = 128,
IPROTO_HEADER_LEN = 28,
+ /** 7 = sizeof(iproto_body_bin). */
+ IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
+ /** 3 = sizeof(iproto_body_push_bin). */
+ IPROTO_PUSH_HEADER_LEN = IPROTO_HEADER_LEN + 3,
};
struct xrow_header {
@@ -344,8 +348,18 @@ iproto_header_encode(char *data, uint32_t type, uint64_t sync,
struct obuf;
struct obuf_svp;
+/**
+ * Reserve obuf space for a header, which depends on a response
+ * size.
+ */
int
-iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp);
+iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size);
+
+static inline int
+iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
+{
+ return iproto_prepare_header(buf, svp, IPROTO_SELECT_HEADER_LEN);
+}
/**
* Write select header to a preallocated buffer.
@@ -355,6 +369,16 @@ void
iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
uint32_t schema_version, uint32_t count);
+static inline int
+iproto_prepare_push(struct obuf *buf, struct obuf_svp *svp)
+{
+ return iproto_prepare_header(buf, svp, IPROTO_PUSH_HEADER_LEN);
+}
+
+void
+iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+ uint32_t schema_version);
+
/**
* Encode iproto header with IPROTO_OK response code.
* @param out Encode to.
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..a813b7760 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -134,6 +134,18 @@ fio_writen(int fd, const void *buf, size_t count)
return 0;
}
+int
+fio_write_silent(int fd, const void *buf, size_t count)
+{
+ ssize_t nwr = write(fd, buf, count);
+ if (nwr >= 0)
+ return nwr;
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
+ return 0;
+ else
+ return -1;
+}
+
ssize_t
fio_writev(int fd, struct iovec *iov, int iovcnt)
{
diff --git a/src/fio.h b/src/fio.h
index 12749afcb..c88e19258 100644
--- a/src/fio.h
+++ b/src/fio.h
@@ -108,6 +108,22 @@ fio_pread(int fd, void *buf, size_t count, off_t offset);
int
fio_writen(int fd, const void *buf, size_t count);
+/**
+ * Write the given buffer with no retrying for partial writes,
+ * and ignore transient write errors: EINTR, EWOULDBLOCK and
+ * EAGAIN - these errors are not logged, and the function returns
+ * 0.
+ * @param fd File descriptor to write to.
+ * @param buf Buffer to write.
+ * @param count Size of @a buf.
+ *
+ * @retval -1 Non-transient error.
+ * @retval 0 Nothing to write, or a transient error.
+ * @retval > 0 Written byte count.
+ */
+int
+fio_write_silent(int fd, const void *buf, size_t count);
+
/**
* A simple wrapper around writev().
* Re-tries write in case of EINTR.
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index 48d28bd6d..c91576dc2 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
test = tap.test("console")
-test:plan(59)
+test:plan(61)
-- Start console and connect to it
local server = console.listen(CONSOLE_SOCKET)
@@ -31,6 +31,13 @@ local handshake = client:read{chunk = 128}
test:ok(string.match(handshake, '^Tarantool .*console') ~= nil, 'Handshake')
test:ok(client ~= nil, "connect to console")
+--
+-- gh-2677: box.session.push, text protocol support.
+--
+client:write('box.session.push(200, {sync = 0})\n')
+test:is(client:read(EOL):find('push:'), 1, "read pushed value")
+test:is(client:read(EOL):find('true'), 7, "read box.session.push result")
+
-- Execute some command
client:write("1\n")
test:is(yaml.decode(client:read(EOL))[1], 1, "eval")
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 46d85b327..ecf86af0d 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 87e26f84c..2c19ee479 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
diff --git a/test/box/push.result b/test/box/push.result
new file mode 100644
index 000000000..747503f59
--- /dev/null
+++ b/test/box/push.result
@@ -0,0 +1,364 @@
+--
+-- gh-2677: box.session.push.
+--
+--
+-- Usage.
+--
+box.session.push()
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+box.session.push(100)
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+box.session.push(100, {sync = -200})
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+--
+-- Test text protocol.
+--
+test_run = require('test_run').new()
+---
+...
+console = require('console')
+---
+...
+netbox = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+s = console.listen(3434)
+---
+...
+c = netbox.connect(3434, {console = true})
+---
+...
+c:eval('100')
+---
+- '---
+
+ - 100
+
+ ...
+
+'
+...
+messages = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_push(message)
+ table.insert(messages, message)
+end;
+---
+...
+function do_pushes()
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.sleep(0.01)
+ end
+ return 300
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Ensure a pushed message can ignored on a client.
+c:eval('do_pushes()')
+---
+- '---
+
+ - 300
+
+ ...
+
+'
+...
+-- Now start catching pushes.
+c:eval('do_pushes()', {on_push = on_push})
+---
+- '---
+
+ - 300
+
+ ...
+
+'
+...
+messages
+---
+- - '---
+
+ - 1
+
+ ...
+
+'
+ - '---
+
+ - 2
+
+ ...
+
+'
+ - '---
+
+ - 3
+
+ ...
+
+'
+ - '---
+
+ - 4
+
+ ...
+
+'
+ - '---
+
+ - 5
+
+ ...
+
+'
+...
+c:close()
+---
+...
+s:close()
+---
+- true
+...
+--
+-- Test binary protocol.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+c:ping()
+---
+- true
+...
+messages = {}
+---
+...
+c:call('do_pushes', {}, {on_push = on_push})
+---
+- 300
+...
+messages
+---
+- - 1
+ - 2
+ - 3
+ - 4
+ - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+ local sync = box.session.sync()
+ box.session.push('started dml', {sync = sync})
+ s:replace{key}
+ box.session.push('continued dml', {sync = sync})
+ s:replace{-key}
+ box.session.push('finished dml', {sync = sync})
+ return key
+end;
+---
+...
+function do_pushes(val)
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.yield()
+ end
+ return val
+end;
+---
+...
+function push_catcher_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = false}
+ catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = true}
+ catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+ fiber.create(dml_push_and_dml_f)
+ fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+ if c.is_dml then
+ assert(#c.messages == 3)
+ assert(c.messages[1] == 'started dml')
+ assert(c.messages[2] == 'continued dml')
+ assert(c.messages[3] == 'finished dml')
+ assert(s:get{c.retval})
+ assert(s:get{-c.retval})
+ else
+ assert(c.retval)
+ assert(#c.messages == 5)
+ for k, v in pairs(c.messages) do
+ assert(k == v)
+ end
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+---
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 4
+ - 4
+ - 4
+ - 4
+ - 4
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {50: [1]}
+ - {50: [2]}
+ - {50: [3]}
+ - {50: [4]}
+ - {50: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+--
+-- Ensure can not push in background.
+--
+ok = nil
+---
+...
+err = nil
+---
+...
+function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end
+---
+...
+f = fiber.create(back_push_f)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+ok, err
+---
+- false
+- Session 'background' does not support push()
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
new file mode 100644
index 000000000..02d571876
--- /dev/null
+++ b/test/box/push.test.lua
@@ -0,0 +1,163 @@
+--
+-- gh-2677: box.session.push.
+--
+
+--
+-- Usage.
+--
+box.session.push()
+box.session.push(100)
+box.session.push(100, {sync = -200})
+
+--
+-- Test text protocol.
+--
+test_run = require('test_run').new()
+console = require('console')
+netbox = require('net.box')
+fiber = require('fiber')
+s = console.listen(3434)
+c = netbox.connect(3434, {console = true})
+c:eval('100')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function on_push(message)
+ table.insert(messages, message)
+end;
+
+function do_pushes()
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.sleep(0.01)
+ end
+ return 300
+end;
+test_run:cmd("setopt delimiter ''");
+-- Ensure a pushed message can ignored on a client.
+c:eval('do_pushes()')
+-- Now start catching pushes.
+c:eval('do_pushes()', {on_push = on_push})
+messages
+
+c:close()
+s:close()
+
+--
+-- Test binary protocol.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+
+c = netbox.connect(box.cfg.listen)
+c:ping()
+messages = {}
+c:call('do_pushes', {}, {on_push = on_push})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+ local sync = box.session.sync()
+ box.session.push('started dml', {sync = sync})
+ s:replace{key}
+ box.session.push('continued dml', {sync = sync})
+ s:replace{-key}
+ box.session.push('finished dml', {sync = sync})
+ return key
+end;
+function do_pushes(val)
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.yield()
+ end
+ return val
+end;
+function push_catcher_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = false}
+ catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+function dml_push_and_dml_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = true}
+ catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+ fiber.create(dml_push_and_dml_f)
+ fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+ if c.is_dml then
+ assert(#c.messages == 3)
+ assert(c.messages[1] == 'started dml')
+ assert(c.messages[2] == 'continued dml')
+ assert(c.messages[3] == 'finished dml')
+ assert(s:get{c.retval})
+ assert(s:get{-c.retval})
+ else
+ assert(c.retval)
+ assert(#c.messages == 5)
+ for k, v in pairs(c.messages) do
+ assert(k == v)
+ end
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+
+--
+-- Ensure can not push in background.
+--
+ok = nil
+err = nil
+function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end
+f = fiber.create(back_push_f)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+ok, err
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
index d561b4813..7b0f2993f 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -49,7 +49,17 @@ test_run:cmd("switch autobootstrap3");
---
- true
...
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
_ = box.space.test:before_replace(function(old, new)
+ if box.session.type() == 'applier' and not push_err then
+ push_ok, push_err = pcall(box.session.push, 100, {sync = 100})
+ end
if old ~= nil and new ~= nil then
return new[2] > old[2] and new or old
end
@@ -187,6 +197,10 @@ box.space.test:select()
- [9, 90]
- [10, 100]
...
+push_err
+---
+- Session 'applier' does not support push()
+...
test_run:cmd('restart server autobootstrap3')
box.space.test:select()
---
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
index 2c6912d06..04e5a2172 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -26,7 +26,17 @@ _ = box.space.test:before_replace(function(old, new)
end
end);
test_run:cmd("switch autobootstrap3");
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
_ = box.space.test:before_replace(function(old, new)
+ if box.session.type() == 'applier' and not push_err then
+ push_ok, push_err = pcall(box.session.push, 100, {sync = 100})
+ end
if old ~= nil and new ~= nil then
return new[2] > old[2] and new or old
end
@@ -62,6 +72,7 @@ test_run:cmd('restart server autobootstrap2')
box.space.test:select()
test_run:cmd("switch autobootstrap3")
box.space.test:select()
+push_err
test_run:cmd('restart server autobootstrap3')
box.space.test:select()
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [tarantool-patches] [PATCH 0/5] session: introduce box.session.push
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
` (4 preceding siblings ...)
2018-03-19 13:34 ` [PATCH 5/5] session: introduce box.session.push Vladislav Shpilevoy
@ 2018-03-19 13:41 ` v.shpilevoy
5 siblings, 0 replies; 20+ messages in thread
From: v.shpilevoy @ 2018-03-19 13:41 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev
Sorry, forgot a description.
Box.session.push() allows to send a message to a client with no
finishing a main request. It means, that a client can split a big
response in several pushes, or send notifications from a long
polling request.
Tarantool now supports two push types: via text protocol and via
binary protocol. Text pushes are needed mainly for printing,
logging and all other stdout output, written on a host side from
a client console connection. Text protocol push message contains
"push:" prefix followed by a YAML formatted text, where as a final
response always has '---' prefix. To catch pushed messages client
uses console.connect() on_push option, which takes a function
with a single argument - message.
IProto message is encoded just like a regular response, but instead
of IPROTO_DATA it has IPROTO_PUSH with a single element - pushed
MessagePack encoded data.
Text push is trivial - it is just blocking write into a socket with
a prefix. Binary push is more complex.
TX thread to notify IProto thread about new data in obuf sends a
message 'push_msg'. IProto thread, got this message, notifies libev
about new data, and then sends 'push_msg' back with updated write
position. TX thread, received the message back, updates its version
of a write position. If IProto do not send a write position, then
TX would write to the same obuf again and again, because it can not
know that IProto already flushed another obuf.
To avoid multiple 'push_msg' in fly between IProto and TX, the only
one 'push_msg' per connection is used. To deliver pushes, appeared
when 'push_msg' was in fly, TX thread sets a flag every time when
sees, that 'push_msg' is sent, and there is a new push. When
'push_msg' returns, it checks this flag, and if it is set, then the
IProto is notified again.
> 19 марта 2018 г., в 16:34, Vladislav Shpilevoy <v.shpilevoy@tarantool.org> написал(а):
>
> Branch: http://github.com/tarantool/tarantool/tree/gh-2677-box-session-push
> Issue: https://github.com/tarantool/tarantool/issues/2677
>
> Vladislav Shpilevoy (5):
> session: forbid creation from Lua binary and applier sessions
> lua: port console yaml formatting to C
> Remove empty function declaration
> session: introduce session_owner
> session: introduce box.session.push
>
> src/box/applier.cc | 4 +-
> src/box/authentication.cc | 4 +-
> src/box/authentication.h | 3 +-
> src/box/box.cc | 4 +-
> src/box/box.h | 2 +-
> src/box/iproto.cc | 321 +++++++++++++++++++++++----
> src/box/iproto_constants.c | 3 +-
> src/box/iproto_constants.h | 8 +
> src/box/lua/call.c | 1 +
> src/box/lua/console.c | 42 ++++
> src/box/lua/console.h | 8 +
> src/box/lua/console.lua | 40 +---
> src/box/lua/net_box.c | 37 ++++
> src/box/lua/net_box.lua | 97 ++++++--
> src/box/lua/session.c | 252 ++++++++++++++++++++-
> src/box/port.c | 7 +
> src/box/port.h | 15 ++
> src/box/session.cc | 86 +++++++-
> src/box/session.h | 106 +++++++--
> src/box/vinyl.c | 3 +-
> src/box/xrow.c | 40 +++-
> src/box/xrow.h | 26 ++-
> src/fio.c | 12 +
> src/fio.h | 16 ++
> src/lua/socket.h | 2 -
> test/app-tap/console.test.lua | 9 +-
> test/box-tap/session.test.lua | 12 +-
> test/box/net.box.result | 2 +-
> test/box/net.box.test.lua | 2 +-
> test/box/push.result | 364 +++++++++++++++++++++++++++++++
> test/box/push.test.lua | 163 ++++++++++++++
> test/replication/before_replace.result | 14 ++
> test/replication/before_replace.test.lua | 11 +
> third_party/lua-yaml/lyaml.cc | 9 +-
> third_party/lua-yaml/lyaml.h | 3 +
> 35 files changed, 1576 insertions(+), 152 deletions(-)
> create mode 100644 test/box/push.result
> create mode 100644 test/box/push.test.lua
>
> --
> 2.14.3 (Apple Git-98)
>
>
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 1/5] session: forbid creation from Lua binary and applier sessions
2018-03-19 13:34 ` [PATCH 1/5] session: forbid creation from Lua binary and applier sessions Vladislav Shpilevoy
@ 2018-03-20 13:20 ` Vladimir Davydov
2018-03-20 13:46 ` v.shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-20 13:20 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Mar 19, 2018 at 04:34:48PM +0300, Vladislav Shpilevoy wrote:
> Lua has no access to applier or binary sockets, and these session
> types must be forbidden.
>
> And after #2667 applier, binary, console and background session
> owners will be incapsulated inside corresponding modules.
I don't understand why we should explicitly check session type in this
funciton. After all, it's an internal function, never intended to be
called directly by the user, so why should we care?
> diff --git a/test/box-tap/session.test.lua b/test/box-tap/session.test.lua
> index 6fddced3c..ff952fd45 100755
> --- a/test/box-tap/session.test.lua
> +++ b/test/box-tap/session.test.lua
> @@ -15,14 +15,22 @@ session = box.session
> space = box.schema.space.create('tweedledum')
> index = space:create_index('primary', { type = 'hash' })
>
> -test:plan(53)
> +test:plan(55)
> +
> +--
> +-- Check that can start from Lua only either console or REPL.
> +--
> +local ok, err = pcall(box.internal.session.create, 100, "binary")
> +test:is(err, "Can not start non-console or non-REPL session from Lua", "bad session type")
> +ok, err = pcall(box.internal.session.create, 100, "applier")
> +test:is(err, "Can not start non-console or non-REPL session from Lua", "bad session type")
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 1/5] session: forbid creation from Lua binary and applier sessions
2018-03-20 13:20 ` Vladimir Davydov
@ 2018-03-20 13:46 ` v.shpilevoy
0 siblings, 0 replies; 20+ messages in thread
From: v.shpilevoy @ 2018-03-20 13:46 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
> 20 марта 2018 г., в 16:20, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):
>
> On Mon, Mar 19, 2018 at 04:34:48PM +0300, Vladislav Shpilevoy wrote:
>> Lua has no access to applier or binary sockets, and these session
>> types must be forbidden.
>>
>> And after #2667 applier, binary, console and background session
>> owners will be incapsulated inside corresponding modules.
>
> I don't understand why we should explicitly check session type in this
> funciton. After all, it's an internal function, never intended to be
> called directly by the user, so why should we care?
Just to protect from incorrect usage of API, regardless of is it internal or not. If you propose to drop the patch, then I can drop. Have I drop it?
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 2/5] lua: port console yaml formatting to C
2018-03-19 13:34 ` [PATCH 2/5] lua: port console yaml formatting to C Vladislav Shpilevoy
@ 2018-03-20 17:51 ` Vladimir Davydov
2018-03-20 18:04 ` v.shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-20 17:51 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Mar 19, 2018 at 04:34:49PM +0300, Vladislav Shpilevoy wrote:
> Box.session.push() will be implemented in C lbox_session_push()
> function, which will use port to encapsulate different session
> types (binary, text) push() logic. And push() must be able to
> either encode an argument into message pack, or format it as a
> string using yaml. This formatting can not be done in Lua before
> push() call, since it breaks push() virtualization.
>
> Needed for #2677
>
> Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> ---
> src/box/lua/console.c | 42 ++++++++++++++++++++++++++++++++++++++++++
> src/box/lua/console.lua | 34 ++++++++--------------------------
> third_party/lua-yaml/lyaml.cc | 9 +++------
> third_party/lua-yaml/lyaml.h | 3 +++
> 4 files changed, 56 insertions(+), 32 deletions(-)
>
> diff --git a/src/box/lua/console.c b/src/box/lua/console.c
> @@ -328,6 +329,32 @@ lbox_console_add_history(struct lua_State *L)
> return 0;
> }
>
> +static int
> +lbox_console_format(struct lua_State *L)
> +{
> + int arg_count = lua_gettop(L);
> + if (arg_count == 0) {
> + lua_pushstring(L, "---\n...\n");
> + return 1;
> + }
> + lua_getfield(L, LUA_REGISTRYINDEX, "_LOADED");
> + lua_getfield(L, -1, "console");
> + lua_getfield(L, -1, "formatter");
> + lua_getfield(L, -1, "encode");
> + lua_createtable(L, arg_count, 0);
> + for (int i = 0; i < arg_count; ++i) {
> + if (lua_isnil(L, i + 1))
> + lua_getfield(L, -3, "NULL");
> + else
> + lua_pushvalue(L, i + 1);
> + lua_rawseti(L, -2, i + 1);
> + }
> + lua_call(L, 1, 1);
> + lua_insert(L, -4);
> + lua_pop(L, 3);
> + return 1;
> +}
> +
> void
> tarantool_lua_console_init(struct lua_State *L)
> {
> @@ -344,6 +372,20 @@ tarantool_lua_console_init(struct lua_State *L)
> lua_getfield(L, -1, "completion_handler");
> lua_pushcclosure(L, lbox_console_readline, 1);
> lua_setfield(L, -2, "readline");
> +
> + lua_yaml_new_formatter(L);
> + lua_getfield(L, -1, "cfg");
> + lua_createtable(L, 0, 4);
> + lua_pushboolean(L, true);
> + lua_setfield(L, -2, "encode_invalid_numbers");
> + lua_pushboolean(L, true);
> + lua_setfield(L, -2, "encode_load_metatables");
> + lua_pushboolean(L, true);
> + lua_setfield(L, -2, "encode_use_tostring");
> + lua_pushboolean(L, true);
> + lua_setfield(L, -2, "encode_invalid_as_nil");
> + lua_call(L, 1, 0);
> + lua_setfield(L, -2, "formatter");
> diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
> @@ -395,4 +375,6 @@ package.loaded['console'] = {
> on_start = on_start;
> on_client_disconnect = on_client_disconnect;
> completion_handler = internal.completion_handler;
> + formatter = internal.formatter;
I don't like that we export the serializer to Lua, but never use it in
Lua code, only in C. Can't we create a serializer in C and pass it to
the encoder function explicitly?
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 3/5] Remove empty function declaration
2018-03-19 13:34 ` [PATCH 3/5] Remove empty function declaration Vladislav Shpilevoy
@ 2018-03-20 17:55 ` Vladimir Davydov
2018-03-20 17:57 ` [tarantool-patches] " v.shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-20 17:55 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Mar 19, 2018 at 04:34:50PM +0300, Vladislav Shpilevoy wrote:
> diff --git a/src/lua/socket.h b/src/lua/socket.h
> index f669b5ada..ec4e79d87 100644
> --- a/src/lua/socket.h
> +++ b/src/lua/socket.h
> @@ -45,8 +45,6 @@ lbox_socket_local_resolve(const char *host, const char *port,
> struct sockaddr *addr, socklen_t *socklen);
> int
> lbox_socket_nonblock(int fh, int mode);
> -int bsdsocket_sendto(int fh, const char *host, const char *port,
> - const void *octets, size_t len, int flags);
The patch is OK, obviously, but it doesn't belong to this series.
Please submit irrelevant changes like this one separately.
^ permalink raw reply [flat|nested] 20+ messages in thread
* [tarantool-patches] Re: [PATCH 3/5] Remove empty function declaration
2018-03-20 17:55 ` Vladimir Davydov
@ 2018-03-20 17:57 ` v.shpilevoy
2018-03-21 9:16 ` Vladimir Davydov
0 siblings, 1 reply; 20+ messages in thread
From: v.shpilevoy @ 2018-03-20 17:57 UTC (permalink / raw)
To: tarantool-patches
> 20 марта 2018 г., в 20:55, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):
>
> On Mon, Mar 19, 2018 at 04:34:50PM +0300, Vladislav Shpilevoy wrote:
>> diff --git a/src/lua/socket.h b/src/lua/socket.h
>> index f669b5ada..ec4e79d87 100644
>> --- a/src/lua/socket.h
>> +++ b/src/lua/socket.h
>> @@ -45,8 +45,6 @@ lbox_socket_local_resolve(const char *host, const char *port,
>> struct sockaddr *addr, socklen_t *socklen);
>> int
>> lbox_socket_nonblock(int fh, int mode);
>> -int bsdsocket_sendto(int fh, const char *host, const char *port,
>> - const void *octets, size_t len, int flags);
>
> The patch is OK, obviously, but it doesn't belong to this series.
> Please submit irrelevant changes like this one separately.
>
Do you propose to push it on a different branch?
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 2/5] lua: port console yaml formatting to C
2018-03-20 17:51 ` Vladimir Davydov
@ 2018-03-20 18:04 ` v.shpilevoy
2018-03-21 9:14 ` Vladimir Davydov
0 siblings, 1 reply; 20+ messages in thread
From: v.shpilevoy @ 2018-03-20 18:04 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
[-- Attachment #1: Type: text/plain, Size: 3438 bytes --]
> 20 марта 2018 г., в 20:51, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):
>
> On Mon, Mar 19, 2018 at 04:34:49PM +0300, Vladislav Shpilevoy wrote:
>> Box.session.push() will be implemented in C lbox_session_push()
>> function, which will use port to encapsulate different session
>> types (binary, text) push() logic. And push() must be able to
>> either encode an argument into message pack, or format it as a
>> string using yaml. This formatting can not be done in Lua before
>> push() call, since it breaks push() virtualization.
>>
>> Needed for #2677
>>
>> Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
>> ---
>> src/box/lua/console.c | 42 ++++++++++++++++++++++++++++++++++++++++++
>> src/box/lua/console.lua | 34 ++++++++--------------------------
>> third_party/lua-yaml/lyaml.cc | 9 +++------
>> third_party/lua-yaml/lyaml.h | 3 +++
>> 4 files changed, 56 insertions(+), 32 deletions(-)
>>
>> diff --git a/src/box/lua/console.c b/src/box/lua/console.c
>
>> @@ -328,6 +329,32 @@ lbox_console_add_history(struct lua_State *L)
>> return 0;
>> }
>>
>> +static int
>> +lbox_console_format(struct lua_State *L)
>> +{
>> + int arg_count = lua_gettop(L);
>> + if (arg_count == 0) {
>> + lua_pushstring(L, "---\n...\n");
>> + return 1;
>> + }
>> + lua_getfield(L, LUA_REGISTRYINDEX, "_LOADED");
>> + lua_getfield(L, -1, "console");
>> + lua_getfield(L, -1, "formatter");
>> + lua_getfield(L, -1, "encode");
>> + lua_createtable(L, arg_count, 0);
>> + for (int i = 0; i < arg_count; ++i) {
>> + if (lua_isnil(L, i + 1))
>> + lua_getfield(L, -3, "NULL");
>> + else
>> + lua_pushvalue(L, i + 1);
>> + lua_rawseti(L, -2, i + 1);
>> + }
>> + lua_call(L, 1, 1);
>> + lua_insert(L, -4);
>> + lua_pop(L, 3);
>> + return 1;
>> +}
>> +
>> void
>> tarantool_lua_console_init(struct lua_State *L)
>> {
>
>> @@ -344,6 +372,20 @@ tarantool_lua_console_init(struct lua_State *L)
>> lua_getfield(L, -1, "completion_handler");
>> lua_pushcclosure(L, lbox_console_readline, 1);
>> lua_setfield(L, -2, "readline");
>> +
>> + lua_yaml_new_formatter(L);
>> + lua_getfield(L, -1, "cfg");
>> + lua_createtable(L, 0, 4);
>> + lua_pushboolean(L, true);
>> + lua_setfield(L, -2, "encode_invalid_numbers");
>> + lua_pushboolean(L, true);
>> + lua_setfield(L, -2, "encode_load_metatables");
>> + lua_pushboolean(L, true);
>> + lua_setfield(L, -2, "encode_use_tostring");
>> + lua_pushboolean(L, true);
>> + lua_setfield(L, -2, "encode_invalid_as_nil");
>> + lua_call(L, 1, 0);
>> + lua_setfield(L, -2, "formatter");
>
>> diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
>> @@ -395,4 +375,6 @@ package.loaded['console'] = {
>> on_start = on_start;
>> on_client_disconnect = on_client_disconnect;
>> completion_handler = internal.completion_handler;
>> + formatter = internal.formatter;
>
> I don't like that we export the serializer to Lua, but never use it in
> Lua code, only in C. Can't we create a serializer in C and pass it to
> the encoder function explicitly?
The problem of Lua formatter is that it is Lua table, not cdata. And to fully port it on C it is necessary to rewrite luaL_serializer and l_dump in lyaml.cc <http://lyaml.cc/>, that seems to be too complicated for a single propose - format push message. But if you insist, I can do full port.
[-- Attachment #2: Type: text/html, Size: 12604 bytes --]
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 4/5] session: introduce session_owner
2018-03-19 13:34 ` [PATCH 4/5] session: introduce session_owner Vladislav Shpilevoy
@ 2018-03-20 18:29 ` Vladimir Davydov
0 siblings, 0 replies; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-20 18:29 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Mar 19, 2018 at 04:34:51PM +0300, Vladislav Shpilevoy wrote:
> Session owner stores a session type specific data. For example,
> IProto session has authentication salt, console session has
> file descriptor.
>
> For #2677 session owner of IProto and console will have push()
> virtual function to do box.session.push, which implementation
> depends on a session type.
>
> Needed for #2677
> diff --git a/src/box/session.h b/src/box/session.h
> @@ -60,6 +58,49 @@ enum session_type {
>
> extern const char *session_type_strs[];
>
> +struct session_owner_vtab;
> +
> +/**
> + * Object to store session type specific data. For example, IProto
> + * stores iproto_connection, console stores file descriptor.
> + */
> +struct session_owner {
I don't really like the name. Please come up with some alternatives
(session_context, creator, data? dunno) so that we can pick the best
one.
> + /** Session type. */
> + enum session_type type;
> + /** Virtual session owner methods. */
> + const struct session_owner_vtab *vtab;
> +};
> +
> +struct session_owner_vtab {
> + /** Allocate a duplicate of an owner. */
> + struct session_owner *(*dup)(struct session_owner *);
> + /** Destroy an owner, and free its memory. */
> + void (*free)(struct session_owner *);
> + /** Get the descriptor of an owner, if has. Else -1. */
> + int (*fd)(const struct session_owner *);
> +};
> +
> +static inline struct session_owner *
> +session_owner_dup(struct session_owner *owner)
> +{
> + return owner->vtab->dup(owner);
> +}
> +
> +static inline void
> +session_owner_delete(struct session_owner *owner)
> +{
> + owner->vtab->free(owner);
> +}
> diff --git a/src/box/session.cc b/src/box/session.cc
> @@ -112,13 +155,25 @@ session_create(int fd, enum session_type type)
> return session;
> }
>
> +int
> +session_set_owner(struct session *session, struct session_owner *new_owner)
> +{
> + struct session_owner *dup = session_owner_dup(new_owner);
> + if (dup == NULL)
> + return -1;
> + if (session->owner != NULL)
> + session_owner_delete(session->owner);
> + session->owner = dup;
> + return 0;
> +}
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> @@ -533,7 +533,9 @@ applier_f(va_list ap)
> * Set correct session type for use in on_replace()
> * triggers.
> */
> - current_session()->type = SESSION_TYPE_APPLIER;
> + struct session_owner applier_owner;
> + session_owner_create(&applier_owner, SESSION_TYPE_APPLIER);
> + session_set_owner(current_session(), &applier_owner);
I don't like the way you set the owner: first you initialize it on
stack, then duplicate it on heap. This doesn't look good.
Let's try to embed the session_owner structure (or whatever it will be
called) in struct session with some padding so that we don't need to
allocate anything, i.e.:
struct session_owner {
int type;
struct session_owner_vtab vtab;
char pad[128];
};
struct session {
struct session_owner owner;
We can use static assertions to make sure all deriving structures fit in
(see struct port for example).
Then we would add
iproto_init_session(session, ...)
console_init_session(session, ...)
session_clear_owner(session)
(elaborate the names pls)
that would initialize session->owner appropriately.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 5/5] session: introduce box.session.push
2018-03-19 13:34 ` [PATCH 5/5] session: introduce box.session.push Vladislav Shpilevoy
@ 2018-03-21 9:10 ` Vladimir Davydov
2018-03-21 9:30 ` [tarantool-patches] " v.shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-21 9:10 UTC (permalink / raw)
To: Vladislav Shpilevoy; +Cc: tarantool-patches
On Mon, Mar 19, 2018 at 04:34:52PM +0300, Vladislav Shpilevoy wrote:
> Box.session.push() allows to send a message to a client with no
> finishing a main request.
>
> Tarantool supports two push types: via text protocol and via
> binary protocol. Text protocol push message contains "push:"
> prefix followed by a YAML formatted text, where as a final
> response always has '---' prefix. To catch pushed messages on a
> client side use console.connect() on_push options which takes
> a function with a single argument - message.
>
> IProto message is encoded just like a regular response, but
> instead of IPROTO_DATA it has IPROTO_PUSH with a single element -
> pushed MessagePack encoded data.
>
> Text push is trivial - it is just blocking write into a socket
> with a prefix. Binary push is more complex.
>
> TX thread to notify IProto thread about new data in obuf sends
> a message 'push_msg'. IProto thread, got this message, notifies
> libev about new data, and then sends 'push_msg' back with
> updated write position. TX thread, received the message back,
> updates its version of a write position. If IProto will not send
> a write position, then TX will write to the same obuf again
> and again, because it can not know that IProto already flushed
> another obuf.
>
> To avoid multiple 'push_msg' in fly between IProto and TX, the
> only one 'push_msg' per connection is used. To deliver pushes,
> appeared when 'push_msg' was in fly, TX thread sets a flag every
> time when sees, that 'push_msg' is sent, and there is a new push.
> When 'push_msg' returns, it checks this flag, and if it is set,
> then the IProto is notified again.
>
> Closes #2677
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> @@ -70,6 +109,37 @@ struct iproto_session_owner {
> char salt[IPROTO_SALT_SIZE];
> /** IProto connection. */
> struct iproto_connection *connection;
> + /**
> + * Is_push_in_progress is set, when a push_msg is sent to
> + * IProto thread, and reset, when the message is returned
> + * to TX. If a new push sees, that a push_msg is already
> + * sent to IProto, then has_new_pushes is set. After push
> + * notification is returned to TX, it checks
> + * has_new_pushes. If it is set, then the notification is
> + * sent again. This ping-pong continues, until TX stopped
> + * pushing. It allows to
> + * 1) avoid multiple push_msg from one session in fly,
> + * 2) do not block push() until a previous push() is
> + * finished.
> + *
> + * IProto TX
> + * -------------------------------------------------------
> + * + [push message]
> + * start socket <--- notification ----
> + * write
> + * + [push message]
> + * + [push message]
> + * ...
> + * end socket
> + * write ----------------> check for new
> + * pushes - found
> + * <--- notification ---
> + * ....
> + */
> + bool has_new_pushes;
> + bool is_push_in_progress;
> + /** Push notification for IProto thread. */
> + struct iproto_push_msg push_msg;
Please move this to iproto_connection.
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> @@ -79,6 +79,14 @@ enum iproto_key {
> /* Leave a gap between request keys and response keys */
> IPROTO_DATA = 0x30,
> IPROTO_ERROR = 0x31,
> + /**
> + * Tarantool supports two push types: binary and text.
> + * A text push can be distinguished from a response by a
> + * prefix "push:".
I doubt we need to designate text pushes at all. IMO they are useful
only for printing text to the user console. I suggest you disable
the on_push callback if net_box is operating in the 'console' mode,
instead just append pushes to the output, without a prefix.
> + * Binary push is encoded using IPROTO_PUSH key in a
> + * message body, which replaces IPROTO_DATA.
> + */
> + IPROTO_PUSH = 0x32,
> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> +/**
> + * Search for IPROTO_PUSH key in a MessagePack encoded response
> + * body. It is needed without entire message decoding, when a user
> + * wants to store raw responses and pushes in its own buffer.
> + */
> +static int
> +netbox_body_is_push(struct lua_State *L)
> +{
> + uint32_t ctypeid;
> + const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
> + assert(ctypeid == luaL_ctypeid(L, "char *"));
> + assert(mp_typeof(*body) == MP_MAP);
> + lua_pushboolean(L, mp_decode_map(&body) == 1 &&
> + mp_typeof(*body) == MP_UINT &&
> + mp_decode_uint(&body) == IPROTO_PUSH);
> + return 1;
> +}
> +
Can't you do this check in net_box.lua, without involving C?
> diff --git a/src/box/lua/session.c b/src/box/lua/session.c
> +/**
> + * Write @a text into @a fd in a blocking mode, ignoring transient
> + * socket errors.
> + * @param fd Console descriptor.
> + * @param text Text to send.
> + * @param len Length of @a text.
> + */
> +static inline int
> +console_do_push(int fd, const char *text, uint32_t len)
> +{
> + while (len > 0) {
> + int written = fio_write_silent(fd, text, len);
I don't think that using a blocking function here is acceptable
(AFAICS fio_write_silent() calls the write syscall on session fd).
> + if (written < 0)
> + return -1;
> + assert((uint32_t) written <= len);
> + len -= written;
> + text += written;
> + }
> + return 0;
> +}
> +
> +static int
> +console_session_owner_push(struct session_owner *owner, uint64_t sync,
> + struct port *port)
> +{
> + /* Console has no sync. */
> + (void) sync;
> + assert(owner->vtab == &console_session_owner_vtab);
> + int fd = console_session_owner_fd(owner);
> + uint32_t text_len;
> + const char *text = port_dump_raw(port, &text_len);
> + if (text == NULL ||
> + console_do_push(fd, "push:", strlen("push:")) != 0 ||
> + console_do_push(fd, text, text_len) != 0)
> + return -1;
> + else
> + return 0;
> +}
> +/**
> + * Push a message using a protocol, depending on a session type.
> + * @param data Data to push, first argument on a stack.
> + * @param opts Options. Now requires a single possible option -
> + * sync. Second argument on a stack.
> + */
> +static int
> +lbox_session_push(struct lua_State *L)
> +{
> + if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
> +usage_error:
> + return luaL_error(L, "Usage: box.session.push(data, opts)");
I don't think that we should oblige the user to pass the 'sync' value
explicitly - this would be really annoying. I think we should save the
sync somehow (fiber local storage, request?) and pass it implicitly.
> + }
> + lua_getfield(L, 2, "sync");
> + if (! lua_isnumber(L, 3))
> + goto usage_error;
> + double lua_sync = lua_tonumber(L, 3);
> + lua_pop(L, 1);
> + uint64_t sync = (uint64_t) lua_sync;
> + if (lua_sync != sync)
> + goto usage_error;
> + struct lua_push_port port;
> + port.vtab = &lua_push_port_vtab;
> + port.L = L;
> + /*
> + * Pop the opts - they must not be pushed. Leave only data
> + * on a stack.
> + */
> + lua_remove(L, 2);
> + if (session_push(current_session(), sync, (struct port *) &port) != 0) {
> + return luaT_error(L);
> + } else {
> + lua_pushboolean(L, true);
> + return 1;
What's the point in returning 'true' on success?
> + }
> +}
> diff --git a/src/box/port.h b/src/box/port.h
> @@ -76,6 +76,11 @@ struct port_vtab {
> * format.
> */
> int (*dump_16)(struct port *port, struct obuf *out);
> + /**
> + * Same as dump, but find a memory for an output buffer
> + * for itself.
> + */
> + const char *(*dump_raw)(struct port *port, uint32_t *size);
Somehow this doesn't feel right. May be, we should encode Lua stack in
msgpack first, and then re-encode it to Yaml. May be, we shouldn't use
the 'port' at all. May be, I'm being too picky, and we should leave it
as is. Anyway, please think of alternatives.
> diff --git a/src/box/xrow.c b/src/box/xrow.c
> @@ -43,6 +43,9 @@
> #include "scramble.h"
> #include "iproto_constants.h"
>
> +static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
> + "encoded IPROTO_BODY keys must fit into one byte");
> +
Why check this now? IPROTO_DATA and IPROTO_PUSH can't occasionally
change as they are defined in the binary protocol so there's no point
in this static assertion IMO.
> int
> xrow_header_decode(struct xrow_header *header, const char **pos,
> const char *end)
> @@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
> uint32_t v_data_len; /* string length of array size */
> };
>
> +static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
> + IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
> +
> static const struct iproto_body_bin iproto_body_bin = {
> 0x81, IPROTO_DATA, 0xdd, 0
> };
> @@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
> 0x81, IPROTO_ERROR, 0xdb, 0
> };
>
> +struct PACKED iproto_body_push_bin {
> + uint8_t m_body; /* MP_MAP */
> + uint8_t k_data; /* IPROTO_PUSH */
> + uint8_t v_data; /* 1-size MP_ARRAY */
> +};
Why don't you just reuse iproto_body_bin for this?
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 2/5] lua: port console yaml formatting to C
2018-03-20 18:04 ` v.shpilevoy
@ 2018-03-21 9:14 ` Vladimir Davydov
2018-03-21 9:30 ` v.shpilevoy
0 siblings, 1 reply; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-21 9:14 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches
On Tue, Mar 20, 2018 at 09:04:09PM +0300, v.shpilevoy@tarantool.org wrote:
>
>
> > 20 марта 2018 г., в 20:51, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):
> >
> > On Mon, Mar 19, 2018 at 04:34:49PM +0300, Vladislav Shpilevoy wrote:
> >> Box.session.push() will be implemented in C lbox_session_push()
> >> function, which will use port to encapsulate different session
> >> types (binary, text) push() logic. And push() must be able to
> >> either encode an argument into message pack, or format it as a
> >> string using yaml. This formatting can not be done in Lua before
> >> push() call, since it breaks push() virtualization.
> >>
> >> Needed for #2677
> >>
> >> Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
> >> ---
> >> src/box/lua/console.c | 42 ++++++++++++++++++++++++++++++++++++++++++
> >> src/box/lua/console.lua | 34 ++++++++--------------------------
> >> third_party/lua-yaml/lyaml.cc | 9 +++------
> >> third_party/lua-yaml/lyaml.h | 3 +++
> >> 4 files changed, 56 insertions(+), 32 deletions(-)
> >>
> >> diff --git a/src/box/lua/console.c b/src/box/lua/console.c
> >
> >> @@ -328,6 +329,32 @@ lbox_console_add_history(struct lua_State *L)
> >> return 0;
> >> }
> >>
> >> +static int
> >> +lbox_console_format(struct lua_State *L)
> >> +{
> >> + int arg_count = lua_gettop(L);
> >> + if (arg_count == 0) {
> >> + lua_pushstring(L, "---\n...\n");
> >> + return 1;
> >> + }
> >> + lua_getfield(L, LUA_REGISTRYINDEX, "_LOADED");
> >> + lua_getfield(L, -1, "console");
> >> + lua_getfield(L, -1, "formatter");
> >> + lua_getfield(L, -1, "encode");
> >> + lua_createtable(L, arg_count, 0);
> >> + for (int i = 0; i < arg_count; ++i) {
> >> + if (lua_isnil(L, i + 1))
> >> + lua_getfield(L, -3, "NULL");
> >> + else
> >> + lua_pushvalue(L, i + 1);
> >> + lua_rawseti(L, -2, i + 1);
> >> + }
> >> + lua_call(L, 1, 1);
> >> + lua_insert(L, -4);
> >> + lua_pop(L, 3);
> >> + return 1;
> >> +}
> >> +
> >> void
> >> tarantool_lua_console_init(struct lua_State *L)
> >> {
> >
> >> @@ -344,6 +372,20 @@ tarantool_lua_console_init(struct lua_State *L)
> >> lua_getfield(L, -1, "completion_handler");
> >> lua_pushcclosure(L, lbox_console_readline, 1);
> >> lua_setfield(L, -2, "readline");
> >> +
> >> + lua_yaml_new_formatter(L);
> >> + lua_getfield(L, -1, "cfg");
> >> + lua_createtable(L, 0, 4);
> >> + lua_pushboolean(L, true);
> >> + lua_setfield(L, -2, "encode_invalid_numbers");
> >> + lua_pushboolean(L, true);
> >> + lua_setfield(L, -2, "encode_load_metatables");
> >> + lua_pushboolean(L, true);
> >> + lua_setfield(L, -2, "encode_use_tostring");
> >> + lua_pushboolean(L, true);
> >> + lua_setfield(L, -2, "encode_invalid_as_nil");
> >> + lua_call(L, 1, 0);
> >> + lua_setfield(L, -2, "formatter");
> >
> >> diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
> >> @@ -395,4 +375,6 @@ package.loaded['console'] = {
> >> on_start = on_start;
> >> on_client_disconnect = on_client_disconnect;
> >> completion_handler = internal.completion_handler;
> >> + formatter = internal.formatter;
> >
> > I don't like that we export the serializer to Lua, but never use it in
> > Lua code, only in C. Can't we create a serializer in C and pass it to
> > the encoder function explicitly?
>
> The problem of Lua formatter is that it is Lua table, not cdata. And
> to fully port it on C it is necessary to rewrite luaL_serializer and
> l_dump in lyaml.cc <http://lyaml.cc/>, that seems to be too
> complicated for a single propose - format push message. But if you
> insist, I can do full port.
I'm not sure... Obviously, I don't want you to rewrite the yaml encode.
I just want to understand if there are alternatives to exporting the
yaml encoder to the Lua console module, just to use it in C. Please take
a look at how src/lua/msgpack.c is implemented - may be, we could borrow
some ideas from it.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 3/5] Remove empty function declaration
2018-03-20 17:57 ` [tarantool-patches] " v.shpilevoy
@ 2018-03-21 9:16 ` Vladimir Davydov
0 siblings, 0 replies; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-21 9:16 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches
On Tue, Mar 20, 2018 at 08:57:53PM +0300, v.shpilevoy@tarantool.org wrote:
>
>
> > 20 марта 2018 г., в 20:55, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):
> >
> > On Mon, Mar 19, 2018 at 04:34:50PM +0300, Vladislav Shpilevoy wrote:
> >> diff --git a/src/lua/socket.h b/src/lua/socket.h
> >> index f669b5ada..ec4e79d87 100644
> >> --- a/src/lua/socket.h
> >> +++ b/src/lua/socket.h
> >> @@ -45,8 +45,6 @@ lbox_socket_local_resolve(const char *host, const char *port,
> >> struct sockaddr *addr, socklen_t *socklen);
> >> int
> >> lbox_socket_nonblock(int fh, int mode);
> >> -int bsdsocket_sendto(int fh, const char *host, const char *port,
> >> - const void *octets, size_t len, int flags);
> >
> > The patch is OK, obviously, but it doesn't belong to this series.
> > Please submit irrelevant changes like this one separately.
> >
>
> Do you propose to push it on a different branch?
I don't think we need to do this now. Let's just push this trivial patch
to the trunk so that you can drop it from the series in v2.
^ permalink raw reply [flat|nested] 20+ messages in thread
* [tarantool-patches] Re: [PATCH 5/5] session: introduce box.session.push
2018-03-21 9:10 ` Vladimir Davydov
@ 2018-03-21 9:30 ` v.shpilevoy
2018-03-21 12:25 ` Vladimir Davydov
0 siblings, 1 reply; 20+ messages in thread
From: v.shpilevoy @ 2018-03-21 9:30 UTC (permalink / raw)
To: tarantool-patches
>
> I doubt we need to designate text pushes at all. IMO they are useful
> only for printing text to the user console. I suggest you disable
> the on_push callback if net_box is operating in the 'console' mode,
> instead just append pushes to the output, without a prefix.
On_push can be used from netbox, where pushed message will finish request, if it has no
prefix - it just can not be distinguished from a final response.
>
>> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
>> +/**
>> + * Search for IPROTO_PUSH key in a MessagePack encoded response
>> + * body. It is needed without entire message decoding, when a user
>> + * wants to store raw responses and pushes in its own buffer.
>> + */
>> +static int
>> +netbox_body_is_push(struct lua_State *L)
>> +{
>> + uint32_t ctypeid;
>> + const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
>> + assert(ctypeid == luaL_ctypeid(L, "char *"));
>> + assert(mp_typeof(*body) == MP_MAP);
>> + lua_pushboolean(L, mp_decode_map(&body) == 1 &&
>> + mp_typeof(*body) == MP_UINT &&
>> + mp_decode_uint(&body) == IPROTO_PUSH);
>> + return 1;
>> +}
>> +
>
> Can't you do this check in net_box.lua, without involving C?
No, messagepack Lua api does not allow to decode a part of a message.
>
>> diff --git a/src/box/lua/session.c b/src/box/lua/session.c
>> +/**
>> + * Write @a text into @a fd in a blocking mode, ignoring transient
>> + * socket errors.
>> + * @param fd Console descriptor.
>> + * @param text Text to send.
>> + * @param len Length of @a text.
>> + */
>> +static inline int
>> +console_do_push(int fd, const char *text, uint32_t len)
>> +{
>> + while (len > 0) {
>> + int written = fio_write_silent(fd, text, len);
>
> I don't think that using a blocking function here is acceptable
> (AFAICS fio_write_silent() calls the write syscall on session fd).
It is acceptable since it is blocking in the original Lua code. See
console.lua and socket.lua.
>> return 0;
>> +}
>
>> +/**
>> + * Push a message using a protocol, depending on a session type.
>> + * @param data Data to push, first argument on a stack.
>> + * @param opts Options. Now requires a single possible option -
>> + * sync. Second argument on a stack.
>> + */
>> +static int
>> +lbox_session_push(struct lua_State *L)
>> +{
>> + if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
>> +usage_error:
>> + return luaL_error(L, "Usage: box.session.push(data, opts)");
>
> I don't think that we should oblige the user to pass the 'sync' value
> explicitly - this would be really annoying. I think we should save the
> sync somehow (fiber local storage, request?) and pass it implicitly.
In the push ticket on Github you can see a discussion about this question. And it appeared, that passing sync explicitly is the only way to do push correctly. We can store it neither in fiber (encapsulation violation) nor in Lua in some hidden variable (a tried, many many times) nor use session sync (it can be changed after yield). So passing sync explicitly is ok. It is approved by Kostja.
>> + }
>> + lua_getfield(L, 2, "sync");
>> + if (! lua_isnumber(L, 3))
>> + goto usage_error;
>> + double lua_sync = lua_tonumber(L, 3);
>> + lua_pop(L, 1);
>> + uint64_t sync = (uint64_t) lua_sync;
>> + if (lua_sync != sync)
>> + goto usage_error;
>> + struct lua_push_port port;
>> + port.vtab = &lua_push_port_vtab;
>> + port.L = L;
>> + /*
>> + * Pop the opts - they must not be pushed. Leave only data
>> + * on a stack.
>> + */
>> + lua_remove(L, 2);
>> + if (session_push(current_session(), sync, (struct port *) &port) != 0) {
>> + return luaT_error(L);
>> + } else {
>> + lua_pushboolean(L, true);
>> + return 1;
>
> What's the point in returning 'true' on success?
What is alternative?
>
>> + }
>> +}
>
>> diff --git a/src/box/port.h b/src/box/port.h
>> @@ -76,6 +76,11 @@ struct port_vtab {
>> * format.
>> */
>> int (*dump_16)(struct port *port, struct obuf *out);
>> + /**
>> + * Same as dump, but find a memory for an output buffer
>> + * for itself.
>> + */
>> + const char *(*dump_raw)(struct port *port, uint32_t *size);
>
> Somehow this doesn't feel right. May be, we should encode Lua stack in
> msgpack first, and then re-encode it to Yaml. May be, we shouldn't use
> the 'port' at all. May be, I'm being too picky, and we should leave it
> as is. Anyway, please think of alternatives.
I already have spent many time on alternatives, and it appeared, that a port
is the most useful way. Formatting to a message pack and back to YAML is memory
and CPU overhead, and moreover if we format it in a message pack, we are forced to
use region for encoded data, because console has no obuf, and into IProto this region
must be copied. Now for IProto pushes a message is encoded directly in obuf, with no
multiple coping.
>
>> diff --git a/src/box/xrow.c b/src/box/xrow.c
>> @@ -43,6 +43,9 @@
>> #include "scramble.h"
>> #include "iproto_constants.h"
>>
>> +static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
>> + "encoded IPROTO_BODY keys must fit into one byte");
>> +
>
> Why check this now? IPROTO_DATA and IPROTO_PUSH can't occasionally
> change as they are defined in the binary protocol so there's no point
> in this static assertion IMO.
I thought that it looks more clear for a newbie, who reads IProto code. For me at the beginning it
was very unclear why we are sure, that codes can fit into one byte, and how iproto_body/header_bin
work.
But if you think, that it is bad idea, I can delete it.
>
>> int
>> xrow_header_decode(struct xrow_header *header, const char **pos,
>> const char *end)
>> @@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
>> uint32_t v_data_len; /* string length of array size */
>> };
>>
>> +static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
>> + IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
>> +
>> static const struct iproto_body_bin iproto_body_bin = {
>> 0x81, IPROTO_DATA, 0xdd, 0
>> };
>> @@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
>> 0x81, IPROTO_ERROR, 0xdb, 0
>> };
>>
>> +struct PACKED iproto_body_push_bin {
>> + uint8_t m_body; /* MP_MAP */
>> + uint8_t k_data; /* IPROTO_PUSH */
>> + uint8_t v_data; /* 1-size MP_ARRAY */
>> +};
>
> Why don't you just reuse iproto_body_bin for this?
IProto push message body header requires just 3 bytes, while response body requires 7.
>
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [PATCH 2/5] lua: port console yaml formatting to C
2018-03-21 9:14 ` Vladimir Davydov
@ 2018-03-21 9:30 ` v.shpilevoy
0 siblings, 0 replies; 20+ messages in thread
From: v.shpilevoy @ 2018-03-21 9:30 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
>>>
>>>> diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
>>>> @@ -395,4 +375,6 @@ package.loaded['console'] = {
>>>> on_start = on_start;
>>>> on_client_disconnect = on_client_disconnect;
>>>> completion_handler = internal.completion_handler;
>>>> + formatter = internal.formatter;
>>>
>>> I don't like that we export the serializer to Lua, but never use it in
>>> Lua code, only in C. Can't we create a serializer in C and pass it to
>>> the encoder function explicitly?
>>
>> The problem of Lua formatter is that it is Lua table, not cdata. And
>> to fully port it on C it is necessary to rewrite luaL_serializer and
>> l_dump in lyaml.cc <http://lyaml.cc/>, that seems to be too
>> complicated for a single propose - format push message. But if you
>> insist, I can do full port.
>
> I'm not sure... Obviously, I don't want you to rewrite the yaml encode.
> I just want to understand if there are alternatives to exporting the
> yaml encoder to the Lua console module, just to use it in C. Please take
> a look at how src/lua/msgpack.c is implemented - may be, we could borrow
> some ideas from it.
Ok, I will investigate.
^ permalink raw reply [flat|nested] 20+ messages in thread
* Re: [tarantool-patches] Re: [PATCH 5/5] session: introduce box.session.push
2018-03-21 9:30 ` [tarantool-patches] " v.shpilevoy
@ 2018-03-21 12:25 ` Vladimir Davydov
0 siblings, 0 replies; 20+ messages in thread
From: Vladimir Davydov @ 2018-03-21 12:25 UTC (permalink / raw)
To: v.shpilevoy; +Cc: tarantool-patches
On Wed, Mar 21, 2018 at 12:30:19PM +0300, v.shpilevoy@tarantool.org wrote:
>
> >
> > I doubt we need to designate text pushes at all. IMO they are useful
> > only for printing text to the user console. I suggest you disable
> > the on_push callback if net_box is operating in the 'console' mode,
> > instead just append pushes to the output, without a prefix.
>
> On_push can be used from netbox, where pushed message will finish request, if it has no
> prefix - it just can not be distinguished from a final response.
For the record: Kostja suggested to use Yaml tags for this.
>
> >
> >> diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
> >> +/**
> >> + * Search for IPROTO_PUSH key in a MessagePack encoded response
> >> + * body. It is needed without entire message decoding, when a user
> >> + * wants to store raw responses and pushes in its own buffer.
> >> + */
> >> +static int
> >> +netbox_body_is_push(struct lua_State *L)
> >> +{
> >> + uint32_t ctypeid;
> >> + const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
> >> + assert(ctypeid == luaL_ctypeid(L, "char *"));
> >> + assert(mp_typeof(*body) == MP_MAP);
> >> + lua_pushboolean(L, mp_decode_map(&body) == 1 &&
> >> + mp_typeof(*body) == MP_UINT &&
> >> + mp_decode_uint(&body) == IPROTO_PUSH);
> >> + return 1;
> >> +}
> >> +
> >
> > Can't you do this check in net_box.lua, without involving C?
>
> No, messagepack Lua api does not allow to decode a part of a message.
OK.
>
> >
> >> diff --git a/src/box/lua/session.c b/src/box/lua/session.c
> >> +/**
> >> + * Write @a text into @a fd in a blocking mode, ignoring transient
> >> + * socket errors.
> >> + * @param fd Console descriptor.
> >> + * @param text Text to send.
> >> + * @param len Length of @a text.
> >> + */
> >> +static inline int
> >> +console_do_push(int fd, const char *text, uint32_t len)
> >> +{
> >> + while (len > 0) {
> >> + int written = fio_write_silent(fd, text, len);
> >
> > I don't think that using a blocking function here is acceptable
> > (AFAICS fio_write_silent() calls the write syscall on session fd).
>
> It is acceptable since it is blocking in the original Lua code. See
> console.lua and socket.lua.
OK.
>
> >> return 0;
> >> +}
> >
> >> +/**
> >> + * Push a message using a protocol, depending on a session type.
> >> + * @param data Data to push, first argument on a stack.
> >> + * @param opts Options. Now requires a single possible option -
> >> + * sync. Second argument on a stack.
> >> + */
> >> +static int
> >> +lbox_session_push(struct lua_State *L)
> >> +{
> >> + if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
> >> +usage_error:
> >> + return luaL_error(L, "Usage: box.session.push(data, opts)");
> >
> > I don't think that we should oblige the user to pass the 'sync' value
> > explicitly - this would be really annoying. I think we should save the
> > sync somehow (fiber local storage, request?) and pass it implicitly.
>
> In the push ticket on Github you can see a discussion about this
> question. And it appeared, that passing sync explicitly is the only
> way to do push correctly. We can store it neither in fiber
> (encapsulation violation) nor in Lua in some hidden variable (a tried,
> many many times) nor use session sync (it can be changed after yield).
> So passing sync explicitly is ok. It is approved by Kostja.
But it's ugly... 'sync' is a part of the protocol, it shouldn't be
exposed to the user. Still I think we'd better use a fiber-local
variable for this.
>
> >> + }
> >> + lua_getfield(L, 2, "sync");
> >> + if (! lua_isnumber(L, 3))
> >> + goto usage_error;
> >> + double lua_sync = lua_tonumber(L, 3);
> >> + lua_pop(L, 1);
> >> + uint64_t sync = (uint64_t) lua_sync;
> >> + if (lua_sync != sync)
> >> + goto usage_error;
> >> + struct lua_push_port port;
> >> + port.vtab = &lua_push_port_vtab;
> >> + port.L = L;
> >> + /*
> >> + * Pop the opts - they must not be pushed. Leave only data
> >> + * on a stack.
> >> + */
> >> + lua_remove(L, 2);
> >> + if (session_push(current_session(), sync, (struct port *) &port) != 0) {
> >> + return luaT_error(L);
> >> + } else {
> >> + lua_pushboolean(L, true);
> >> + return 1;
> >
> > What's the point in returning 'true' on success?
>
> What is alternative?
Return nothing.
>
> >
> >> + }
> >> +}
> >
> >> diff --git a/src/box/port.h b/src/box/port.h
> >> @@ -76,6 +76,11 @@ struct port_vtab {
> >> * format.
> >> */
> >> int (*dump_16)(struct port *port, struct obuf *out);
> >> + /**
> >> + * Same as dump, but find a memory for an output buffer
> >> + * for itself.
> >> + */
> >> + const char *(*dump_raw)(struct port *port, uint32_t *size);
> >
> > Somehow this doesn't feel right. May be, we should encode Lua stack in
> > msgpack first, and then re-encode it to Yaml. May be, we shouldn't use
> > the 'port' at all. May be, I'm being too picky, and we should leave it
> > as is. Anyway, please think of alternatives.
>
> I already have spent many time on alternatives, and it appeared, that a port
> is the most useful way. Formatting to a message pack and back to YAML is memory
> and CPU overhead, and moreover if we format it in a message pack, we are forced to
> use region for encoded data, because console has no obuf, and into IProto this region
> must be copied. Now for IProto pushes a message is encoded directly in obuf, with no
> multiple coping.
What if we implemented yaml_stream, similar to mpstream we use for
encoding Lua objects in MsgPack, and use it here. Then all port methods
would look consistent.
Anyway, dump_raw is apparently not a very good name, because the
function actually encodes the result in Yaml. What about dump_plain?
Also, the comment is misleading - dump and dump_raw are quite different,
not only by the way they allocate buffer, but also how they present the
result.
>
> >
> >> diff --git a/src/box/xrow.c b/src/box/xrow.c
> >> @@ -43,6 +43,9 @@
> >> #include "scramble.h"
> >> #include "iproto_constants.h"
> >>
> >> +static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
> >> + "encoded IPROTO_BODY keys must fit into one byte");
> >> +
> >
> > Why check this now? IPROTO_DATA and IPROTO_PUSH can't occasionally
> > change as they are defined in the binary protocol so there's no point
> > in this static assertion IMO.
>
> I thought that it looks more clear for a newbie, who reads IProto
> code. For me at the beginning it was very unclear why we are sure,
> that codes can fit into one byte, and how iproto_body/header_bin work.
> But if you think, that it is bad idea, I can delete it.
I like when a patch has as small of a footprint as possible - it's easer
to review it then. So please remove.
>
> >
> >> int
> >> xrow_header_decode(struct xrow_header *header, const char **pos,
> >> const char *end)
> >> @@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
> >> uint32_t v_data_len; /* string length of array size */
> >> };
> >>
> >> +static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
> >> + IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
> >> +
> >> static const struct iproto_body_bin iproto_body_bin = {
> >> 0x81, IPROTO_DATA, 0xdd, 0
> >> };
> >> @@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
> >> 0x81, IPROTO_ERROR, 0xdb, 0
> >> };
> >>
> >> +struct PACKED iproto_body_push_bin {
> >> + uint8_t m_body; /* MP_MAP */
> >> + uint8_t k_data; /* IPROTO_PUSH */
> >> + uint8_t v_data; /* 1-size MP_ARRAY */
> >> +};
> >
> > Why don't you just reuse iproto_body_bin for this?
>
> IProto push message body header requires just 3 bytes, while response body requires 7.
I don't think these 4 bytes per push are worth the extra code.
^ permalink raw reply [flat|nested] 20+ messages in thread
end of thread, other threads:[~2018-03-21 12:25 UTC | newest]
Thread overview: 20+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
2018-03-19 13:34 ` [PATCH 1/5] session: forbid creation from Lua binary and applier sessions Vladislav Shpilevoy
2018-03-20 13:20 ` Vladimir Davydov
2018-03-20 13:46 ` v.shpilevoy
2018-03-19 13:34 ` [PATCH 2/5] lua: port console yaml formatting to C Vladislav Shpilevoy
2018-03-20 17:51 ` Vladimir Davydov
2018-03-20 18:04 ` v.shpilevoy
2018-03-21 9:14 ` Vladimir Davydov
2018-03-21 9:30 ` v.shpilevoy
2018-03-19 13:34 ` [PATCH 3/5] Remove empty function declaration Vladislav Shpilevoy
2018-03-20 17:55 ` Vladimir Davydov
2018-03-20 17:57 ` [tarantool-patches] " v.shpilevoy
2018-03-21 9:16 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 4/5] session: introduce session_owner Vladislav Shpilevoy
2018-03-20 18:29 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 5/5] session: introduce box.session.push Vladislav Shpilevoy
2018-03-21 9:10 ` Vladimir Davydov
2018-03-21 9:30 ` [tarantool-patches] " v.shpilevoy
2018-03-21 12:25 ` Vladimir Davydov
2018-03-19 13:41 ` [tarantool-patches] [PATCH 0/5] " v.shpilevoy
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox