[PATCH v2 06/10] session: introduce session vtab and meta

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Fri Apr 20 16:24:31 MSK 2018


box.session.push implementation depends on session type -
console session must send YAML tagged text, binary session must
send MessagePack via another thread, other sessions must return
error.

Add virtual table to a session with a single 'push' function.

The same virtual table together with struct session meta can be
used to use memory of struct session more effectively. Before the
patch session stored sync and fd as attributes, but:
* fd was duplicated for iproto, which already has fd in
  connection;
* sync is used only by iproto, and just occupies 8 byte in other
  sessions;
* after the #2677 session additionaly must be able to store
  iproto connection pointer.

Struct session meta uses C union to store either iproto, or
console, or another meta, but not all of them together.

Part of #2677
---
 src/box/iproto.cc                        |  30 +++++++--
 src/box/lua/console.c                    |  13 ++++
 src/box/lua/session.c                    |  33 ++++++++--
 src/box/session.cc                       |  47 ++++++++++++--
 src/box/session.h                        | 107 ++++++++++++++++++++++++++-----
 test/app-tap/console.test.lua            |   8 ++-
 test/box/push.result                     |  70 ++++++++++++++++++++
 test/box/push.test.lua                   |  35 ++++++++++
 test/replication/before_replace.result   |  14 ++++
 test/replication/before_replace.test.lua |  11 ++++
 10 files changed, 333 insertions(+), 35 deletions(-)
 create mode 100644 test/box/push.result
 create mode 100644 test/box/push.test.lua

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 9e809b2e5..a284368d3 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1012,7 +1012,7 @@ error:
 static void
 tx_fiber_init(struct session *session, uint64_t sync)
 {
-	session->sync = sync;
+	session->meta.sync = sync;
 	/*
 	 * We do not cleanup fiber keys at the end of each request.
 	 * This does not lead to privilege escalation as long as
@@ -1039,11 +1039,6 @@ tx_process_disconnect(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 	if (con->session) {
 		tx_fiber_init(con->session, 0);
-		/*
-		 * The socket is already closed in iproto thread,
-		 * prevent box.session.peer() from using it.
-		 */
-		con->session->fd = -1;
 		if (! rlist_empty(&session_on_disconnect))
 			session_run_on_disconnect_triggers(con->session);
 		session_destroy(con->session);
@@ -1504,9 +1499,10 @@ tx_process_connect(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = msg->connection->tx.p_obuf;
 	try {              /* connect. */
-		con->session = session_create(con->input.fd, SESSION_TYPE_BINARY);
+		con->session = session_create(SESSION_TYPE_BINARY);
 		if (con->session == NULL)
 			diag_raise();
+		con->session->meta.conn = con;
 		tx_fiber_init(con->session, 0);
 		static __thread char greeting[IPROTO_GREETING_SIZE];
 		/* TODO: dirty read from tx thread */
@@ -1643,6 +1639,20 @@ net_cord_f(va_list /* ap */)
 	return 0;
 }
 
+int
+iproto_session_fd(struct session *session)
+{
+	struct iproto_connection *conn =
+		(struct iproto_connection *) session->meta.conn;
+	return conn->output.fd;
+}
+
+int64_t
+iproto_session_sync(struct session *session)
+{
+	return session->meta.sync;
+}
+
 /** Initialize the iproto subsystem and start network io thread */
 void
 iproto_init()
@@ -1655,6 +1665,12 @@ iproto_init()
 	/* Create a pipe to "net" thread. */
 	cpipe_create(&net_pipe, "net");
 	cpipe_set_max_input(&net_pipe, IPROTO_MSG_MAX/2);
+	struct session_vtab iproto_session_vtab = {
+		/* .push = */ generic_session_push,
+		/* .fd = */ iproto_session_fd,
+		/* .sync = */ iproto_session_sync,
+	};
+	session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
 }
 
 /**
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index f6009d387..a3bf83cb1 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -30,6 +30,7 @@
  */
 
 #include "box/lua/console.h"
+#include "box/session.h"
 #include "lua/utils.h"
 #include "lua/fiber.h"
 #include "fiber.h"
@@ -357,6 +358,12 @@ lbox_console_format(struct lua_State *L)
 	return lua_yaml_encode(L, luaL_yaml_default);
 }
 
