[tarantool-patches] Re: [PATCH v2 06/10] session: introduce session vtab and meta

Vladislav Shpilevoy v.shpilevoy at tarantool.org
Thu May 24 23:50:40 MSK 2018



On 10/05/2018 22:20, Konstantin Osipov wrote:
> * Vladislav Shpilevoy <v.shpilevoy at tarantool.org> [18/04/20 16:25]:
>> +int
>> +iproto_session_fd(struct session *session)
>> +{
>> +	struct iproto_connection *conn =
>> +		(struct iproto_connection *) session->meta.conn;
>> +	return conn->output.fd;
>> +}
> 
> Nitpick: I would not abbreviate connection when in a member, and
> use 'con' as in other places for name of the variable declared on
> the stack.

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index e612ba173..cd76fc4c5 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1581,7 +1581,7 @@ tx_process_connect(struct cmsg *m)
  		con->session = session_create(SESSION_TYPE_BINARY);
  		if (con->session == NULL)
  			diag_raise();
-		con->session->meta.conn = con;
+		con->session->meta.connection = con;
  		tx_fiber_init(con->session, 0);
  		static __thread char greeting[IPROTO_GREETING_SIZE];
  		/* TODO: dirty read from tx thread */
@@ -1730,9 +1730,9 @@ net_cord_f(va_list /* ap */)
  int
  iproto_session_fd(struct session *session)
  {
-	struct iproto_connection *conn =
-		(struct iproto_connection *) session->meta.conn;
-	return conn->output.fd;
+	struct iproto_connection *con =
+		(struct iproto_connection *) session->meta.connection;
+	return con->output.fd;
  }
  
  int64_t
diff --git a/src/box/session.h b/src/box/session.h
index e583c8c6b..ff3c7b2fa 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -71,7 +71,7 @@ struct session_meta {
  		/** IProto connection meta. */
  		struct {
  			uint64_t sync;
-			void *conn;
+			void *connection;
  		};
  		/** Only by console is used. */
  		int fd;

> 
>> +/**
>> + * Push a message using a protocol, depending on a session type.
>> + * @param data Data to push, first argument on a stack.
>> + * @retval true Success.
>> + * @retval nil, error Error occured.
>> + */
>> +static int
>> +lbox_session_push(struct lua_State *L)
>> +{
>> +	if (lua_gettop(L) != 1)
>> +		return luaL_error(L, "Usage: box.session.push(data)");
>> +
>> +	if (session_push(current_session(), NULL) != 0) {
>> +		lua_pushnil(L);
>> +		luaT_pusherror(L, box_error_last());
>> +		return 2;
>> +	} else {
>> +		lua_pushboolean(L, true);
>> +		return 1;
>> +	}
>> +}
> 
> I'm not sure we the calling convention should differ from the rest
> of box API, i.e. we should return nil, error rather than
> exceptions. Could you run a poll in the community chat?

It is not a subject to run a poll for. Out convention is already established,
and it is throw on OOM, and return nil + error object on other errors.

See the new diff below:

========================================================================

commit 24eaadece9219a3eacc46798d7c5662540a645f6
Author: Vladislav Shpilevoy <v.shpilevoy at tarantool.org>
Date:   Thu Apr 19 13:43:10 2018 +0300

     session: introduce session vtab and meta
     
     box.session.push implementation depends on session type -
     console session must send YAML tagged text, binary session must
     send MessagePack via another thread, other sessions must return
     error.
     
     Add virtual table to a session with 'push', 'fd' and 'sync'
     functions.
     
     The same virtual table together with struct session meta can be
     used to use memory of struct session more effectively. Before the
     patch session stored sync and fd as attributes, but:
     * fd was duplicated for iproto, which already has fd in
       connection;
     * sync is used only by iproto, and just occupies 8 byte in other
       sessions;
     * after the #2677 session additionaly must be able to store
       iproto connection pointer.
     
     Struct session meta uses C union to store either iproto, or
     console, or another meta, but not all of them together.
     
     Part of #2677

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index fd6022305..cd76fc4c5 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1086,7 +1086,7 @@ error:
  static void
  tx_fiber_init(struct session *session, uint64_t sync)
  {
-	session->sync = sync;
+	session->meta.sync = sync;
  	/*
  	 * We do not cleanup fiber keys at the end of each request.
  	 * This does not lead to privilege escalation as long as
@@ -1113,11 +1113,6 @@ tx_process_disconnect(struct cmsg *m)
  		container_of(m, struct iproto_connection, disconnect);
  	if (con->session) {
  		tx_fiber_init(con->session, 0);
-		/*
-		 * The socket is already closed in iproto thread,
-		 * prevent box.session.peer() from using it.
-		 */
-		con->session->fd = -1;
  		if (! rlist_empty(&session_on_disconnect))
  			session_run_on_disconnect_triggers(con->session);
  		session_destroy(con->session);
@@ -1583,9 +1578,10 @@ tx_process_connect(struct cmsg *m)
  	struct iproto_connection *con = msg->connection;
  	struct obuf *out = msg->connection->tx.p_obuf;
  	try {              /* connect. */
-		con->session = session_create(con->input.fd, SESSION_TYPE_BINARY);
+		con->session = session_create(SESSION_TYPE_BINARY);
  		if (con->session == NULL)
  			diag_raise();
+		con->session->meta.connection = con;
  		tx_fiber_init(con->session, 0);
  		static __thread char greeting[IPROTO_GREETING_SIZE];
  		/* TODO: dirty read from tx thread */
@@ -1731,6 +1727,20 @@ net_cord_f(va_list /* ap */)
  	return 0;
  }
  
+int
+iproto_session_fd(struct session *session)
+{
+	struct iproto_connection *con =
+		(struct iproto_connection *) session->meta.connection;
+	return con->output.fd;
+}
+
+int64_t
+iproto_session_sync(struct session *session)
+{
+	return session->meta.sync;
+}
+
  /** Initialize the iproto subsystem and start network io thread */
  void
  iproto_init()
@@ -1743,6 +1753,12 @@ iproto_init()
  	/* Create a pipe to "net" thread. */
  	cpipe_create(&net_pipe, "net");
  	cpipe_set_max_input(&net_pipe, iproto_msg_max / 2);
+	struct session_vtab iproto_session_vtab = {
+		/* .push = */ generic_session_push,
+		/* .fd = */ iproto_session_fd,
+		/* .sync = */ iproto_session_sync,
+	};
+	session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
  }
  
  /** Available IProto configuration changes. */
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 31614b4a8..7e17fa30a 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -30,6 +30,7 @@
   */
  
  #include "box/lua/console.h"
+#include "box/session.h"
  #include "lua/utils.h"
  #include "lua/fiber.h"
  #include "fiber.h"
@@ -359,6 +360,12 @@ lbox_console_format(struct lua_State *L)
  	return lua_yaml_encode(L, luaL_yaml_default);
  }
  
