* [PATCH 4/5] session: introduce session_owner
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
` (2 preceding siblings ...)
2018-03-19 13:34 ` [PATCH 3/5] Remove empty function declaration Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-20 18:29 ` Vladimir Davydov
2018-03-19 13:34 ` [PATCH 5/5] session: introduce box.session.push Vladislav Shpilevoy
2018-03-19 13:41 ` [tarantool-patches] [PATCH 0/5] " v.shpilevoy
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
A session owner stores session-type-specific data. For example,
an IProto session has an authentication salt, and a console session
has a file descriptor.
For #2677, the IProto and console session owners will get a push()
virtual function that implements box.session.push; its implementation
depends on the session type.
Needed for #2677
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/applier.cc | 4 ++-
src/box/authentication.cc | 4 +--
src/box/authentication.h | 3 +-
src/box/box.cc | 4 +--
src/box/box.h | 2 +-
src/box/iproto.cc | 86 +++++++++++++++++++++++++++++++++++++++------
src/box/lua/session.c | 73 +++++++++++++++++++++++++++++++++-----
src/box/session.cc | 72 +++++++++++++++++++++++++++++++++-----
src/box/session.h | 89 ++++++++++++++++++++++++++++++++++++++++-------
src/box/vinyl.c | 3 +-
10 files changed, 293 insertions(+), 47 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 6bfe5a99a..581139509 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -533,7 +533,9 @@ applier_f(va_list ap)
* Set correct session type for use in on_replace()
* triggers.
*/
- current_session()->type = SESSION_TYPE_APPLIER;
+ struct session_owner applier_owner;
+ session_owner_create(&applier_owner, SESSION_TYPE_APPLIER);
+ session_set_owner(current_session(), &applier_owner);
/* Re-connect loop */
while (!fiber_is_cancelled()) {
diff --git a/src/box/authentication.cc b/src/box/authentication.cc
index fef549c55..811974cb9 100644
--- a/src/box/authentication.cc
+++ b/src/box/authentication.cc
@@ -37,7 +37,7 @@
static char zero_hash[SCRAMBLE_SIZE];
void
-authenticate(const char *user_name, uint32_t len,
+authenticate(const char *user_name, uint32_t len, const char *salt,
const char *tuple)
{
struct user *user = user_find_by_name_xc(user_name, len);
@@ -84,7 +84,7 @@ authenticate(const char *user_name, uint32_t len,
"invalid scramble size");
}
- if (scramble_check(scramble, session->salt, user->def->hash2)) {
+ if (scramble_check(scramble, salt, user->def->hash2)) {
auth_res.is_authenticated = false;
if (session_run_on_auth_triggers(&auth_res) != 0)
diag_raise();
diff --git a/src/box/authentication.h b/src/box/authentication.h
index e91fe0a0e..9935e3548 100644
--- a/src/box/authentication.h
+++ b/src/box/authentication.h
@@ -45,6 +45,7 @@ struct on_auth_trigger_ctx {
void
-authenticate(const char *user_name, uint32_t len, const char *tuple);
+authenticate(const char *user_name, uint32_t len, const char *salt,
+ const char *tuple);
#endif /* INCLUDES_TARANTOOL_BOX_AUTHENTICATION_H */
diff --git a/src/box/box.cc b/src/box/box.cc
index cb3199624..9fb85c04f 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1233,7 +1233,7 @@ box_on_join(const tt_uuid *instance_uuid)
}
void
-box_process_auth(struct auth_request *request)
+box_process_auth(struct auth_request *request, const char *salt)
{
rmean_collect(rmean_box, IPROTO_AUTH, 1);
@@ -1243,7 +1243,7 @@ box_process_auth(struct auth_request *request)
const char *user = request->user_name;
uint32_t len = mp_decode_strl(&user);
- authenticate(user, len, request->scramble);
+ authenticate(user, len, salt, request->scramble);
}
void
diff --git a/src/box/box.h b/src/box/box.h
index c9b5aad01..84899cc13 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -150,7 +150,7 @@ box_reset_stat(void);
} /* extern "C" */
void
-box_process_auth(struct auth_request *request);
+box_process_auth(struct auth_request *request, const char *salt);
void
box_process_join(struct ev_io *io, struct xrow_header *header);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index a75127a90..ad4b1a757 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -59,6 +59,63 @@
#include "replication.h" /* instance_uuid */
#include "iproto_constants.h"
#include "rmean.h"
+#include "random.h"
+
+enum { IPROTO_SALT_SIZE = 32 };
+
+/** Owner of binary IProto sessions. */
+struct iproto_session_owner {
+ struct session_owner base;
+ /** Authentication salt. */
+ char salt[IPROTO_SALT_SIZE];
+ /** IProto connection. */
+ struct iproto_connection *connection;
+};
+
+static struct session_owner *
+iproto_session_owner_dup(struct session_owner *owner);
+
+static int
+iproto_session_owner_fd(const struct session_owner *owner);
+
+static const struct session_owner_vtab iproto_session_owner_vtab = {
+ /* .dup = */ iproto_session_owner_dup,
+ /* .delete = */ (void (*)(struct session_owner *)) free,
+ /* .fd = */ iproto_session_owner_fd,
+};
+
+static struct session_owner *
+iproto_session_owner_dup(struct session_owner *owner)
+{
+ assert(owner->vtab == &iproto_session_owner_vtab);
+ size_t size = sizeof(struct iproto_session_owner);
+ struct session_owner *dup = (struct session_owner *) malloc(size);
+ if (dup == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "iproto_session_owner");
+ return NULL;
+ }
+ memcpy(dup, owner, size);
+ return dup;
+}
+
+static inline void
+iproto_session_owner_create(struct iproto_session_owner *owner,
+ struct iproto_connection *connection)
+{
+ owner->base.type = SESSION_TYPE_BINARY;
+ owner->base.vtab = &iproto_session_owner_vtab;
+ owner->connection = connection;
+ random_bytes(owner->salt, IPROTO_SALT_SIZE);
+}
+
+static inline char *
+iproto_session_salt(struct session *session)
+{
+ struct iproto_session_owner *session_owner =
+ (struct iproto_session_owner *) session->owner;
+ assert(session_owner->base.vtab == &iproto_session_owner_vtab);
+ return session_owner->salt;
+}
/* The number of iproto messages in flight */
enum { IPROTO_MSG_MAX = 768 };
@@ -368,6 +425,15 @@ struct iproto_connection
static struct mempool iproto_connection_pool;
static RLIST_HEAD(stopped_connections);
+static int
+iproto_session_owner_fd(const struct session_owner *owner)
+{
+ assert(owner->vtab == &iproto_session_owner_vtab);
+ const struct iproto_session_owner *session_owner =
+ ((const struct iproto_session_owner *) owner);
+ return session_owner->connection->input.fd;
+}
+
/**
* Return true if we have not enough spare messages
* in the message pool. Disconnect messages are
@@ -1033,11 +1099,6 @@ tx_process_disconnect(struct cmsg *m)
struct iproto_connection *con = msg->connection;
if (con->session) {
tx_fiber_init(con->session, 0);
- /*
- * The socket is already closed in iproto thread,
- * prevent box.session.peer() from using it.
- */
- con->session->fd = -1;
if (! rlist_empty(&session_on_disconnect))
session_run_on_disconnect_triggers(con->session);
session_destroy(con->session);
@@ -1328,8 +1389,9 @@ tx_process_misc(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct obuf *out = msg->connection->tx.p_obuf;
+ struct session *session = msg->connection->session;
- tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_fiber_init(session, msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1337,7 +1399,8 @@ tx_process_misc(struct cmsg *m)
try {
switch (msg->header.type) {
case IPROTO_AUTH:
- box_process_auth(&msg->auth);
+ box_process_auth(&msg->auth,
+ iproto_session_salt(session));
iproto_reply_ok_xc(out, msg->header.sync,
::schema_version);
break;
@@ -1481,15 +1544,18 @@ tx_process_connect(struct cmsg *m)
struct iproto_connection *con = msg->connection;
struct obuf *out = msg->connection->tx.p_obuf;
try { /* connect. */
- con->session = session_create(con->input.fd, SESSION_TYPE_BINARY);
+ struct iproto_session_owner owner;
+ iproto_session_owner_create(&owner, con);
+ con->session = session_create((struct session_owner *) &owner);
if (con->session == NULL)
diag_raise();
tx_fiber_init(con->session, 0);
static __thread char greeting[IPROTO_GREETING_SIZE];
/* TODO: dirty read from tx thread */
struct tt_uuid uuid = INSTANCE_UUID;
- greeting_encode(greeting, tarantool_version_id(),
- &uuid, con->session->salt, SESSION_SEED_SIZE);
+ greeting_encode(greeting, tarantool_version_id(), &uuid,
+ iproto_session_salt(con->session),
+ IPROTO_SALT_SIZE);
obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
if (! rlist_empty(&session_on_connect)) {
if (session_run_on_connect_triggers(con->session) != 0)
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index d8e91bf1f..af8411068 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -42,6 +42,54 @@
#include "box/user.h"
#include "box/schema.h"
+/** Owner of a console session. */
+struct console_session_owner {
+ struct session_owner base;
+ /** Console socket descriptor. Expects text data. */
+ int fd;
+};
+
+static struct session_owner *
+console_session_owner_dup(struct session_owner *owner);
+
+static int
+console_session_owner_fd(const struct session_owner *owner);
+
+static const struct session_owner_vtab console_session_owner_vtab = {
+ /* .dup = */ console_session_owner_dup,
+ /* .delete = */ (void (*)(struct session_owner *)) free,
+ /* .fd = */ console_session_owner_fd,
+};
+
+static struct session_owner *
+console_session_owner_dup(struct session_owner *owner)
+{
+ assert(owner->vtab == &console_session_owner_vtab);
+ size_t size = sizeof(struct console_session_owner);
+ struct session_owner *dup = (struct session_owner *) malloc(size);
+ if (dup == NULL) {
+ diag_set(OutOfMemory, size, "malloc", "console_session_owner");
+ return NULL;
+ }
+ memcpy(dup, owner, size);
+ return dup;
+}
+
+static int
+console_session_owner_fd(const struct session_owner *owner)
+{
+ assert(owner->vtab == &console_session_owner_vtab);
+ return ((const struct console_session_owner *) owner)->fd;
+}
+
+static inline void
+console_session_owner_create(struct console_session_owner *owner, int fd)
+{
+ owner->base.type = SESSION_TYPE_CONSOLE;
+ owner->base.vtab = &console_session_owner_vtab;
+ owner->fd = fd;
+}
+
static const char *sessionlib_name = "box.session";
/* Create session and pin it to fiber */
@@ -56,14 +104,24 @@ lbox_session_create(struct lua_State *L)
"session from Lua");
}
struct session *session = fiber_get_session(fiber());
+ int fd = luaL_optinteger(L, 1, -1);
if (session == NULL) {
- int fd = luaL_optinteger(L, 1, -1);
- session = session_create_on_demand(fd);
+ struct session_owner owner;
+ session_owner_create(&owner, type);
+ session = session_create_on_demand(&owner);
if (session == NULL)
return luaT_error(L);
}
- /* If a session already exists, simply reset its type */
- session->type = type;
+ /* If a session already exists, simply reset its owner. */
+ if (type == SESSION_TYPE_CONSOLE) {
+ struct console_session_owner owner;
+ console_session_owner_create(&owner, fd);
+ session_set_owner(session, (struct session_owner *) &owner);
+ } else {
+ struct session_owner owner;
+ session_owner_create(&owner, type);
+ session_set_owner(session, &owner);
+ }
lua_pushnumber(L, session->id);
return 1;
}
@@ -90,7 +148,7 @@ lbox_session_id(struct lua_State *L)
static int
lbox_session_type(struct lua_State *L)
{
- lua_pushstring(L, session_type_strs[current_session()->type]);
+ lua_pushstring(L, session_type_strs[session_type(current_session())]);
return 1;
}
@@ -237,7 +295,7 @@ lbox_session_fd(struct lua_State *L)
struct session *session = session_find(sid);
if (session == NULL)
luaL_error(L, "session.fd(): session does not exist");
- lua_pushinteger(L, session->fd);
+ lua_pushinteger(L, session_fd(session));
return 1;
}
@@ -251,7 +309,6 @@ lbox_session_peer(struct lua_State *L)
if (lua_gettop(L) > 1)
luaL_error(L, "session.peer(sid): bad arguments");
- int fd;
struct session *session;
if (lua_gettop(L) == 1)
session = session_find(luaL_checkint(L, 1));
@@ -259,7 +316,7 @@ lbox_session_peer(struct lua_State *L)
session = current_session();
if (session == NULL)
luaL_error(L, "session.peer(): session does not exist");
- fd = session->fd;
+ int fd = session_fd(session);
if (fd < 0) {
lua_pushnil(L); /* no associated peer */
return 1;
diff --git a/src/box/session.cc b/src/box/session.cc
index 0b0c5ae44..908ec9c4e 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -33,7 +33,6 @@
#include "memory.h"
#include "assoc.h"
#include "trigger.h"
-#include "random.h"
#include "user.h"
#include "error.h"
@@ -54,6 +53,48 @@ RLIST_HEAD(session_on_connect);
RLIST_HEAD(session_on_disconnect);
RLIST_HEAD(session_on_auth);
+static struct session_owner *
+generic_session_owner_dup(struct session_owner *owner);
+
+static int
+generic_session_owner_fd(const struct session_owner *owner);
+
+static const struct session_owner_vtab generic_session_owner_vtab = {
+ /* .dup = */ generic_session_owner_dup,
+ /* .delete = */ (void (*)(struct session_owner *)) free,
+ /* .fd = */ generic_session_owner_fd,
+};
+
+static struct session_owner *
+generic_session_owner_dup(struct session_owner *owner)
+{
+ assert(owner->vtab == &generic_session_owner_vtab);
+ struct session_owner *dup =
+ (struct session_owner *) malloc(sizeof(*dup));
+ if (dup == NULL) {
+ diag_set(OutOfMemory, sizeof(*dup), "malloc",
+ "default_session_owner");
+ return NULL;
+ }
+ memcpy(dup, owner, sizeof(*dup));
+ return dup;
+}
+
+static int
+generic_session_owner_fd(const struct session_owner *owner)
+{
+ assert(owner->vtab == &generic_session_owner_vtab);
+ (void) owner;
+ return -1;
+}
+
+void
+session_owner_create(struct session_owner *owner, enum session_type type)
+{
+ owner->type = type;
+ owner->vtab = &generic_session_owner_vtab;
+}
+
static inline uint64_t
sid_max()
{
@@ -80,7 +121,7 @@ session_on_stop(struct trigger *trigger, void * /* event */)
}
struct session *
-session_create(int fd, enum session_type type)
+session_create(struct session_owner *owner)
{
struct session *session =
(struct session *) mempool_alloc(&session_pool);
@@ -90,14 +131,15 @@ session_create(int fd, enum session_type type)
return NULL;
}
session->id = sid_max();
- session->fd = fd;
+ session->owner = session_owner_dup(owner);
+ if (session->owner == NULL) {
+ mempool_free(&session_pool, session);
+ return NULL;
+ }
session->sync = 0;
- session->type = type;
/* For on_connect triggers. */
credentials_init(&session->credentials, guest_user->auth_token,
guest_user->def->uid);
- if (fd >= 0)
- random_bytes(session->salt, SESSION_SEED_SIZE);
struct mh_i64ptr_node_t node;
node.key = session->id;
node.val = session;
@@ -105,6 +147,7 @@ session_create(int fd, enum session_type type)
mh_int_t k = mh_i64ptr_put(session_registry, &node, NULL, NULL);
if (k == mh_end(session_registry)) {
+ session_owner_delete(owner);
mempool_free(&session_pool, session);
diag_set(OutOfMemory, 0, "session hash", "new session");
return NULL;
@@ -112,13 +155,25 @@ session_create(int fd, enum session_type type)
return session;
}
+int
+session_set_owner(struct session *session, struct session_owner *new_owner)
+{
+ struct session_owner *dup = session_owner_dup(new_owner);
+ if (dup == NULL)
+ return -1;
+ if (session->owner != NULL)
+ session_owner_delete(session->owner);
+ session->owner = dup;
+ return 0;
+}
+
struct session *
-session_create_on_demand(int fd)
+session_create_on_demand(struct session_owner *owner)
{
assert(fiber_get_session(fiber()) == NULL);
/* Create session on demand */
- struct session *s = session_create(fd, SESSION_TYPE_BACKGROUND);
+ struct session *s = session_create(owner);
if (s == NULL)
return NULL;
s->fiber_on_stop = {
@@ -186,6 +241,7 @@ session_destroy(struct session *session)
{
struct mh_i64ptr_node_t node = { session->id, NULL };
mh_i64ptr_remove(session_registry, &node, NULL);
+ session_owner_delete(session->owner);
mempool_free(&session_pool, session);
}
diff --git a/src/box/session.h b/src/box/session.h
index 4f9235ea8..105dcab17 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -47,8 +47,6 @@ session_init();
void
session_free();
-enum { SESSION_SEED_SIZE = 32, SESSION_DELIM_SIZE = 16 };
-
enum session_type {
SESSION_TYPE_BACKGROUND = 0,
SESSION_TYPE_BINARY,
@@ -60,6 +58,49 @@ enum session_type {
extern const char *session_type_strs[];
+struct session_owner_vtab;
+
+/**
+ * Object to store session type specific data. For example, IProto
+ * stores iproto_connection, console stores file descriptor.
+ */
+struct session_owner {
+ /** Session type. */
+ enum session_type type;
+ /** Virtual session owner methods. */
+ const struct session_owner_vtab *vtab;
+};
+
+struct session_owner_vtab {
+ /** Allocate a duplicate of an owner. */
+ struct session_owner *(*dup)(struct session_owner *);
+ /** Destroy an owner, and free its memory. */
+ void (*free)(struct session_owner *);
+ /** Get the descriptor of an owner, if has. Else -1. */
+ int (*fd)(const struct session_owner *);
+};
+
+static inline struct session_owner *
+session_owner_dup(struct session_owner *owner)
+{
+ return owner->vtab->dup(owner);
+}
+
+static inline void
+session_owner_delete(struct session_owner *owner)
+{
+ owner->vtab->free(owner);
+}
+
+/**
+ * Initialize a session owner with @a type and with default
+ * virtual methods.
+ * @param owner Session owner to initialize in place.
+ * @param type Session type.
+ */
+void
+session_owner_create(struct session_owner *owner, enum session_type type);
+
/**
* Abstraction of a single user session:
* for now, only provides accounting of established
@@ -72,10 +113,8 @@ extern const char *session_type_strs[];
struct session {
/** Session id. */
uint64_t id;
- /** File descriptor - socket of the connected peer.
- * Only if the session has a peer.
- */
- int fd;
+ /** Session owner with type specific data. */
+ struct session_owner *owner;
/**
* For iproto requests, we set this field
* to the value of packet sync. Since the
@@ -85,15 +124,24 @@ struct session {
* the first yield.
*/
uint64_t sync;
- enum session_type type;
- /** Authentication salt. */
- char salt[SESSION_SEED_SIZE];
/** Session user id and global grants */
struct credentials credentials;
/** Trigger for fiber on_stop to cleanup created on-demand session */
struct trigger fiber_on_stop;
};
+static inline enum session_type
+session_type(const struct session *session)
+{
+ return session->owner->type;
+}
+
+static inline int
+session_fd(const struct session *session)
+{
+ return session->owner->vtab->fd(session->owner);
+}
+
/**
* Find a session by id.
*/
@@ -154,7 +202,7 @@ extern struct credentials admin_credentials;
* trigger to destroy it when this fiber ends.
*/
struct session *
-session_create_on_demand(int fd);
+session_create_on_demand(struct session_owner *owner);
/*
* When creating a new fiber, the database (box)
@@ -171,7 +219,9 @@ current_session()
{
struct session *session = fiber_get_session(fiber());
if (session == NULL) {
- session = session_create_on_demand(-1);
+ struct session_owner owner;
+ session_owner_create(&owner, SESSION_TYPE_BACKGROUND);
+ session = session_create_on_demand(&owner);
if (session == NULL)
diag_raise();
}
@@ -191,7 +241,9 @@ effective_user()
(struct credentials *) fiber_get_key(fiber(),
FIBER_KEY_USER);
if (u == NULL) {
- session_create_on_demand(-1);
+ struct session_owner owner;
+ session_owner_create(&owner, SESSION_TYPE_BACKGROUND);
+ session_create_on_demand(&owner);
u = (struct credentials *) fiber_get_key(fiber(),
FIBER_KEY_USER);
}
@@ -216,7 +268,18 @@ session_storage_cleanup(int sid);
* trigger fails or runs out of resources.
*/
struct session *
-session_create(int fd, enum session_type type);
+session_create(struct session_owner *owner);
+
+/**
+ * Set new owner of a session.
+ * @param session Session to change owner.
+ * @param new_owner New session owner. Is duplicated inside.
+ *
+ * @retval -1 Memory error.
+ * @retval 0 Success.
+ */
+int
+session_set_owner(struct session *session, struct session_owner *new_owner);
/**
* Destroy a session.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index e0c30757c..7ae12f597 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2453,7 +2453,8 @@ vinyl_engine_prepare(struct engine *engine, struct txn *txn)
* available for the admin to track the lag so let the applier
* wait as long as necessary for memory dump to complete.
*/
- double timeout = (current_session()->type != SESSION_TYPE_APPLIER ?
+ double timeout = (session_type(current_session()) !=
+ SESSION_TYPE_APPLIER ?
env->timeout : TIMEOUT_INFINITY);
/*
* Reserve quota needed by the transaction before allocating
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread
* [PATCH 5/5] session: introduce box.session.push
2018-03-19 13:34 [PATCH 0/5] session: introduce box.session.push Vladislav Shpilevoy
` (3 preceding siblings ...)
2018-03-19 13:34 ` [PATCH 4/5] session: introduce session_owner Vladislav Shpilevoy
@ 2018-03-19 13:34 ` Vladislav Shpilevoy
2018-03-21 9:10 ` Vladimir Davydov
2018-03-19 13:41 ` [tarantool-patches] [PATCH 0/5] " v.shpilevoy
5 siblings, 1 reply; 20+ messages in thread
From: Vladislav Shpilevoy @ 2018-03-19 13:34 UTC (permalink / raw)
To: tarantool-patches; +Cc: vdavydov.dev, Vladislav Shpilevoy
box.session.push() allows sending a message to a client without
finishing the main request.
Tarantool supports two push types: via the text protocol and via
the binary protocol. A text protocol push message contains a "push:"
prefix followed by YAML-formatted text, whereas a final
response always has a '---' prefix. To catch pushed messages on the
client side, use the console.connect() on_push option, which takes
a function with a single argument - the message.
An IProto push message is encoded just like a regular response, but
instead of IPROTO_DATA it has IPROTO_PUSH with a single element -
the pushed MessagePack-encoded data.
Text push is trivial - it is just a blocking write into a socket
with a prefix. Binary push is more complex.
To notify the IProto thread about new data in an obuf, the TX thread
sends it a 'push_msg' message. On receiving this message, the IProto
thread notifies libev about the new data and then sends 'push_msg' back
with an updated write position. When the TX thread receives the message
back, it updates its own copy of the write position. If IProto did not
send the write position back, TX would write to the same obuf again
and again, because it could not know that IProto had already flushed
the other obuf.
To avoid multiple 'push_msg' messages in flight between IProto and TX,
only one 'push_msg' per connection is used. To deliver pushes that
appear while 'push_msg' is in flight, the TX thread sets a flag every
time it sees that 'push_msg' is already sent and there is a new push.
When 'push_msg' returns, TX checks this flag, and if it is set,
IProto is notified again.
Closes #2677
Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
---
src/box/iproto.cc | 235 +++++++++++++++++---
src/box/iproto_constants.c | 3 +-
src/box/iproto_constants.h | 8 +
src/box/lua/call.c | 1 +
src/box/lua/console.c | 2 +-
src/box/lua/console.h | 8 +
src/box/lua/console.lua | 6 +-
src/box/lua/net_box.c | 37 ++++
src/box/lua/net_box.lua | 97 ++++++--
src/box/lua/session.c | 171 +++++++++++++++
src/box/port.c | 7 +
src/box/port.h | 15 ++
src/box/session.cc | 14 ++
src/box/session.h | 17 ++
src/box/xrow.c | 40 +++-
src/box/xrow.h | 26 ++-
src/fio.c | 12 +
src/fio.h | 16 ++
test/app-tap/console.test.lua | 9 +-
test/box/net.box.result | 2 +-
test/box/net.box.test.lua | 2 +-
test/box/push.result | 364 +++++++++++++++++++++++++++++++
test/box/push.test.lua | 163 ++++++++++++++
test/replication/before_replace.result | 14 ++
test/replication/before_replace.test.lua | 11 +
25 files changed, 1211 insertions(+), 69 deletions(-)
create mode 100644 test/box/push.result
create mode 100644 test/box/push.test.lua
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index ad4b1a757..4408293c1 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,6 +63,45 @@
enum { IPROTO_SALT_SIZE = 32 };
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+ struct obuf *obuf;
+ struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+ wpos->obuf = out;
+ wpos->svp = obuf_create_svp(out);
+}
+
+/* {{{ IPROTO_PUSH declarations. */
+
+/**
+ * Message to notify IProto thread about new data in an output
+ * buffer. Struct iproto_msg is not used here, because push
+ * notification can be much more compact: it does not have
+ * request, ibuf, length, flags ...
+ */
+struct iproto_push_msg {
+ struct cmsg base;
+ /**
+ * Before sending to IProto thread, the wpos is set to a
+ * current position in an output buffer. Before IProto
+ * returns the message to TX, it sets wpos to the last
+ * flushed position (works like iproto_msg.wpos).
+ */
+ struct iproto_wpos wpos;
+ /** IProto connection to push into. */
+ struct iproto_connection *connection;
+};
+
/** Owner of binary IProto sessions. */
struct iproto_session_owner {
struct session_owner base;
@@ -70,6 +109,37 @@ struct iproto_session_owner {
char salt[IPROTO_SALT_SIZE];
/** IProto connection. */
struct iproto_connection *connection;
+ /**
+ * Is_push_in_progress is set when a push_msg is sent to the
+ * IProto thread, and reset when the message is returned
+ * to TX. If a new push sees that a push_msg is already
+ * sent to IProto, then has_new_pushes is set. After the push
+ * notification is returned to TX, it checks
+ * has_new_pushes. If it is set, then the notification is
+ * sent again. This ping-pong continues until TX stops
+ * pushing. This allows us to
+ * 1) avoid multiple push_msg from one session in flight,
+ * 2) avoid blocking push() until a previous push() is
+ * finished.
+ *
+ * IProto TX
+ * -------------------------------------------------------
+ * + [push message]
+ * start socket <--- notification ----
+ * write
+ * + [push message]
+ * + [push message]
+ * ...
+ * end socket
+ * write ----------------> check for new
+ * pushes - found
+ * <--- notification ---
+ * ....
+ */
+ bool has_new_pushes;
+ bool is_push_in_progress;
+ /** Push notification for IProto thread. */
+ struct iproto_push_msg push_msg;
};
static struct session_owner *
@@ -78,10 +148,27 @@ iproto_session_owner_dup(struct session_owner *owner);
static int
iproto_session_owner_fd(const struct session_owner *owner);
+/**
+ * Push a message from @a port to a remote client.
+ * @param owner IProto session owner.
+ * @param sync Message sync. Must be the same as a request sync to
+ * be able to detect their tie on a client side.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval 0 Success, a message is written to an output buffer.
+ * But it is not guaranteed that it will be sent
+ * successfully.
+ */
+static int
+iproto_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port);
+
static const struct session_owner_vtab iproto_session_owner_vtab = {
/* .dup = */ iproto_session_owner_dup,
/* .delete = */ (void (*)(struct session_owner *)) free,
/* .fd = */ iproto_session_owner_fd,
+ /* .push = */ iproto_session_owner_push,
};
static struct session_owner *
@@ -105,6 +192,9 @@ iproto_session_owner_create(struct iproto_session_owner *owner,
owner->base.type = SESSION_TYPE_BINARY;
owner->base.vtab = &iproto_session_owner_vtab;
owner->connection = connection;
+ owner->has_new_pushes = false;
+ owner->is_push_in_progress = false;
+ owner->push_msg.connection = connection;
random_bytes(owner->salt, IPROTO_SALT_SIZE);
}
@@ -117,6 +207,8 @@ iproto_session_salt(struct session *session)
return session_owner->salt;
}
+/** }}} */
+
/* The number of iproto messages in flight */
enum { IPROTO_MSG_MAX = 768 };
@@ -164,24 +256,6 @@ iproto_reset_input(struct ibuf *ibuf)
}
}
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
- struct obuf *obuf;
- struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
- wpos->obuf = out;
- wpos->svp = obuf_create_svp(out);
-}
-
/* {{{ iproto_msg - declaration */
/**
@@ -1168,15 +1242,16 @@ tx_discard_input(struct iproto_msg *msg)
* not, the empty buffer is selected.
* - if neither of the buffers are empty, the function
* does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ * thread.
*/
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_msg(struct iproto_connection *con, struct iproto_wpos *wpos)
{
- struct iproto_msg *msg = (struct iproto_msg *) m;
- struct iproto_connection *con = msg->connection;
-
struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
- if (msg->wpos.obuf == con->tx.p_obuf) {
+ if (wpos->obuf == con->tx.p_obuf) {
/*
* We got a message advancing the buffer which
* is being appended to. The previous buffer is
@@ -1194,7 +1269,6 @@ tx_accept_msg(struct cmsg *m)
*/
con->tx.p_obuf = prev;
}
- return msg;
}
/**
@@ -1217,7 +1291,8 @@ tx_reply_error(struct iproto_msg *msg)
static void
tx_reply_iproto_error(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
struct obuf *out = msg->connection->tx.p_obuf;
iproto_reply_error(out, diag_last_error(&msg->diag),
msg->header.sync, ::schema_version);
@@ -1227,8 +1302,8 @@ tx_reply_iproto_error(struct cmsg *m)
static void
tx_process1(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
- struct obuf *out = msg->connection->tx.p_obuf;
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
tx_fiber_init(msg->connection->session, msg->header.sync);
if (tx_check_schema(msg->header.schema_version))
@@ -1236,8 +1311,11 @@ tx_process1(struct cmsg *m)
struct tuple *tuple;
struct obuf_svp svp;
- if (box_process1(&msg->dml, &tuple) ||
- iproto_prepare_select(out, &svp))
+ struct obuf *out;
+ if (box_process1(&msg->dml, &tuple) != 0)
+ goto error;
+ out = msg->connection->tx.p_obuf;
+ if (iproto_prepare_select(out, &svp) != 0)
goto error;
if (tuple && tuple_to_obuf(tuple, out))
goto error;
@@ -1252,8 +1330,9 @@ error:
static void
tx_process_select(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
- struct obuf *out = msg->connection->tx.p_obuf;
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
+ struct obuf *out;
struct obuf_svp svp;
struct port port;
int count;
@@ -1270,6 +1349,7 @@ tx_process_select(struct cmsg *m)
req->key, req->key_end, &port);
if (rc < 0)
goto error;
+ out = msg->connection->tx.p_obuf;
if (iproto_prepare_select(out, &svp) != 0) {
port_destroy(&port);
goto error;
@@ -1305,7 +1385,8 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
static void
tx_process_call(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
tx_fiber_init(msg->connection->session, msg->header.sync);
@@ -1387,7 +1468,8 @@ error:
static void
tx_process_misc(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
+ tx_accept_msg(msg->connection, &msg->wpos);
struct obuf *out = msg->connection->tx.p_obuf;
struct session *session = msg->connection->session;
@@ -1428,8 +1510,9 @@ error:
static void
tx_process_join_subscribe(struct cmsg *m)
{
- struct iproto_msg *msg = tx_accept_msg(m);
+ struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
+ tx_accept_msg(con, &msg->wpos);
tx_fiber_init(con->session, msg->header.sync);
@@ -1612,6 +1695,88 @@ static const struct cmsg_hop connect_route[] = {
/** }}} */
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param owner IProto session owner.
+ */
+static void
+tx_begin_push(struct iproto_session_owner *owner);
+
+/**
+ * Create an event to send push.
+ * @param m IProto push message.
+ */
+static void
+net_push_msg(struct cmsg *m)
+{
+ struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+ struct iproto_connection *con = msg->connection;
+ con->wend = msg->wpos;
+ msg->wpos = con->wpos;
+ if (evio_has_fd(&con->output) && !ev_is_active(&con->output))
+ ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * After a message notifies IProto thread about pushed data, TX
+ * thread can already have a new push in one of obufs. This
+ * function checks for new pushes and possibly re-sends push
+ * notification to IProto thread.
+ */
+static void
+tx_check_for_new_push(struct cmsg *m)
+{
+ struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+ struct iproto_session_owner *owner =
+ container_of(msg, struct iproto_session_owner, push_msg);
+ tx_accept_msg(msg->connection, &msg->wpos);
+ owner->is_push_in_progress = false;
+ if (owner->has_new_pushes)
+ tx_begin_push(owner);
+}
+
+static const struct cmsg_hop push_route[] = {
+ { net_push_msg, &tx_pipe },
+ { tx_check_for_new_push, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_session_owner *owner)
+{
+ assert(! owner->is_push_in_progress);
+ cmsg_init((struct cmsg *) &owner->push_msg, push_route);
+ iproto_wpos_create(&owner->push_msg.wpos, owner->connection->tx.p_obuf);
+ owner->has_new_pushes = false;
+ owner->is_push_in_progress = true;
+ cpipe_push(&net_pipe, (struct cmsg *) &owner->push_msg);
+}
+
+static int
+iproto_session_owner_push(struct session_owner *session_owner, uint64_t sync,
+ struct port *port)
+{
+ struct iproto_session_owner *owner =
+ (struct iproto_session_owner *) session_owner;
+ struct iproto_connection *con = owner->connection;
+ struct obuf_svp svp;
+ if (iproto_prepare_push(con->tx.p_obuf, &svp) != 0)
+ return -1;
+ if (port_dump(port, con->tx.p_obuf) != 0) {
+ obuf_rollback_to_svp(con->tx.p_obuf, &svp);
+ return -1;
+ }
+ iproto_reply_push(con->tx.p_obuf, &svp, sync, ::schema_version);
+ if (! owner->is_push_in_progress)
+ tx_begin_push(owner);
+ else
+ owner->has_new_pushes = true;
+ return 0;
+}
+
+/** }}} */
+
/**
* Create a connection and start input.
*/
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index cd7b1d03b..893275c7b 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -174,7 +174,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
NULL, /* 0x2e */
NULL, /* 0x2f */
"data", /* 0x30 */
- "error" /* 0x31 */
+ "error", /* 0x31 */
+ "push", /* 0x32 */
};
const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = {
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 951842485..05d262a0b 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -79,6 +79,14 @@ enum iproto_key {
/* Leave a gap between request keys and response keys */
IPROTO_DATA = 0x30,
IPROTO_ERROR = 0x31,
+ /**
+ * Tarantool supports two push types: binary and text.
+ * A text push can be distinguished from a response by a
+ * prefix "push:".
+ * Binary push is encoded using IPROTO_PUSH key in a
+ * message body, which replaces IPROTO_DATA.
+ */
+ IPROTO_PUSH = 0x32,
IPROTO_KEY_MAX
};
diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index be13812aa..fc70ed430 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -418,6 +418,7 @@ port_lua_destroy(struct port *base)
static const struct port_vtab port_lua_vtab = {
.dump = port_lua_dump,
.dump_16 = port_lua_dump_16,
+ .dump_raw = NULL,
.destroy = port_lua_destroy,
};
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 450745c90..3bc4b6425 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -329,7 +329,7 @@ lbox_console_add_history(struct lua_State *L)
return 0;
}
-static int
+int
lbox_console_format(struct lua_State *L)
{
int arg_count = lua_gettop(L);
diff --git a/src/box/lua/console.h b/src/box/lua/console.h
index 208b31490..92a4af035 100644
--- a/src/box/lua/console.h
+++ b/src/box/lua/console.h
@@ -39,6 +39,14 @@ struct lua_State;
void
tarantool_lua_console_init(struct lua_State *L);
+/**
+ * Encode Lua object into YAML string.
+ * @param L Lua state with the object to encode on top of a stack.
+ * @retval Lua string.
+ */
+int
+lbox_console_format(struct lua_State *L);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index b4199ef85..295bfcbaa 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -59,7 +59,7 @@ end
--
-- Evaluate command on remote instance
--
-local function remote_eval(self, line)
+local function remote_eval(self, line, opts)
if not line or self.remote.state ~= 'active' then
local err = self.remote.error
self.remote:close()
@@ -74,7 +74,7 @@ local function remote_eval(self, line)
--
-- execute line
--
- local ok, res = pcall(self.remote.eval, self.remote, line)
+ local ok, res = pcall(self.remote.eval, self.remote, line, opts)
return ok and res or format(false, res)
end
@@ -310,7 +310,7 @@ local function connect(uri, opts)
-- override methods
self.remote = remote
- self.eval = remote_eval
+ self.eval = function(s, l) return remote_eval(s, l, {on_push = opts.on_push}) end
self.prompt = string.format("%s:%s", self.remote.host, self.remote.port)
self.completion = function (str, pos1, pos2)
local c = string.format(
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index db2d2dbb4..9fa7935f7 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -554,6 +554,41 @@ handle_error:
return 2;
}
+/**
+ * Tell if a text message (char * cdata, length) starts with "push:".
+ */
+static int
+netbox_text_is_push(struct lua_State *L)
+{
+	assert(lua_gettop(L) == 2);
+	uint32_t ctypeid;
+	const char *text = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
+	assert(ctypeid == luaL_ctypeid(L, "char *"));
+	uint32_t len = (uint32_t) lua_tonumber(L, 2);
+	uint32_t push_len = (uint32_t) strlen("push:");
+	lua_pushboolean(L, len >= push_len &&
+			memcmp(text, "push:", push_len) == 0);
+	return 1;
+}
+
+/**
+ * Search for IPROTO_PUSH key in a MessagePack encoded response
+ * body. It is needed without entire message decoding, when a user
+ * wants to store raw responses and pushes in its own buffer.
+ */
+static int
+netbox_body_is_push(struct lua_State *L)
+{
+ uint32_t ctypeid;
+ const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
+ assert(ctypeid == luaL_ctypeid(L, "char *"));
+ assert(mp_typeof(*body) == MP_MAP);
+ lua_pushboolean(L, mp_decode_map(&body) == 1 &&
+ mp_typeof(*body) == MP_UINT &&
+ mp_decode_uint(&body) == IPROTO_PUSH);
+ return 1;
+}
+
int
luaopen_net_box(struct lua_State *L)
{
@@ -571,6 +606,8 @@ luaopen_net_box(struct lua_State *L)
{ "encode_auth", netbox_encode_auth },
{ "decode_greeting",netbox_decode_greeting },
{ "communicate", netbox_communicate },
+ { "text_is_push", netbox_text_is_push },
+ { "body_is_push", netbox_body_is_push },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 87c8c548b..7ef6a6a2d 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,8 @@ local communicate = internal.communicate
local encode_auth = internal.encode_auth
local encode_select = internal.encode_select
local decode_greeting = internal.decode_greeting
+local text_is_push = internal.text_is_push
+local body_is_push = internal.body_is_push
local sequence_mt = { __serialize = 'sequence' }
local TIMEOUT_INFINITY = 500 * 365 * 86400
@@ -38,6 +40,7 @@ local IPROTO_SYNC_KEY = 0x01
local IPROTO_SCHEMA_VERSION_KEY = 0x05
local IPROTO_DATA_KEY = 0x30
local IPROTO_ERROR_KEY = 0x31
+local IPROTO_PUSH_KEY = 0x32
local IPROTO_GREETING_SIZE = 128
-- select errors from box.error
@@ -229,7 +232,8 @@ local function create_transport(host, port, user, password, callback)
end
-- REQUEST/RESPONSE --
- local function perform_request(timeout, buffer, method, schema_version, ...)
+ local function perform_request(timeout, buffer, method, on_push,
+ schema_version, ...)
if state ~= 'active' then
return last_errno or E_NO_CONNECTION, last_error
end
@@ -242,7 +246,7 @@ local function create_transport(host, port, user, password, callback)
local id = next_request_id
method_codec[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
- local request = table_new(0, 6) -- reserve space for 6 keys
+ local request = table_new(0, 7) -- reserve space for 7 keys
request.client = fiber_self()
request.method = method
request.schema_version = schema_version
@@ -254,6 +258,17 @@ local function create_transport(host, port, user, password, callback)
requests[id] = nil
return E_TIMEOUT, 'Timeout exceeded'
end
+            if request.messages then
+                -- Several pushes can arrive on a single event loop
+                -- iteration; ipairs delivers them in arrival order.
+                local messages = request.messages
+                request.messages = nil
+                if on_push then
+                    for _, m in ipairs(messages) do
+                        on_push(m)
+                    end
+                end
+            end
until requests[id] == nil -- i.e. completed (beware spurious wakeups)
return request.errno, request.response
end
@@ -270,12 +285,12 @@ local function create_transport(host, port, user, password, callback)
if request == nil then -- nobody is waiting for the response
return
end
- requests[id] = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
if status ~= 0 then
-- Handle errors
+ requests[id] = nil
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -290,7 +305,16 @@ local function create_transport(host, port, user, password, callback)
local body_len = body_end - body_rpos
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
- request.response = tonumber(body_len)
+ if body_is_push(body_rpos) then
+ if request.messages then
+ table.insert(request.messages, tonumber(body_len))
+ else
+ request.messages = {tonumber(body_len)}
+ end
+ else
+ requests[id] = nil
+ request.response = tonumber(body_len)
+ end
wakeup_client(request.client)
return
end
@@ -298,7 +322,18 @@ local function create_transport(host, port, user, password, callback)
-- Decode xrow.body[DATA] to Lua objects
body, body_end_check = decode(body_rpos)
assert(body_end == body_end_check, "invalid xrow length")
- request.response = body[IPROTO_DATA_KEY]
+ if body[IPROTO_PUSH_KEY] then
+ assert(#body[IPROTO_PUSH_KEY] == 1)
+ assert(not body[IPROTO_DATA_KEY])
+ if request.messages then
+ table.insert(request.messages, body[IPROTO_PUSH_KEY][1])
+ else
+ request.messages = {body[IPROTO_PUSH_KEY][1]}
+ end
+ else
+ requests[id] = nil
+ request.response = body[IPROTO_DATA_KEY]
+ end
wakeup_client(request.client)
end
@@ -345,9 +380,16 @@ local function create_transport(host, port, user, password, callback)
if err then
return err, delim_pos
else
- local response = ffi.string(recv_buf.rpos, delim_pos + #delim)
+ local response
+ local is_push = text_is_push(recv_buf.rpos, delim_pos + #delim)
+ if not is_push then
+ response = ffi.string(recv_buf.rpos, delim_pos + #delim)
+ else
+ -- 5 - len of 'push:' prefix of a message.
+ response = ffi.string(recv_buf.rpos + 5, delim_pos + #delim - 5)
+ end
recv_buf.rpos = recv_buf.rpos + delim_pos + #delim
- return nil, response
+ return nil, response, is_push
end
end
@@ -408,18 +450,26 @@ local function create_transport(host, port, user, password, callback)
console_sm = function(rid)
local delim = '\n...\n'
- local err, response = send_and_recv_console()
+ local err, response, is_push = send_and_recv_console()
if err then
return error_sm(err, response)
else
local request = requests[rid]
- if request == nil then -- nobody is waiting for the response
- return
+ if request then
+ if is_push then
+ -- In a console mode it is impossible to
+ -- get multiple pushes on a single event loop
+ -- iteration.
+ assert(not request.messages)
+ request.messages = {response}
+ else
+ requests[rid] = nil
+ request.response = response
+ rid = next_id(rid)
+ end
+ wakeup_client(request.client)
+ return console_sm(rid)
end
- requests[rid] = nil
- request.response = response
- wakeup_client(request.client)
- return console_sm(next_id(rid))
end
end
@@ -761,7 +811,8 @@ function remote_methods:_request(method, opts, ...)
timeout = deadline and max(0, deadline - fiber_clock())
end
err, res = perform_request(timeout, buffer, method,
- self.schema_version, ...)
+ opts and opts.on_push, self.schema_version,
+ ...)
if not err and buffer ~= nil then
return res -- the length of xrow.body
elseif not err then
@@ -791,7 +842,7 @@ function remote_methods:ping(opts)
timeout = deadline and max(0, deadline - fiber_clock())
or (opts and opts.timeout)
end
- local err = self._transport.perform_request(timeout, nil, 'ping',
+ local err = self._transport.perform_request(timeout, nil, 'ping', nil,
self.schema_version)
return not err or err == E_WRONG_SCHEMA_VERSION
end
@@ -956,8 +1007,16 @@ console_methods.on_disconnect = remote_methods.on_disconnect
console_methods.on_connect = remote_methods.on_connect
console_methods.is_connected = remote_methods.is_connected
console_methods.wait_state = remote_methods.wait_state
-function console_methods:eval(line, timeout)
+function console_methods:eval(line, opts)
check_remote_arg(self, 'eval')
+ local timeout
+ local on_push
+ if type(opts) == 'table' then
+ timeout = opts.timeout or TIMEOUT_INFINITY
+ on_push = opts.on_push
+ else
+ timeout = opts
+ end
local err, res
local transport = self._transport
local pr = transport.perform_request
@@ -968,10 +1027,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+ err, res = pr(timeout, nil, 'eval', on_push, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+ err, res = pr(timeout, nil, 'inject', on_push, nil, line..'$EOF$\n')
end
if err then
box.error({code = err, reason = res})
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index af8411068..73cc9da8f 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -31,6 +31,7 @@
#include "session.h"
#include "lua/utils.h"
#include "lua/trigger.h"
+#include "lua/msgpack.h"
#include <lua.h>
#include <lauxlib.h>
@@ -41,6 +42,10 @@
#include "box/session.h"
#include "box/user.h"
#include "box/schema.h"
+#include "box/port.h"
+#include "box/lua/console.h"
+#include "fio.h"
+#include "small/obuf.h"
/** Owner of a console session. */
struct console_session_owner {
@@ -55,10 +60,26 @@ console_session_owner_dup(struct session_owner *owner);
static int
console_session_owner_fd(const struct session_owner *owner);
+/**
+ * Send "push:" prefix + message in a blocking mode, with no
+ * yields, to a console socket.
+ * @param owner Console session owner.
+ * @param sync Sync. It is unused since a text protocol has no
+ * syncs.
+ * @param port Port with text to dump.
+ *
+ * @retval -1 Memory or IO error.
+ * @retval 0 Success.
+ */
+static int
+console_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port);
+
static const struct session_owner_vtab console_session_owner_vtab = {
/* .dup = */ console_session_owner_dup,
/* .delete = */ (void (*)(struct session_owner *)) free,
/* .fd = */ console_session_owner_fd,
+ /* .push = */ console_session_owner_push,
};
static struct session_owner *
@@ -90,6 +111,45 @@ console_session_owner_create(struct console_session_owner *owner, int fd)
owner->fd = fd;
}
+/**
+ * Write all @a len bytes of @a text into @a fd, retrying while
+ * fio_write_silent() reports a partial or zero-length write.
+ * @param fd Console descriptor.
+ * @param text Text to send, @a len bytes long.
+ * @retval 0 on success, -1 if fio_write_silent() returned < 0.
+ */
+static inline int
+console_do_push(int fd, const char *text, uint32_t len)
+{
+ while (len > 0) {
+ int written = fio_write_silent(fd, text, len);
+ if (written < 0)
+ return -1;
+ assert((uint32_t) written <= len);
+ len -= written;
+ text += written;
+ }
+ return 0;
+}
+
+static int
+console_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port)
+{
+ /* Console has no sync. */
+ (void) sync;
+ assert(owner->vtab == &console_session_owner_vtab);
+ int fd = console_session_owner_fd(owner);
+ uint32_t text_len;
+ const char *text = port_dump_raw(port, &text_len);
+ if (text == NULL ||
+ console_do_push(fd, "push:", strlen("push:")) != 0 ||
+ console_do_push(fd, text, text_len) != 0)
+ return -1;
+ else
+ return 0;
+}
+
static const char *sessionlib_name = "box.session";
/* Create session and pin it to fiber */
@@ -418,6 +478,116 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
return 3;
}
+/**
+ * Port to push a message from Lua.
+ */
+struct lua_push_port {
+ const struct port_vtab *vtab;
+ /**
+ * Lua state, containing data to dump on top of the stack.
+ */
+ struct lua_State *L;
+};
+
+/**
+ * Lua push port supports two dump types: usual and raw. Raw dump
+ * encodes a message as a YAML formatted text, usual dump encodes
+ * the message as MessagePack right into an output buffer.
+ */
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *out);
+
+static const char *
+lua_push_port_dump_text(struct port *port, uint32_t *size);
+
+static const struct port_vtab lua_push_port_vtab = {
+ .dump = lua_push_port_dump_msgpack,
+ /*
+ * dump_16 makes no sense here: push appeared in the 1.10
+ * protocol.
+ */
+ .dump_16 = NULL,
+ .dump_raw = lua_push_port_dump_text,
+ .destroy = NULL,
+};
+
+static void
+obuf_error_cb(void *ctx)
+{
+ *((int *)ctx) = -1;
+}
+
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *out)
+{
+ struct lua_push_port *lua_port = (struct lua_push_port *) port;
+ assert(lua_port->vtab == &lua_push_port_vtab);
+ struct mpstream stream;
+ int rc = 0;
+ /*
+ * Do not use luamp_error to allow a caller to clear the
+ * obuf, if it already has allocated something (for
+ * example, iproto push reserves memory for a header).
+ */
+ mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+ obuf_error_cb, &rc);
+ luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1);
+ if (rc != 0)
+ return -1;
+ mpstream_flush(&stream);
+ return 0;
+}
+
+static const char *
+lua_push_port_dump_text(struct port *port, uint32_t *size)
+{
+ struct lua_push_port *lua_port = (struct lua_push_port *) port;
+ assert(lua_port->vtab == &lua_push_port_vtab);
+ lbox_console_format(lua_port->L);
+ assert(lua_isstring(lua_port->L, -1));
+ size_t len;
+ const char *result = lua_tolstring(lua_port->L, -1, &len);
+ *size = (uint32_t) len;
+ return result;
+}
+
+/**
+ * Push a message using a protocol, depending on a session type.
+ * @param data Data to push, first argument on a stack.
+ * @param opts Options table, second argument on a stack. Its only
+ *        option, sync, is mandatory: a non-negative integer.
+ */
+static int
+lbox_session_push(struct lua_State *L)
+{
+	if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
+usage_error:
+		return luaL_error(L, "Usage: box.session.push(data, opts)");
+	}
+	lua_getfield(L, 2, "sync");
+	if (! lua_isnumber(L, 3))
+		goto usage_error;
+	double lua_sync = lua_tonumber(L, 3);
+	lua_pop(L, 1);
+	/*
+	 * Converting an out-of-range double to uint64_t is undefined,
+	 * so reject negative, NaN and too big values before the cast.
+	 */
+	if (!(lua_sync >= 0 && lua_sync < (double) UINT64_MAX))
+		goto usage_error;
+	uint64_t sync = (uint64_t) lua_sync;
+	if (lua_sync != (double) sync)
+		goto usage_error;
+	struct lua_push_port port;
+	port.vtab = &lua_push_port_vtab;
+	port.L = L;
+	/* Drop the opts: only the data must stay on the stack. */
+	lua_remove(L, 2);
+	if (session_push(current_session(), sync, (struct port *) &port) != 0)
+		return luaT_error(L);
+	lua_pushboolean(L, true);
+	return 1;
+}
+
/**
* Sets trigger on_access_denied.
* For test purposes only.
@@ -489,6 +659,7 @@ box_lua_session_init(struct lua_State *L)
{"on_disconnect", lbox_session_on_disconnect},
{"on_auth", lbox_session_on_auth},
{"on_access_denied", lbox_session_on_access_denied},
+ {"push", lbox_session_push},
{NULL, NULL}
};
luaL_register_module(L, sessionlib_name, sessionlib);
diff --git a/src/box/port.c b/src/box/port.c
index 03f6be79d..7f3cea5e7 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_16(struct port *port, struct obuf *out)
return port->vtab->dump_16(port, out);
}
+const char *
+port_dump_raw(struct port *port, uint32_t *size)
+{
+ return port->vtab->dump_raw(port, size);
+}
+
void
port_init(void)
{
@@ -159,5 +165,6 @@ port_free(void)
const struct port_vtab port_tuple_vtab = {
.dump = port_tuple_dump,
.dump_16 = port_tuple_dump_16,
+ .dump_raw = NULL,
.destroy = port_tuple_destroy,
};
diff --git a/src/box/port.h b/src/box/port.h
index 7cf3339b5..d1db74909 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -76,6 +76,11 @@ struct port_vtab {
* format.
*/
int (*dump_16)(struct port *port, struct obuf *out);
+ /**
+ * Same as dump, but the port finds memory for the output
+ * buffer by itself and returns it with its size.
+ */
+ const char *(*dump_raw)(struct port *port, uint32_t *size);
/**
* Destroy a port and release associated resources.
*/
@@ -158,6 +163,16 @@ port_dump(struct port *port, struct obuf *out);
int
port_dump_16(struct port *port, struct obuf *out);
+/**
+ * Same as port_dump(), but the port finds memory for the output
+ * buffer by itself.
+ * @param port Port to dump.
+ * @param[out] size Size of the data.
+ * @retval Data.
+ */
+const char *
+port_dump_raw(struct port *port, uint32_t *size);
+
void
port_init(void);
diff --git a/src/box/session.cc b/src/box/session.cc
index 908ec9c4e..793393fc8 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -63,6 +63,7 @@ static const struct session_owner_vtab generic_session_owner_vtab = {
/* .dup = */ generic_session_owner_dup,
/* .delete = */ (void (*)(struct session_owner *)) free,
/* .fd = */ generic_session_owner_fd,
+ /* .push = */ generic_session_owner_push,
};
static struct session_owner *
@@ -95,6 +96,19 @@ session_owner_create(struct session_owner *owner, enum session_type type)
owner->vtab = &generic_session_owner_vtab;
}
+int
+generic_session_owner_push(struct session_owner *owner, uint64_t sync,
+			   struct port *port)
+{
+	/* Only the owner's type is needed - to name it in the error. */
+	(void) sync;
+	(void) port;
+	const char *session =
+		tt_sprintf("Session '%s'", session_type_strs[owner->type]);
+	diag_set(ClientError, ER_UNSUPPORTED, session, "push()");
+	return -1;
+}
+
static inline uint64_t
sid_max()
{
diff --git a/src/box/session.h b/src/box/session.h
index 105dcab17..b5f4955d8 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -59,6 +59,7 @@ enum session_type {
extern const char *session_type_strs[];
struct session_owner_vtab;
+struct port;
/**
* Object to store session type specific data. For example, IProto
@@ -78,8 +79,18 @@ struct session_owner_vtab {
void (*free)(struct session_owner *);
/** Get the descriptor of an owner, if has. Else -1. */
int (*fd)(const struct session_owner *);
+ /** Push a port data into a session owner's channel. */
+ int (*push)(struct session_owner *, uint64_t, struct port *);
};
+/**
+ * In a common case, a session does not support push. This
+ * function always returns -1 and sets ER_UNSUPPORTED error.
+ */
+int
+generic_session_owner_push(struct session_owner *owner, uint64_t sync,
+ struct port *port);
+
static inline struct session_owner *
session_owner_dup(struct session_owner *owner)
{
@@ -142,6 +153,12 @@ session_fd(const struct session *session)
return session->owner->vtab->fd(session->owner);
}
+static inline int
+session_push(struct session *session, uint64_t sync, struct port *port)
+{
+ return session->owner->vtab->push(session->owner, sync, port);
+}
+
/**
* Find a session by id.
*/
diff --git a/src/box/xrow.c b/src/box/xrow.c
index f48525645..f3e929f69 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,6 +43,9 @@
#include "scramble.h"
#include "iproto_constants.h"
+static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
+ "encoded IPROTO_BODY keys must fit into one byte");
+
int
xrow_header_decode(struct xrow_header *header, const char **pos,
const char *end)
@@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
uint32_t v_data_len; /* string length of array size */
};
+static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
+ IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
+
static const struct iproto_body_bin iproto_body_bin = {
0x81, IPROTO_DATA, 0xdd, 0
};
@@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
0x81, IPROTO_ERROR, 0xdb, 0
};
+struct PACKED iproto_body_push_bin {
+ uint8_t m_body; /* MP_MAP */
+ uint8_t k_data; /* IPROTO_PUSH */
+ uint8_t v_data; /* 1-size MP_ARRAY */
+};
+
+static_assert(sizeof(struct iproto_body_push_bin) + IPROTO_HEADER_LEN ==
+ IPROTO_PUSH_HEADER_LEN, "size of the prepared push");
+
+static const struct iproto_body_push_bin iproto_push_bin = {
+ 0x81, IPROTO_PUSH, 0x91
+};
+
/** Return a 4-byte numeric error code, with status flags. */
static inline uint32_t
iproto_encode_error(uint32_t error)
@@ -337,23 +356,21 @@ iproto_write_error(int fd, const struct error *e, uint32_t schema_version,
(void) write(fd, e->errmsg, msg_len);
}
-enum { SVP_SIZE = IPROTO_HEADER_LEN + sizeof(iproto_body_bin) };
-
int
-iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
+iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size)
{
/**
* Reserve memory before taking a savepoint.
* This ensures that we get a contiguous chunk of memory
* and the savepoint is pointing at the beginning of it.
*/
- void *ptr = obuf_reserve(buf, SVP_SIZE);
+ void *ptr = obuf_reserve(buf, size);
if (ptr == NULL) {
- diag_set(OutOfMemory, SVP_SIZE, "obuf", "reserve");
+ diag_set(OutOfMemory, size, "obuf_reserve", "ptr");
return -1;
}
*svp = obuf_create_svp(buf);
- ptr = obuf_alloc(buf, SVP_SIZE);
+ ptr = obuf_alloc(buf, size);
assert(ptr != NULL);
return 0;
}
@@ -373,6 +390,17 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
}
+void
+iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+ uint32_t schema_version)
+{
+ char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+ iproto_header_encode(pos, IPROTO_OK, sync, schema_version,
+ obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+ memcpy(pos + IPROTO_HEADER_LEN, &iproto_push_bin,
+ sizeof(iproto_push_bin));
+}
+
int
xrow_decode_dml(struct xrow_header *row, struct request *request,
uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index d407d151b..d2ec394c5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -50,6 +50,10 @@ enum {
XROW_HEADER_LEN_MAX = 40,
XROW_BODY_LEN_MAX = 128,
IPROTO_HEADER_LEN = 28,
+ /** 7 = sizeof(iproto_body_bin). */
+ IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
+ /** 3 = sizeof(iproto_body_push_bin). */
+ IPROTO_PUSH_HEADER_LEN = IPROTO_HEADER_LEN + 3,
};
struct xrow_header {
@@ -344,8 +348,18 @@ iproto_header_encode(char *data, uint32_t type, uint64_t sync,
struct obuf;
struct obuf_svp;
+/**
+ * Reserve obuf space for a header, which depends on a response
+ * size.
+ */
int
-iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp);
+iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size);
+
+static inline int
+iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
+{
+ return iproto_prepare_header(buf, svp, IPROTO_SELECT_HEADER_LEN);
+}
/**
* Write select header to a preallocated buffer.
@@ -355,6 +369,16 @@ void
iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
uint32_t schema_version, uint32_t count);
+static inline int
+iproto_prepare_push(struct obuf *buf, struct obuf_svp *svp)
+{
+ return iproto_prepare_header(buf, svp, IPROTO_PUSH_HEADER_LEN);
+}
+
+void
+iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+ uint32_t schema_version);
+
/**
* Encode iproto header with IPROTO_OK response code.
* @param out Encode to.
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..a813b7760 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -134,6 +134,18 @@ fio_writen(int fd, const void *buf, size_t count)
return 0;
}
+int
+fio_write_silent(int fd, const void *buf, size_t count)
+{
+ ssize_t nwr = write(fd, buf, count);
+ if (nwr >= 0)
+ return nwr;
+ if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
+ return 0;
+ else
+ return -1;
+}
+
ssize_t
fio_writev(int fd, struct iovec *iov, int iovcnt)
{
diff --git a/src/fio.h b/src/fio.h
index 12749afcb..c88e19258 100644
--- a/src/fio.h
+++ b/src/fio.h
@@ -108,6 +108,22 @@ fio_pread(int fd, void *buf, size_t count, off_t offset);
int
fio_writen(int fd, const void *buf, size_t count);
+/**
+ * Write the given buffer with no retrying for partial writes,
+ * and ignore transient write errors: EINTR, EWOULDBLOCK and
+ * EAGAIN - these errors are not logged, and the function returns
+ * 0.
+ * @param fd File descriptor to write to.
+ * @param buf Buffer to write.
+ * @param count Size of @a buf.
+ *
+ * @retval -1 Non-transient error.
+ * @retval 0 Nothing to write, or a transient error.
+ * @retval > 0 Written byte count.
+ */
+int
+fio_write_silent(int fd, const void *buf, size_t count);
+
/**
* A simple wrapper around writev().
* Re-tries write in case of EINTR.
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index 48d28bd6d..c91576dc2 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
test = tap.test("console")
-test:plan(59)
+test:plan(61)
-- Start console and connect to it
local server = console.listen(CONSOLE_SOCKET)
@@ -31,6 +31,13 @@ local handshake = client:read{chunk = 128}
test:ok(string.match(handshake, '^Tarantool .*console') ~= nil, 'Handshake')
test:ok(client ~= nil, "connect to console")
+--
+-- gh-2677: box.session.push, text protocol support.
+--
+client:write('box.session.push(200, {sync = 0})\n')
+test:is(client:read(EOL):find('push:'), 1, "read pushed value")
+test:is(client:read(EOL):find('true'), 7, "read box.session.push result")
+
-- Execute some command
client:write("1\n")
test:is(yaml.decode(client:read(EOL))[1], 1, "eval")
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 46d85b327..ecf86af0d 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 87e26f84c..2c19ee479 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
return cn:_request('select', opts, space_id, index_id, iterator,
offset, limit, key)
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
diff --git a/test/box/push.result b/test/box/push.result
new file mode 100644
index 000000000..747503f59
--- /dev/null
+++ b/test/box/push.result
@@ -0,0 +1,364 @@
+--
+-- gh-2677: box.session.push.
+--
+--
+-- Usage.
+--
+box.session.push()
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+box.session.push(100)
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+box.session.push(100, {sync = -200})
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+--
+-- Test text protocol.
+--
+test_run = require('test_run').new()
+---
+...
+console = require('console')
+---
+...
+netbox = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+s = console.listen(3434)
+---
+...
+c = netbox.connect(3434, {console = true})
+---
+...
+c:eval('100')
+---
+- '---
+
+ - 100
+
+ ...
+
+'
+...
+messages = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_push(message)
+ table.insert(messages, message)
+end;
+---
+...
+function do_pushes()
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.sleep(0.01)
+ end
+ return 300
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Ensure a pushed message can be ignored on a client.
+c:eval('do_pushes()')
+---
+- '---
+
+ - 300
+
+ ...
+
+'
+...
+-- Now start catching pushes.
+c:eval('do_pushes()', {on_push = on_push})
+---
+- '---
+
+ - 300
+
+ ...
+
+'
+...
+messages
+---
+- - '---
+
+ - 1
+
+ ...
+
+'
+ - '---
+
+ - 2
+
+ ...
+
+'
+ - '---
+
+ - 3
+
+ ...
+
+'
+ - '---
+
+ - 4
+
+ ...
+
+'
+ - '---
+
+ - 5
+
+ ...
+
+'
+...
+c:close()
+---
+...
+s:close()
+---
+- true
+...
+--
+-- Test binary protocol.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+c:ping()
+---
+- true
+...
+messages = {}
+---
+...
+c:call('do_pushes', {}, {on_push = on_push})
+---
+- 300
+...
+messages
+---
+- - 1
+ - 2
+ - 3
+ - 4
+ - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+ local sync = box.session.sync()
+ box.session.push('started dml', {sync = sync})
+ s:replace{key}
+ box.session.push('continued dml', {sync = sync})
+ s:replace{-key}
+ box.session.push('finished dml', {sync = sync})
+ return key
+end;
+---
+...
+function do_pushes(val)
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.yield()
+ end
+ return val
+end;
+---
+...
+function push_catcher_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = false}
+ catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = true}
+ catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+ fiber.create(dml_push_and_dml_f)
+ fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+ if c.is_dml then
+ assert(#c.messages == 3)
+ assert(c.messages[1] == 'started dml')
+ assert(c.messages[2] == 'continued dml')
+ assert(c.messages[3] == 'finished dml')
+ assert(s:get{c.retval})
+ assert(s:get{-c.retval})
+ else
+ assert(c.retval)
+ assert(#c.messages == 5)
+ for k, v in pairs(c.messages) do
+ assert(k == v)
+ end
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+---
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 4
+ - 4
+ - 4
+ - 4
+ - 4
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {50: [1]}
+ - {50: [2]}
+ - {50: [3]}
+ - {50: [4]}
+ - {50: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+--
+-- Ensure can not push in background.
+--
+ok = nil
+---
+...
+err = nil
+---
+...
+function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end
+---
+...
+f = fiber.create(back_push_f)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+ok, err
+---
+- false
+- Session 'background' does not support push()
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
new file mode 100644
index 000000000..02d571876
--- /dev/null
+++ b/test/box/push.test.lua
@@ -0,0 +1,163 @@
+--
+-- gh-2677: box.session.push.
+--
+
+--
+-- Usage.
+--
+box.session.push()
+box.session.push(100)
+box.session.push(100, {sync = -200})
+
+--
+-- Test text protocol.
+--
+test_run = require('test_run').new()
+console = require('console')
+netbox = require('net.box')
+fiber = require('fiber')
+s = console.listen(3434)
+c = netbox.connect(3434, {console = true})
+c:eval('100')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function on_push(message)
+ table.insert(messages, message)
+end;
+
+function do_pushes()
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.sleep(0.01)
+ end
+ return 300
+end;
+test_run:cmd("setopt delimiter ''");
+-- Ensure a pushed message can be ignored on a client.
+c:eval('do_pushes()')
+-- Now start catching pushes.
+c:eval('do_pushes()', {on_push = on_push})
+messages
+
+c:close()
+s:close()
+
+--
+-- Test binary protocol.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+
+c = netbox.connect(box.cfg.listen)
+c:ping()
+messages = {}
+c:call('do_pushes', {}, {on_push = on_push})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+ local sync = box.session.sync()
+ box.session.push('started dml', {sync = sync})
+ s:replace{key}
+ box.session.push('continued dml', {sync = sync})
+ s:replace{-key}
+ box.session.push('finished dml', {sync = sync})
+ return key
+end;
+function do_pushes(val)
+ local sync = box.session.sync()
+ for i = 1, 5 do
+ box.session.push(i, {sync = sync})
+ fiber.yield()
+ end
+ return val
+end;
+function push_catcher_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = false}
+ catcher.retval = c:call('do_pushes', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+function dml_push_and_dml_f()
+ fiber.yield()
+ started = started + 1
+ local catcher = {messages = {}, retval = nil, is_dml = true}
+ catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = function(message)
+ table.insert(catcher.messages, message)
+ end})
+ table.insert(catchers, catcher)
+ finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+ fiber.create(dml_push_and_dml_f)
+ fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+ if c.is_dml then
+ assert(#c.messages == 3)
+ assert(c.messages[1] == 'started dml')
+ assert(c.messages[2] == 'continued dml')
+ assert(c.messages[3] == 'finished dml')
+ assert(s:get{c.retval})
+ assert(s:get{-c.retval})
+ else
+ assert(c.retval)
+ assert(#c.messages == 5)
+ for k, v in pairs(c.messages) do
+ assert(k == v)
+ end
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+
+--
+-- Ensure can not push in background.
+--
+ok = nil
+err = nil
+function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end
+f = fiber.create(back_push_f)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+ok, err
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
index d561b4813..7b0f2993f 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -49,7 +49,17 @@ test_run:cmd("switch autobootstrap3");
---
- true
...
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
_ = box.space.test:before_replace(function(old, new)
+ if box.session.type() == 'applier' and not push_err then
+ push_ok, push_err = pcall(box.session.push, 100, {sync = 100})
+ end
if old ~= nil and new ~= nil then
return new[2] > old[2] and new or old
end
@@ -187,6 +197,10 @@ box.space.test:select()
- [9, 90]
- [10, 100]
...
+push_err
+---
+- Session 'applier' does not support push()
+...
test_run:cmd('restart server autobootstrap3')
box.space.test:select()
---
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
index 2c6912d06..04e5a2172 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -26,7 +26,17 @@ _ = box.space.test:before_replace(function(old, new)
end
end);
test_run:cmd("switch autobootstrap3");
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
_ = box.space.test:before_replace(function(old, new)
+ if box.session.type() == 'applier' and not push_err then
+ push_ok, push_err = pcall(box.session.push, 100, {sync = 100})
+ end
if old ~= nil and new ~= nil then
return new[2] > old[2] and new or old
end
@@ -62,6 +72,7 @@ test_run:cmd('restart server autobootstrap2')
box.space.test:select()
test_run:cmd("switch autobootstrap3")
box.space.test:select()
+push_err
test_run:cmd('restart server autobootstrap3')
box.space.test:select()
--
2.14.3 (Apple Git-98)
^ permalink raw reply [flat|nested] 20+ messages in thread