+int
+console_session_fd(struct session *session)
+{
+	return session->meta.fd;
+}
+
 void
 tarantool_lua_console_init(struct lua_State *L)
 {
@@ -392,6 +399,12 @@ tarantool_lua_console_init(struct lua_State *L)
 	 * console.
 	 */
 	lua_setfield(L, -2, "formatter");
+	struct session_vtab console_session_vtab = {
+		/* .push = */ generic_session_push,
+		/* .fd = */ console_session_fd,
+		/* .sync = */ generic_session_sync,
+	};
+	session_vtab_registry[SESSION_TYPE_CONSOLE] = console_session_vtab;
 }
 
 /*
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 51caf199f..5fe5f08d4 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -50,10 +50,10 @@ lbox_session_create(struct lua_State *L)
 {
 	struct session *session = fiber_get_session(fiber());
 	if (session == NULL) {
-		int fd = luaL_optinteger(L, 1, -1);
-		session = session_create_on_demand(fd);
+		session = session_create_on_demand();
 		if (session == NULL)
 			return luaT_error(L);
+		session->meta.fd = luaL_optinteger(L, 1, -1);
 	}
 	/* If a session already exists, simply reset its type */
 	session->type = STR2ENUM(session_type, luaL_optstring(L, 2, "console"));
@@ -96,7 +96,7 @@ lbox_session_type(struct lua_State *L)
 static int
 lbox_session_sync(struct lua_State *L)
 {
-	lua_pushnumber(L, current_session()->sync);
+	lua_pushnumber(L, session_sync(current_session()));
 	return 1;
 }
 
@@ -231,7 +231,7 @@ lbox_session_fd(struct lua_State *L)
 	struct session *session = session_find(sid);
 	if (session == NULL)
 		luaL_error(L, "session.fd(): session does not exist");
-	lua_pushinteger(L, session->fd);
+	lua_pushinteger(L, session_fd(session));
 	return 1;
 }
 
@@ -253,7 +253,7 @@ lbox_session_peer(struct lua_State *L)
 		session = current_session();
 	if (session == NULL)
 		luaL_error(L, "session.peer(): session does not exist");
-	fd = session->fd;
+	fd = session_fd(session);
 	if (fd < 0) {
 		lua_pushnil(L); /* no associated peer */
 		return 1;
@@ -355,6 +355,28 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
 	return 3;
 }
 
+/**
+ * Push a message using a protocol, depending on a session type.
+ * @param data Data to push, first argument on a stack.
+ * @retval true Success.
+ * @retval nil, error Error occured.
+ */
+static int
+lbox_session_push(struct lua_State *L)
+{
+	if (lua_gettop(L) != 1)
+		return luaL_error(L, "Usage: box.session.push(data)");
+
+	if (session_push(current_session(), NULL) != 0) {
+		lua_pushnil(L);
+		luaT_pusherror(L, box_error_last());
+		return 2;
+	} else {
+		lua_pushboolean(L, true);
+		return 1;
+	}
+}
+
 /**
  * Sets trigger on_access_denied.
  * For test purposes only.
@@ -429,6 +451,7 @@ box_lua_session_init(struct lua_State *L)
 		{"on_disconnect", lbox_session_on_disconnect},
 		{"on_auth", lbox_session_on_auth},
 		{"on_access_denied", lbox_session_on_access_denied},
+		{"push", lbox_session_push},
 		{NULL, NULL}
 	};
 	luaL_register_module(L, sessionlib_name, sessionlib);
diff --git a/src/box/session.cc b/src/box/session.cc
index 3d787bd51..4a1397c24 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -45,6 +45,20 @@ const char *session_type_strs[] = {
 	"unknown",
 };
 
+static struct session_vtab generic_session_vtab = {
+	/* .push = */ generic_session_push,
+	/* .fd = */ generic_session_fd,
+	/* .sync = */ generic_session_sync,
+};
+
+struct session_vtab session_vtab_registry[] = {
+	/* BACKGROUND */ generic_session_vtab,
+	/* BINARY */ generic_session_vtab,
+	/* CONSOLE */ generic_session_vtab,
+	/* REPL */ generic_session_vtab,
+	/* APPLIER */ generic_session_vtab,
+};
+
 static struct mh_i64ptr_t *session_registry;
 
 struct mempool session_pool;
@@ -79,7 +93,7 @@ session_on_stop(struct trigger *trigger, void * /* event */)
 }
 
 struct session *
-session_create(int fd, enum session_type type)
+session_create(enum session_type type)
 {
 	struct session *session =
 		(struct session *) mempool_alloc(&session_pool);
@@ -89,8 +103,7 @@ session_create(int fd, enum session_type type)
 		return NULL;
 	}
 	session->id = sid_max();
-	session->fd =  fd;
-	session->sync = 0;
+	memset(&session->meta, 0, sizeof(session->meta));
 	session->type = type;
 	/* For on_connect triggers. */
 	credentials_init(&session->credentials, guest_user->auth_token,
@@ -110,12 +123,12 @@ session_create(int fd, enum session_type type)
 }
 
 struct session *