+int
+console_session_fd(struct session *session)
+{
+	return session->meta.fd;
+}
+
  void
  tarantool_lua_console_init(struct lua_State *L)
  {
@@ -392,6 +399,12 @@ tarantool_lua_console_init(struct lua_State *L)
  	 * load_history work the same way.
  	 */
  	lua_setfield(L, -2, "formatter");
+	struct session_vtab console_session_vtab = {
+		/* .push = */ generic_session_push,
+		/* .fd = */ console_session_fd,
+		/* .sync = */ generic_session_sync,
+	};
+	session_vtab_registry[SESSION_TYPE_CONSOLE] = console_session_vtab;
  }
  
  /*
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index 51caf199f..05010c4c3 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -50,10 +50,10 @@ lbox_session_create(struct lua_State *L)
  {
  	struct session *session = fiber_get_session(fiber());
  	if (session == NULL) {
-		int fd = luaL_optinteger(L, 1, -1);
-		session = session_create_on_demand(fd);
+		session = session_create_on_demand();
  		if (session == NULL)
  			return luaT_error(L);
+		session->meta.fd = luaL_optinteger(L, 1, -1);
  	}
  	/* If a session already exists, simply reset its type */
  	session->type = STR2ENUM(session_type, luaL_optstring(L, 2, "console"));
@@ -96,7 +96,7 @@ lbox_session_type(struct lua_State *L)
  static int
  lbox_session_sync(struct lua_State *L)
  {
-	lua_pushnumber(L, current_session()->sync);
+	lua_pushnumber(L, session_sync(current_session()));
  	return 1;
  }
  
@@ -231,7 +231,7 @@ lbox_session_fd(struct lua_State *L)
  	struct session *session = session_find(sid);
  	if (session == NULL)
  		luaL_error(L, "session.fd(): session does not exist");
-	lua_pushinteger(L, session->fd);
+	lua_pushinteger(L, session_fd(session));
  	return 1;
  }
  
@@ -253,7 +253,7 @@ lbox_session_peer(struct lua_State *L)
  		session = current_session();
  	if (session == NULL)
  		luaL_error(L, "session.peer(): session does not exist");
-	fd = session->fd;
+	fd = session_fd(session);
  	if (fd < 0) {
  		lua_pushnil(L); /* no associated peer */
  		return 1;
@@ -355,6 +355,29 @@ lbox_push_on_access_denied_event(struct lua_State *L, void *event)
  	return 3;
  }
  
+/**
+ * Push a message using a protocol, depending on a session type.
+ * @param L Lua state. First argument on the stack is data to
+ *        push.
+ * @retval 1 Success, true is pushed.
+ * @retval 2 Error. Nil and error object are pushed.
+ */
+static int
+lbox_session_push(struct lua_State *L)
+{
+	if (lua_gettop(L) != 1)
+		return luaL_error(L, "Usage: box.session.push(data)");
+
+	if (session_push(current_session(), NULL) != 0) {
+		lua_pushnil(L);
+		luaT_pusherror(L, box_error_last());
+		return 2;
+	} else {
+		lua_pushboolean(L, true);
+		return 1;
+	}
+}
+
  /**
   * Sets trigger on_access_denied.
   * For test purposes only.
@@ -429,6 +452,7 @@ box_lua_session_init(struct lua_State *L)
  		{"on_disconnect", lbox_session_on_disconnect},
  		{"on_auth", lbox_session_on_auth},
  		{"on_access_denied", lbox_session_on_access_denied},
+		{"push", lbox_session_push},
  		{NULL, NULL}
  	};
  	luaL_register_module(L, sessionlib_name, sessionlib);
diff --git a/src/box/session.cc b/src/box/session.cc
index 3d787bd51..4a1397c24 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -45,6 +45,20 @@ const char *session_type_strs[] = {
  	"unknown",
  };
  
+static struct session_vtab generic_session_vtab = {
+	/* .push = */ generic_session_push,
+	/* .fd = */ generic_session_fd,
+	/* .sync = */ generic_session_sync,
+};
+
+struct session_vtab session_vtab_registry[] = {
+	/* BACKGROUND */ generic_session_vtab,
+	/* BINARY */ generic_session_vtab,
+	/* CONSOLE */ generic_session_vtab,
+	/* REPL */ generic_session_vtab,
+	/* APPLIER */ generic_session_vtab,
+};
+
  static struct mh_i64ptr_t *session_registry;
  
  struct mempool session_pool;
@@ -79,7 +93,7 @@ session_on_stop(struct trigger *trigger, void * /* event */)
  }
  
  struct session *
-session_create(int fd, enum session_type type)
+session_create(enum session_type type)
  {
  	struct session *session =
  		(struct session *) mempool_alloc(&session_pool);
@@ -89,8 +103,7 @@ session_create(int fd, enum session_type type)
  		return NULL;
  	}
  	session->id = sid_max();
-	session->fd =  fd;
-	session->sync = 0;
+	memset(&session->meta, 0, sizeof(session->meta));
  	session->type = type;
  	/* For on_connect triggers. */
  	credentials_init(&session->credentials, guest_user->auth_token,
@@ -110,12 +123,12 @@ session_create(int fd, enum session_type type)
  }
  
  struct session *
-session_create_on_demand(int fd)
+session_create_on_demand()
  {
  	assert(fiber_get_session(fiber()) == NULL);
  
  	/* Create session on demand */
-	struct session *s = session_create(fd, SESSION_TYPE_BACKGROUND);
+	struct session *s = session_create(SESSION_TYPE_BACKGROUND);
  	if (s == NULL)
  		return NULL;
  	s->fiber_on_stop = {
@@ -278,3 +291,27 @@ access_check_universe(user_access_t access)
  	}
  	return 0;
  }