-session_create_on_demand(int fd)
+session_create_on_demand()
 {
 	assert(fiber_get_session(fiber()) == NULL);
 
 	/* Create session on demand */
-	struct session *s = session_create(fd, SESSION_TYPE_BACKGROUND);
+	struct session *s = session_create(SESSION_TYPE_BACKGROUND);
 	if (s == NULL)
 		return NULL;
 	s->fiber_on_stop = {
@@ -278,3 +291,27 @@ access_check_universe(user_access_t access)
 	}
 	return 0;
 }
+
+int
+generic_session_push(struct session *session, struct port *port)
+{
+	(void) port;
+	const char *name =
+		tt_sprintf("Session '%s'", session_type_strs[session->type]);
+	diag_set(ClientError, ER_UNSUPPORTED, name, "push()");
+	return -1;
+}
+
+int
+generic_session_fd(struct session *session)
+{
+	(void) session;
+	return -1;
+}
+
+int64_t
+generic_session_sync(struct session *session)
+{
+	(void) session;
+	return 0;
+}
diff --git a/src/box/session.h b/src/box/session.h
index c387e6f95..e583c8c6b 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -41,6 +41,9 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+struct port;
+struct session_vtab;
+
 void
 session_init();
 
@@ -58,6 +61,23 @@ enum session_type {
 
 extern const char *session_type_strs[];
 
+/**
+ * Session meta is used in different ways by sessions of different
+ * types, and allows to do not store attributes in struct session,
+ * that are used only by a session of particular type.
+ */
+struct session_meta {
+	union {
+		/** IProto connection meta. */
+		struct {
+			uint64_t sync;
+			void *conn;
+		};
+		/** Only by console is used. */
+		int fd;
+	};
+};
+
 /**
  * Abstraction of a single user session:
  * for now, only provides accounting of established
@@ -70,26 +90,48 @@ extern const char *session_type_strs[];
 struct session {
 	/** Session id. */
 	uint64_t id;
-	/** File descriptor - socket of the connected peer.
-	 * Only if the session has a peer.
-	 */
-	int fd;
-	/**
-	 * For iproto requests, we set this field
-	 * to the value of packet sync. Since the
-	 * session may be reused between many requests,
-	 * the value is true only at the beginning
-	 * of the request, and gets distorted after
-	 * the first yield.
-	 */
-	uint64_t sync;
 	enum session_type type;
+	/** Session metadata. */
+	struct session_meta meta;
 	/** Session user id and global grants */
 	struct credentials credentials;
 	/** Trigger for fiber on_stop to cleanup created on-demand session */
 	struct trigger fiber_on_stop;
 };
 
+struct session_vtab {
+	/**
+	 * Push a port data into a session data channel - socket,
+	 * console or something.
+	 * @param session Session to push into.
+	 * @param port Port with data to push.
+	 *
+	 * @retval  0 Success.
+	 * @retval -1 Error.
+	 */
+	int
+	(*push)(struct session *session, struct port *port);
+	/**
+	 * Get session file descriptor if exists.
+	 * @param session Session to get descriptor from.
+	 * @retval  -1 No fd.
+	 * @retval >=0 Found fd.
+	 */
+	int
+	(*fd)(struct session *session);
+	/**
+	 * For iproto requests, we set sync to the value of packet
+	 * sync. Since the session may be reused between many
+	 * requests, the value is true only at the beginning
+	 * of the request, and gets distorted after the first
+	 * yield. For other sessions it is 0.
+	 */
+	int64_t
+	(*sync)(struct session *session);
+};
+
+extern struct session_vtab session_vtab_registry[];
+
 /**
  * Find a session by id.
  */
@@ -150,7 +192,7 @@ extern struct credentials admin_credentials;
  * trigger to destroy it when this fiber ends.
  */
 struct session *
-session_create_on_demand(int fd);
+session_create_on_demand();
 
 /*
  * When creating a new fiber, the database (box)
@@ -167,7 +209,7 @@ current_session()
 {
 	struct session *session = fiber_get_session(fiber());
 	if (session == NULL) {
-		session = session_create_on_demand(-1);
+		session = session_create_on_demand();
 		if (session == NULL)
 			diag_raise();
 	}
@@ -187,7 +229,7 @@ effective_user()
 		(struct credentials *) fiber_get_key(fiber(),
 						     FIBER_KEY_USER);
 	if (u == NULL) {
-		session_create_on_demand(-1);
+		session_create_on_demand();
 		u = (struct credentials *) fiber_get_key(fiber(),
 							 FIBER_KEY_USER);
 	}
@@ -212,7 +254,7 @@ session_storage_cleanup(int sid);
  * trigger fails or runs out of resources.
  */
 struct session *