+
+int
+generic_session_push(struct session *session, struct port *port)
+{
+	(void) port;
+	const char *name =
+		tt_sprintf("Session '%s'", session_type_strs[session->type]);
+	diag_set(ClientError, ER_UNSUPPORTED, name, "push()");
+	return -1;
+}
+
+int
+generic_session_fd(struct session *session)
+{
+	(void) session;
+	return -1;
+}
+
+int64_t
+generic_session_sync(struct session *session)
+{
+	(void) session;
+	return 0;
+}
diff --git a/src/box/session.h b/src/box/session.h
index c387e6f95..a37fbdad0 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -41,6 +41,9 @@
  extern "C" {
  #endif /* defined(__cplusplus) */
  
+struct port;
+struct session_vtab;
+
  void
  session_init();
  
@@ -58,6 +61,23 @@ enum session_type {
  
  extern const char *session_type_strs[];
  
+/**
+ * Session meta is used in different ways by sessions of different
+ * types, and allows to do not store attributes in struct session,
+ * that are used only by a session of particular type.
+ */
+struct session_meta {
+	union {
+		/** IProto connection meta. */
+		struct {
+			uint64_t sync;
+			void *connection;
+		};
+		/** Only by console is used. */
+		int fd;
+	};
+};
+
  /**
   * Abstraction of a single user session:
   * for now, only provides accounting of established
@@ -70,26 +90,48 @@ extern const char *session_type_strs[];
  struct session {
  	/** Session id. */
  	uint64_t id;
-	/** File descriptor - socket of the connected peer.
-	 * Only if the session has a peer.
-	 */
-	int fd;
-	/**
-	 * For iproto requests, we set this field
-	 * to the value of packet sync. Since the
-	 * session may be reused between many requests,
-	 * the value is true only at the beginning
-	 * of the request, and gets distorted after
-	 * the first yield.
-	 */
-	uint64_t sync;
  	enum session_type type;
+	/** Session metadata. */
+	struct session_meta meta;
  	/** Session user id and global grants */
  	struct credentials credentials;
  	/** Trigger for fiber on_stop to cleanup created on-demand session */
  	struct trigger fiber_on_stop;
  };
  
+struct session_vtab {
+	/**
+	 * Push a port data into a session data channel - socket,
+	 * console or something.
+	 * @param session Session to push into.
+	 * @param port Port with data to push.
+	 *
+	 * @retval  0 Success.
+	 * @retval -1 Error.
+	 */
+	int
+	(*push)(struct session *session, struct port *port);
+	/**
+	 * Get session file descriptor if exists.
+	 * @param session Session to get descriptor from.
+	 * @retval  -1 No fd.
+	 * @retval >=0 Found fd.
+	 */
+	int
+	(*fd)(struct session *session);
+	/**
+	 * For iproto requests, we set sync to the value of packet
+	 * sync. Since the session may be reused between many
+	 * requests, the value is true only at the beginning
+	 * of the request, and gets distorted after the first
+	 * yield. For other sessions it is 0.
+	 */
+	int64_t
+	(*sync)(struct session *session);
+};
+
+extern struct session_vtab session_vtab_registry[];
+
  /**
   * Find a session by id.
   */
@@ -150,7 +192,7 @@ extern struct credentials admin_credentials;
   * trigger to destroy it when this fiber ends.
   */
  struct session *
-session_create_on_demand(int fd);
+session_create_on_demand();
  
  /*
   * When creating a new fiber, the database (box)
@@ -167,7 +209,7 @@ current_session()
  {
  	struct session *session = fiber_get_session(fiber());
  	if (session == NULL) {
-		session = session_create_on_demand(-1);
+		session = session_create_on_demand();
  		if (session == NULL)
  			diag_raise();
  	}
@@ -187,7 +229,7 @@ effective_user()
  		(struct credentials *) fiber_get_key(fiber(),
  						     FIBER_KEY_USER);
  	if (u == NULL) {
-		session_create_on_demand(-1);
+		session_create_on_demand();
  		u = (struct credentials *) fiber_get_key(fiber(),
  							 FIBER_KEY_USER);
  	}
@@ -212,7 +254,7 @@ session_storage_cleanup(int sid);
   * trigger fails or runs out of resources.
   */
  struct session *
-session_create(int fd, enum session_type type);
+session_create(enum session_type type);
  
  /**
   * Destroy a session.
@@ -251,6 +293,39 @@ access_check_session(struct user *user);
  int
  access_check_universe(user_access_t access);
  
+static inline int
+session_push(struct session *session, struct port *port)
+{
+	return session_vtab_registry[session->type].push(session, port);
+}
+
+static inline int
+session_fd(struct session *session)
+{
+	return session_vtab_registry[session->type].fd(session);
+}
+
+static inline int
+session_sync(struct session *session)
+{
+	return session_vtab_registry[session->type].sync(session);
+}
+
+/**
+ * In a common case, a session does not support push. This
+ * function always returns -1 and sets ER_UNSUPPORTED error.
+ */
+int
+generic_session_push(struct session *session, struct port *port);
+
+/** Return -1 from any session. */
+int
+generic_session_fd(struct session *session);
+
+/** Return 0 from any session. */
+int64_t
+generic_session_sync(struct session *session);
+
  #if defined(__cplusplus)
  } /* extern "C" */
  
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index fecfa52c8..a5b3061a9 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
  
  test = tap.test("console")
  
-test:plan(59)
+test:plan(60)
  
  -- Start console and connect to it
  local server = console.listen(CONSOLE_SOCKET)
@@ -31,6 +31,12 @@ local handshake = client:read{chunk = 128}
  test:ok(string.match(handshake, '^Tarantool .*console') ~= nil, 'Handshake')
  test:ok(client ~= nil, "connect to console")
  
+--
+-- gh-2677: box.session.push, text protocol support.
+--
+client:write('box.session.push(200)\n')
+test:is(client:read(EOL), "---\n- null\n- Session 'console' does not support push()\n...\n", "push does not work")
+
  -- Execute some command
  client:write("1\n")
  test:is(yaml.decode(client:read(EOL))[1], 1, "eval")
diff --git a/test/box/push.result b/test/box/push.result
new file mode 100644
index 000000000..816f06e00
--- /dev/null
+++ b/test/box/push.result
@@ -0,0 +1,70 @@
+--
+-- gh-2677: box.session.push.
+--
+--
+-- Usage.
+--
+box.session.push()
+---
+- error: 'Usage: box.session.push(data)'
+...
+box.session.push(1, 2)
+---
+- error: 'Usage: box.session.push(data)'
+...
+ok = nil
+---
+...
+err = nil
+---
+...
+function do_push() ok, err = box.session.push(1) end
+---
+...
+--
+-- Test binary protocol.
+--
+netbox = require('net.box')
+---
+...
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+c:ping()
+---
+- true
+...
+c:call('do_push')
+---
+...
+ok, err
+---
+- null
+- Session 'binary' does not support push()
+...
+c:close()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+--
+-- Ensure can not push in background.
+--
+fiber = require('fiber')
+---
+...
+f = fiber.create(do_push)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+ok, err
+---
+- null
+- Session 'background' does not support push()
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
new file mode 100644
index 000000000..a59fe0a4c
--- /dev/null
+++ b/test/box/push.test.lua
@@ -0,0 +1,35 @@
+--
+-- gh-2677: box.session.push.
+--
+
+--
+-- Usage.
+--
+box.session.push()
+box.session.push(1, 2)
+
+ok = nil
+err = nil
+function do_push() ok, err = box.session.push(1) end
+
+--
+-- Test binary protocol.
+--
+netbox = require('net.box')
+box.schema.user.grant('guest', 'read,write,execute', 'universe')
+
+c = netbox.connect(box.cfg.listen)
+c:ping()
+c:call('do_push')
+ok, err
+c:close()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+
+--
+-- Ensure can not push in background.
+--
+fiber = require('fiber')
+f = fiber.create(do_push)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+ok, err
diff --git a/test/replication/before_replace.result b/test/replication/before_replace.result
index d561b4813..9937c5769 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -49,7 +49,17 @@ test_run:cmd("switch autobootstrap3");
  ---
  - true
  ...
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
  _ = box.space.test:before_replace(function(old, new)
+    if box.session.type() == 'applier' and not push_err then
+        push_ok, push_err = box.session.push(100)
+    end
      if old ~= nil and new ~= nil then
          return new[2] > old[2] and new or old
      end
@@ -187,6 +197,10 @@ box.space.test:select()
    - [9, 90]
    - [10, 100]
  ...
+push_err
+---
+- Session 'applier' does not support push()
+...
  test_run:cmd('restart server autobootstrap3')
  box.space.test:select()
  ---
diff --git a/test/replication/before_replace.test.lua b/test/replication/before_replace.test.lua
index 2c6912d06..52ace490a 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -26,7 +26,17 @@ _ = box.space.test:before_replace(function(old, new)
      end
  end);
  test_run:cmd("switch autobootstrap3");
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
  _ = box.space.test:before_replace(function(old, new)
+    if box.session.type() == 'applier' and not push_err then
+        push_ok, push_err = box.session.push(100)
+    end
      if old ~= nil and new ~= nil then
          return new[2] > old[2] and new or old
      end
@@ -62,6 +72,7 @@ test_run:cmd('restart server autobootstrap2')
  box.space.test:select()
  test_run:cmd("switch autobootstrap3")
  box.space.test:select()
+push_err
  test_run:cmd('restart server autobootstrap3')
  box.space.test:select()
  



More information about the Tarantool-patches mailing list