-session_create(int fd, enum session_type type);
+session_create(enum session_type type);
 
 /**
  * Destroy a session.
@@ -251,6 +293,37 @@ access_check_session(struct user *user);
 int
 access_check_universe(user_access_t access);
 
+static inline int
+session_push(struct session *session, struct port *port)
+{
+	return session_vtab_registry[session->type].push(session, port);
+}
+
+static inline int
+session_fd(struct session *session)
+{
+	return session_vtab_registry[session->type].fd(session);
+}
+
+static inline int
+session_sync(struct session *session)
+{
+	return session_vtab_registry[session->type].sync(session);
+}
+
+/**
+ * In a common case, a session does not support push. This
+ * function always returns -1 and sets ER_UNSUPPORTED error.
+ */
+int
+generic_session_push(struct session *session, struct port *port);
+
+int
+generic_session_fd(struct session *session);
+
+int64_t
+generic_session_sync(struct session *session);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index 48d28bd6d..d2e88b55b 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
 
 test = tap.test("console")
 
-test:plan(59)
+test:plan(60)
 
 -- Start console and connect to it
 local server = console.listen(CONSOLE_SOCKET)
@@ -31,6 +31,12 @@ local handshake = client:read{chunk = 128}
 test:ok(string.match(handshake, '^Tarantool .*console') ~= nil, 'Handshake')
 test:ok(client ~= nil, "connect to console")
 
+--
+-- gh-2677: box.session.push, text protocol support.
+--
+client:write('box.session.push(200)\n')
+test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+
 -- Execute some command
 client:write("1\n")
 test:is(yaml.decode(client:read(EOL))[1], 1, "eval")
diff --git a/test/box/push.result b/test/box/push.result
new file mode 100644
index 000000000..816f06e00
--- /dev/null
+++ b/test/box/push.result
@@ -0,0 +1,70 @@
+--
+-- gh-2677: box.session.push.
+--
+--
+-- Usage.
+--
+box.session.push()
+---
+- error: 'Usage: box.session.push(data)'
+...
+box.session.push(1, 2)
+---
+- error: 'Usage: box.session.push(data)'
+...
+ok = nil
+---
+...
+err = nil
+---
+...
+function do_push() ok, err = box.session.push(1) end
+---
+...
+--
+-- Test binary protocol.
+--
+netbox = require('net.box')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+c:ping()
+---
+- true
+...
+c:call('do_push')
+---
+...
+ok, err
+---
+- null
+- Session 'binary' does not support push()
+...
+c:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+--
+-- Ensure can not push in background.
+--
+fiber = require('fiber')
+---
+...
+f = fiber.create(do_push)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+ok, err
+---
+- null
+- Session 'background' does not support push()
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
new file mode 100644
index 000000000..a59fe0a4c
--- /dev/null
+++ b/test/box/push.test.lua
@@ -0,0 +1,35 @@
+--
+-- gh-2677: box.session.push.
+--
+
+--
+-- Usage.
+--
+box.session.push()
+box.session.push(1, 2)
+
+ok = nil
+err = nil
+function do_push() ok, err = box.session.push(1) end
+
+--
+-- Test binary protocol.
+--
+netbox = require('net.box')
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+
+c = netbox.connect(box.cfg.listen)
+c:ping()
+c:call('do_push')
+ok, err
+c:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+
+--
+-- Ensure can not push in background.
+--
+fiber = require('fiber')
+f = fiber.create(do_push)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+ok, err
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
index d561b4813..9937c5769 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -49,7 +49,17 @@ test_run:cmd("switch autobootstrap3");
 ---
 - true
 ...
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
 _ = box.space.test:before_replace(function(old, new)
+    if box.session.type() == 'applier' and not push_err then
+        push_ok, push_err = box.session.push(100)
+    end
     if old ~= nil and new ~= nil then
         return new[2] > old[2] and new or old
     end
@@ -187,6 +197,10 @@ box.space.test:select()
   - [9, 90]
   - [10, 100]
 ...
+push_err
+---
+- Session 'applier' does not support push()
+...
 test_run:cmd('restart server autobootstrap3')
 box.space.test:select()
 ---
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
index 2c6912d06..52ace490a 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -26,7 +26,17 @@ _ = box.space.test:before_replace(function(old, new)
     end
 end);
 test_run:cmd("switch autobootstrap3");
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
 _ = box.space.test:before_replace(function(old, new)
+    if box.session.type() == 'applier' and not push_err then
+        push_ok, push_err = box.session.push(100)
+    end
     if old ~= nil and new ~= nil then
         return new[2] > old[2] and new or old
     end
@@ -62,6 +72,7 @@ test_run:cmd('restart server autobootstrap2')
 box.space.test:select()
 test_run:cmd("switch autobootstrap3")
 box.space.test:select()
+push_err
 test_run:cmd('restart server autobootstrap3')
 box.space.test:select()
 
-- 
2.15.1 (Apple Git-101)




More information about the Tarantool-patches mailing list