Tarantool development patches archive
* [tarantool-patches] [PATCH v2] iproto: introduce a proxy module.
@ 2018-10-02 18:05 Serge Petrenko
  2018-10-03  8:49 ` Vladimir Davydov
                   ` (5 more replies)
  0 siblings, 6 replies; 12+ messages in thread
From: Serge Petrenko @ 2018-10-02 18:05 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Serge Petrenko

This patch introduces a proxy module which can be used to hide cluster
details from the user.
A proxy runs on one of the cluster instances in the iproto thread and
forwards rw requests, such as insert and replace, to the master, and ro
requests to any suitable instance. The proxy also forwards requests to
the instance it is configured on, provided the instance is suitable for
performing such requests and the proxy is configured accordingly.
When forwarding to the local instance, the proxy doesn't use the network
and sends requests for processing directly to the tx thread through cbus.
The proxy holds a single connection to a remote instance per cluster
user.
By default, upon a new client connection, all requests from the client
are forwarded through the "guest" connection. Upon receiving an AUTH
request, the proxy processes it on the local instance (this can be done,
since the proxy runs on one of the cluster instances and has access to
user data). If auth is successful, subsequent client requests are
forwarded through the corresponding user connection; otherwise, the
proxy keeps forwarding requests through the guest connection.

Each instance connection has 2 corresponding fibers: a writer
("forwarder") and a reader ("replier"), a queue of requests to be
forwarded, a monotonically increasing sync counter, and a map of client
syncs and the corresponding client connections to reply to. Upon
receiving a new request, the proxy determines which instance is to
process it by decoding the request type.
Upon finding an instance and an instance connection, the proxy adds the
request and the corresponding client connection (to send a reply to) to
the forwarder queue and signals the forwarder that a new request is
ready. The forwarder changes the request sync to a counter value and
remembers the old sync and the connection to reply to once the reply is
received. Then the forwarder sends the modified request to the instance
and proceeds to the next request.
The replier waits for input from the instance connection. Upon receiving
some, it looks up the old sync value and the corresponding client
connection to write the reply to. Then it sends the reply to the client
and continues waiting for input from the instance.
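The sync-remapping bookkeeping above can be sketched in plain C. This is
an illustrative stand-in, not the patch's code: a tiny fixed-size table
plays the role of the mh_i64ptr_t sync_hash, and a file descriptor
stands in for the client connection.

```c
#include <stdint.h>

/* Hypothetical stand-in for one sync_hash entry. */
struct sync_entry {
	uint64_t new_sync;  /* sync assigned by the proxy */
	uint64_t old_sync;  /* client's original sync */
	int client_fd;      /* stand-in for the client connection */
};

static struct sync_entry sync_hash[16];
static int sync_hash_len;
static uint64_t sync_ctr; /* monotonically increasing counter */

/* Forwarder side: remap the request sync, remember the original. */
static uint64_t
forward_remap(uint64_t client_sync, int client_fd)
{
	uint64_t new_sync = ++sync_ctr;
	sync_hash[sync_hash_len++] =
		(struct sync_entry){new_sync, client_sync, client_fd};
	return new_sync; /* this value goes into the forwarded row */
}

/*
 * Replier side: look up the reply's sync, restore the original
 * one and return the client connection to write the reply to.
 */
static int
reply_restore(uint64_t reply_sync, uint64_t *old_sync)
{
	for (int i = 0; i < sync_hash_len; i++) {
		if (sync_hash[i].new_sync == reply_sync) {
			*old_sync = sync_hash[i].old_sync;
			int fd = sync_hash[i].client_fd;
			/* drop the entry, like mh_i64ptr_remove() */
			sync_hash[i] = sync_hash[--sync_hash_len];
			return fd;
		}
	}
	return -1; /* unknown sync: the patch logs a warning here */
}
```

Note how two clients may use the same sync value concurrently; the
remapped sync keeps their replies apart.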

Since the salt the proxy sends to a client differs from the salt it
receives from a remote instance, forwarding auth requests to establish
non-guest connections is a little bit tricky:
let hash1 = sha1(password),
    hash2 = sha1(hash1)
then upon auth the proxy receives such a string from the client:
    reply = xor(hash1, sha1(proxy_salt, hash2))
and has to send an auth request of such form to an instance:
    request = xor(hash1, sha1(instance_salt, hash2))
The proxy fetches hash2 via a special message to the tx thread (again,
it is accessible, since the proxy runs on one of the cluster instances).
Then the proxy computes hash1 = xor(reply, sha1(proxy_salt, hash2)) and
computes the request using hash1, hash2 and instance_salt.
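The xor algebra above can be verified in isolation. The sketch below
elides sha1 entirely: `proxy_key` and `instance_key` stand in for the
salted digests sha1(proxy_salt, hash2) and sha1(instance_salt, hash2),
and all names are illustrative, not the patch's API.

```c
#include <string.h>

/* sha1 digest size, matching the patch's SCRAMBLE_SIZE. */
enum { DIGEST = 20 };

/* xor two digests: the core operation of the scramble scheme. */
static void
xor20(unsigned char *dst, const unsigned char *a, const unsigned char *b)
{
	for (int i = 0; i < DIGEST; i++)
		dst[i] = a[i] ^ b[i];
}

/*
 * What the client computes:
 * reply = xor(hash1, sha1(salt, hash2)), with `key` standing
 * in for the salted digest.
 */
static void
make_scramble(unsigned char *out, const unsigned char *hash1,
	      const unsigned char *key)
{
	xor20(out, hash1, key);
}

/*
 * What the proxy does: recover hash1 from the client's reply
 * using the proxy-salt digest, then re-scramble it with the
 * instance-salt digest.
 */
static void
rescramble(unsigned char *request, const unsigned char *reply,
	   const unsigned char *proxy_key, const unsigned char *instance_key)
{
	unsigned char hash1[DIGEST];
	xor20(hash1, reply, proxy_key);      /* hash1 = reply ^ proxy_key */
	xor20(request, hash1, instance_key); /* request = hash1 ^ inst_key */
}
```

Because xor is its own inverse, the instance can undo the rescrambled
request with its own salt digest and obtain the same hash1 the client
started from.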

Part of #2625

@TarantoolBot document
Title: introduce a proxy module.

A proxy runs on one of the cluster instances in the iproto thread and
forwards rw requests, such as insert and replace, to the master, and ro
requests to any suitable instance. The proxy also forwards requests to
the instance it is configured on, provided the instance is suitable for
performing such requests and the proxy is configured accordingly.
In this case the proxy doesn't use the network and processes requests
right away.

Proxy may be configured like this:
```
netbox = require("net.box")
netbox.listen(uri_to_listen, {cluster={
	{uri=uri1, is_master=false},
	{uri=uri2, is_master=true},
	...
	}})
```
---
Sorry, forgot to attach issue and branch in the last letter:
https://github.com/tarantool/tarantool/issues/2625
https://github.com/tarantool/tarantool/tree/sp/gh-2625-proxy

 src/box/box.cc          |  68 +++-
 src/box/box.h           |  13 +-
 src/box/errcode.h       |   2 +-
 src/box/iproto.cc       | 708 +++++++++++++++++++++++++++++++++++++++-
 src/box/iproto.h        |  23 ++
 src/box/lua/cfg.cc      |   2 +-
 src/box/lua/net_box.c   |  59 ++++
 src/box/lua/net_box.lua |   2 +
 src/box/xrow.c          |  46 +++
 src/box/xrow.h          |  44 +++
 src/box/xrow_io.cc      |  17 +
 src/box/xrow_io.h       |   6 +
 src/scramble.c          |  23 ++
 src/scramble.h          |   9 +
 14 files changed, 999 insertions(+), 23 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 804fc00e5..5a7f89930 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -741,13 +741,25 @@ box_set_replication_skip_conflict(void)
 }
 
 void
-box_listen(void)
+box_listen_xc(const char *uri)
 {
-	const char *uri = cfg_gets("listen");
+	if (uri == NULL)
+		uri = cfg_gets("listen");
 	box_check_uri(uri, "listen");
 	iproto_listen(uri);
 }
 
+int
+box_listen(const char *uri)
+{
+	try {
+		box_listen_xc(uri);
+	} catch (Exception *) {
+		return -1;
+	}
+	return 0;
+}
+
 void
 box_set_log_level(void)
 {
@@ -1810,7 +1822,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
 	 * Begin listening on the socket to enable
 	 * master-master replication leader election.
 	 */
-	box_listen();
+	box_listen_xc(NULL);
 	/*
 	 * Wait for the cluster to start up.
 	 *
@@ -1881,7 +1893,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	recovery_scan(recovery, &replicaset.vclock);
 
 	if (wal_dir_lock >= 0) {
-		box_listen();
+		box_listen_xc(NULL);
 		box_sync_replication(false);
 
 		struct replica *master;
@@ -1944,7 +1956,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		 * applied in hot standby mode.
 		 */
 		vclock_copy(&replicaset.vclock, &recovery->vclock);
-		box_listen();
+		box_listen_xc(NULL);
 		box_sync_replication(false);
 	}
 	recovery_finalize(recovery);
@@ -1971,6 +1983,28 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
+void
+box_init_iproto()
+{
+	if (!iproto_is_configured()) {
+		/*
+		 * Since iproto may be initialized in a call to netbox.listen()
+		 * before box is configured, we need to first establish
+		 * a tx endpoint to write cbus messages.
+		 */
+		/* Join the cord interconnect as "tx" endpoint. */
+		fiber_pool_create(&tx_fiber_pool, "tx",
+				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
+				  FIBER_POOL_IDLE_TIMEOUT);
+		/* Add an extra endpoint for WAL wake up/rollback messages. */
+		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
+		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
+		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
+
+		iproto_init();
+	}
+}
+
 void
 box_init(void)
 {
@@ -1999,16 +2033,20 @@ box_is_configured(void)
 static inline void
 box_cfg_xc(void)
 {
-	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx",
-			  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
-			  FIBER_POOL_IDLE_TIMEOUT);
-	/* Add an extra endpoint for WAL wake up/rollback messages. */
-	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
-
-	rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
-	rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
+	/*
+	 * We may have already done this in box_init_iproto()
+	 */
+	if (!iproto_is_configured()) {
+		/* Join the cord interconnect as "tx" endpoint. */
+		fiber_pool_create(&tx_fiber_pool, "tx",
+				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
+				  FIBER_POOL_IDLE_TIMEOUT);
+		/* Add an extra endpoint for WAL wake up/rollback messages. */
+		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
 
+		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
+		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
+	}
 	gc_init();
 	engine_init();
 	if (module_init() != 0)
@@ -2016,7 +2054,7 @@ box_cfg_xc(void)
 	schema_init();
 	replication_init();
 	port_init();
-	iproto_init();
+	box_init_iproto();
 	wal_thread_start();
 
 	title("loading");
diff --git a/src/box/box.h b/src/box/box.h
index 9930d4a1a..cfbcc033e 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -51,6 +51,12 @@ struct obuf;
 struct ev_io;
 struct auth_request;
 
+/*
+ * Initialize the iproto thread if it wasn't initialized yet.
+ */
+void
+box_init_iproto(void);
+
 /*
  * Initialize box library
  * @throws C++ exception
@@ -155,6 +161,12 @@ const char *box_status(void);
 void
 box_reset_stat(void);
 
+void
+box_listen_xc(const char *uri);
+
+int
+box_listen(const char *uri);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
@@ -177,7 +189,6 @@ box_process_vote(struct ballot *ballot);
 void
 box_check_config();
 
-void box_listen(void);
 void box_set_replication(void);
 void box_set_log_level(void);
 void box_set_log_format(void);
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 4115e6b65..c49018c5e 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -205,7 +205,7 @@ struct errcode_record {
 	/*150 */_(ER_CANT_CREATE_COLLATION,	"Failed to initialize collation: %s.") \
 	/*151 */_(ER_WRONG_COLLATION_OPTIONS,	"Wrong collation options (field %u): %s") \
 	/*152 */_(ER_NULLABLE_PRIMARY,		"Primary index of the space '%s' can not contain nullable parts") \
-	/*153 */_(ER_UNUSED,			"") \
+	/*153 */_(ER_NO_SUCH_INSTANCE,		"No instance found to proxy request to") \
 	/*154 */_(ER_TRANSACTION_YIELD,		"Transaction has been aborted by a fiber yield") \
 	/*155 */_(ER_NO_SUCH_GROUP,		"Replication group '%s' does not exist") \
 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3d13bad2f..0bce58a02 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -46,6 +46,7 @@
 #include "sio.h"
 #include "evio.h"
 #include "coio.h"
+#include "coio_task.h"
 #include "scoped_guard.h"
 #include "memory.h"
 #include "random.h"
@@ -56,11 +57,14 @@
 #include "tuple_convert.h"
 #include "session.h"
 #include "xrow.h"
+#include "xrow_io.h"
 #include "schema.h" /* schema_version */
 #include "replication.h" /* instance_uuid */
 #include "iproto_constants.h"
 #include "rmean.h"
 #include "errinj.h"
+#include "assoc.h"
+#include "uri.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -127,6 +131,22 @@ unsigned iproto_readahead = 16320;
 /* The maximal number of iproto messages in fly. */
 static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
 
+
+static bool is_iproto_configured = false;
+static bool is_proxy_configured = false;
+
+bool
+iproto_is_configured(void)
+{
+	return is_iproto_configured;
+}
+
+bool
+proxy_is_configured(void)
+{
+	return is_proxy_configured;
+}
+
 /**
  * How big is a buffer which needs to be shrunk before
  * it is put back into buffer cache.
@@ -154,6 +174,224 @@ iproto_reset_input(struct ibuf *ibuf)
 	}
 }
 
+struct proxy_instance {
+	bool is_ro;
+	/**
+	 * A flag indicating this instance refers to a local
+	 * instance the proxy is being run on. Needed to remember
+	 * not to establish connections to such an instance,
+	 * because all the forwarding will be done directly to tx
+	 * thread over cbus.
+	 */
+	bool is_local;
+	struct uri uri;
+
+	/** Instance's link in a list of all known instances. */
+	struct rlist link;
+	/** A map of all established connections to the instance. */
+	struct mh_strnptr_t *fat_connections;
+};
+
+struct sync_hash_entry {
+	uint64_t sync;
+	struct iproto_connection *con;
+};
+
+/** A single connection to a remote instance. */
+struct proxy_connection {
+	/** The instance we hold a connection to. */
+	struct proxy_instance *instance;
+	char * username;
+	/**
+	 * Monotonically increasing sync value for all requests
+	 * forwarded to instance through this connection.
+	 */
+	uint64_t sync_ctr;
+	/** Connection to the instance. */
+	struct {
+		struct ev_io io;
+		struct ibuf ibuf;
+	} connection;
+	/**
+	 * A map containing pairs of translated syncs plus
+	 * corresponding connections and original syncs.
+	 */
+	struct mh_i64ptr_t *sync_hash;
+	/**
+	 * A fiber recieving responses from the instance and
+	 * forwarding them back to clients.
+	 */
+	struct fiber *replier;
+	/** A fiber forwarding requests to the instance. */
+	struct fiber *forwarder;
+	/**
+	 * A cond to be signaled upon adding new entries to the request queue.
+	 */
+	struct fiber_cond forwarder_cond;
+	/*
+	 * A queue of all the requests to be forwarded through
+	 * the connection.
+	 */
+	struct rlist request_queue;
+	/** Salt received in instance's greeting. */
+	uint8_t salt[IPROTO_SALT_SIZE];
+};
+
+/** A single entry in proxy_connection request queue. */
+struct pqueue_entry {
+	/** Entry link in queue. */
+	struct rlist link;
+	/** The iproto connection a request was received from. */
+	struct iproto_connection *con;
+	/**
+	 * An iproto_msg corresponding to the request, as parsed
+	 * in iproto_msg_decode().
+	 */
+	struct iproto_msg *msg;
+};
+
+/**
+ * The local instance. Set only if proxy is run on one of the
+ * cluster nodes.
+ */
+struct proxy_instance *local_instance = NULL;
+
+/** A list of all instances known to proxy. */
+static RLIST_HEAD(proxy_instances);
+
+static struct proxy_instance *
+proxy_instance_new(bool is_ro, bool is_local, const char *uri)
+{
+	struct proxy_instance *instance = (struct proxy_instance *)
+		calloc(1, sizeof(*instance));
+	if (instance == NULL) {
+		diag_set(OutOfMemory, sizeof(*instance), "malloc",
+			 "struct proxy_instance");
+		return NULL;
+	}
+
+	instance->fat_connections = mh_strnptr_new();
+	if (instance->fat_connections == NULL) {
+		free(instance);
+		diag_set(OutOfMemory, sizeof(*instance->fat_connections), "malloc",
+			 "struct mh_strnptr_t");
+		return NULL;
+	}
+
+	instance->is_ro = is_ro;
+	instance->is_local = is_local;
+	if (is_local) {
+		/* There can be only one local instance. */
+		assert(local_instance == NULL);
+		assert(box_is_configured());
+		local_instance = instance;
+	} else if (uri_parse(&instance->uri, uri) || !instance->uri.service) {
+			free(instance);
+			diag_set(ClientError, ER_CFG, "uri",
+				 "expected host:service or /unix.socket");
+			return NULL;
+	}
+
+	rlist_add_tail(&proxy_instances, &instance->link);
+
+	return instance;
+}
+
+static void
+proxy_connection_delete(struct proxy_connection *pcon);
+
+static void
+proxy_instance_delete(struct proxy_instance *instance)
+{
+	if (instance->is_local) {
+		assert(local_instance != NULL);
+		local_instance = NULL;
+	}
+	mh_int_t i;
+	mh_foreach(instance->fat_connections, i) {
+		struct proxy_connection *pcon = (struct proxy_connection *) mh_strnptr_node(instance->fat_connections, i)->val;
+		mh_strnptr_del(instance->fat_connections, i, 0);
+		proxy_connection_delete(pcon);
+	}
+	free(instance);
+}
+
+static int
+proxy_replier_f(va_list ap);
+
+static int
+proxy_forwarder_f(va_list ap);
+
+/**
+ * Initiate a new connection to a remote instance.
+ */
+static struct proxy_connection *
+proxy_connection_new(struct proxy_instance *instance, const char *username,
+		     const char *scramble, const char *salt)
+{
+	static const char *guest = "guest";
+	if (username == NULL)
+		username = guest;
+	/* We do not establish connections to a local instance. */
+	assert(!instance->is_local && instance != local_instance);
+	struct proxy_connection *pcon = (struct proxy_connection *)
+		calloc(1, sizeof(*pcon));
+	if (pcon == NULL) {
+		diag_set(OutOfMemory, sizeof(*pcon), "malloc",
+			 "struct proxy_connection");
+		return NULL;
+	}
+	pcon->instance = instance;
+	pcon->username = (char *)malloc(strlen(username) + 1);
+	strcpy(pcon->username, username);
+	pcon->sync_ctr = 0;
+	ibuf_create(&pcon->connection.ibuf, cord_slab_cache(), iproto_readahead);
+
+	pcon->sync_hash = mh_i64ptr_new();
+	rlist_create(&pcon->request_queue);
+	fiber_cond_create(&pcon->forwarder_cond);
+
+	char name[FIBER_NAME_MAX];
+	int pos = snprintf(name, sizeof(name), "forwarder/");
+	pos += snprintf(name + pos, sizeof(name) - pos, "%s", username);
+	pos += snprintf(name + pos, sizeof(name) - pos, "@");
+	uri_format(name + pos, sizeof(name) - pos, &instance->uri, false);
+	pcon->forwarder = fiber_new(name, proxy_forwarder_f);
+	fiber_set_joinable(pcon->forwarder, true);
+	fiber_start(pcon->forwarder, pcon, scramble, salt);
+
+	uint32_t hash = mh_strn_hash(username, strlen(username));
+	struct mh_strnptr_node_t node = {username, strlen(username), hash, pcon};
+	struct mh_strnptr_node_t *ret;
+	mh_strnptr_put(instance->fat_connections, &node, &ret, NULL);
+	/*
+	 * Make sure we didn't have a connection under the same
+	 * user previously.
+	 */
+	assert(ret == NULL);
+	return pcon;
+}
+
+static void
+proxy_connection_delete(struct proxy_connection *pcon)
+{
+	free(pcon->username);
+	ibuf_destroy(&pcon->connection.ibuf);
+	fiber_cancel(pcon->forwarder);
+	fiber_cancel(pcon->replier);
+	fiber_cond_destroy(&pcon->forwarder_cond);
+	mh_int_t i;
+	mh_foreach(pcon->sync_hash, i) {
+		struct sync_hash_entry *cs = (struct sync_hash_entry *)
+					     mh_i64ptr_node(pcon->sync_hash,
+							    i)->val;
+		mh_i64ptr_del(pcon->sync_hash, i, 0);
+		free(cs);
+	}
+
+	free(pcon);
+}
+
 /* {{{ iproto_msg - declaration */
 
 /**
@@ -303,6 +541,30 @@ static const struct cmsg_hop push_route[] = {
 	{ tx_end_push, NULL }
 };
 
+struct proxy_auth_msg {
+	struct cmsg base;
+	struct iproto_msg *iproto_msg;
+	bool success;
+};
+
+/**
+ * Process an auth locally remembering whether it was successful or not.
+ */
+static void
+tx_process_proxy_auth(struct cmsg *m);
+
+/**
+ * Send a reply with auth result to the client.
+ * Also upon a successful auth remember scramble
+ * received from a client.
+ */
+static void
+proxy_finish_auth(struct cmsg *m);
+
+static const struct cmsg_hop proxy_auth_route[] = {
+	{ tx_process_proxy_auth, &net_pipe },
+	{ proxy_finish_auth, NULL },
+};
 
 /* }}} */
 
@@ -449,6 +711,11 @@ struct iproto_connection
 		 */
 		bool is_push_pending;
 	} tx;
+	struct {
+		char *user_name;
+		bool authenticated;
+		char scramble[SCRAMBLE_SIZE];
+	} proxy;
 	/** Authentication salt. */
 	char salt[IPROTO_SALT_SIZE];
 };
@@ -672,6 +939,12 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
+static struct proxy_instance *
+proxy_find_instance(struct iproto_msg *msg);
+
+static struct proxy_connection *
+proxy_find_connection(struct proxy_instance *instance, const char *username);
+
 /**
  * Enqueue all requests which were read up. If a request limit is
  * reached - stop the connection input even if not the whole batch
@@ -734,16 +1007,63 @@ err_msgpack:
 		msg->len = reqend - reqstart; /* total request length */
 
 		iproto_msg_decode(msg, &pos, reqend, &stop_input);
-		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
-		 */
-		cpipe_push_input(&tx_pipe, &msg->base);
+
 		n_requests++;
 		/* Request is parsed */
 		assert(reqend > reqstart);
 		assert(con->parse_size >= (size_t) (reqend - reqstart));
 		con->parse_size -= reqend - reqstart;
+
+		if (!is_proxy_configured) {
+			/*
+			 * This can't throw, but should not be
+			 * done in case of exception.
+			 */
+process_locally:
+			cpipe_push_input(&tx_pipe, &msg->base);
+			continue;
+		}
+
+		struct proxy_instance *instance = proxy_find_instance(msg);
+		if (instance == NULL) {
+			diag_log();
+			continue;
+		}
+		if (instance == local_instance) {
+			if (msg->header.type != IPROTO_AUTH)
+				goto process_locally;
+			struct proxy_auth_msg *m = (struct proxy_auth_msg *)malloc(sizeof(*m));
+			m->iproto_msg = msg;
+			cmsg_init(&m->base, proxy_auth_route);
+			cpipe_push_input(&tx_pipe, &m->base);
+			continue;
+		}
+		const char *username = NULL;
+		if (con->proxy.authenticated) {
+			username = con->proxy.user_name;
+		}
+		struct proxy_connection *pcon = proxy_find_connection(instance,
+								      username);
+
+		if (pcon == NULL) {
+			pcon = proxy_connection_new(instance, username,
+						    con->proxy.scramble,
+						    con->salt);
+			if (pcon == NULL)
+				diag_raise();
+		}
+
+		struct pqueue_entry *ent = (struct pqueue_entry *)
+					   calloc(1, sizeof(*ent));
+		if (ent == NULL) {
+			diag_set(OutOfMemory, sizeof(*ent), "malloc",
+				 "struct pqueue_entry");
+			diag_raise();
+		}
+		ent->msg = msg;
+		ent->con = con;
+		rlist_add_tail(&pcon->request_queue, &ent->link);
+		fiber_cond_signal(&pcon->forwarder_cond);
 	}
 	if (stop_input) {
 		/**
@@ -996,6 +1316,8 @@ iproto_connection_new(int fd)
 	con->is_disconnected = false;
 	con->tx.is_push_pending = false;
 	con->tx.is_push_sent = false;
+	con->proxy.user_name = NULL;
+	con->proxy.authenticated = false;
 	return con;
 }
 
@@ -1932,6 +2254,7 @@ iproto_init()
 		/* .sync = */ iproto_session_sync,
 	};
 	session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
+	is_iproto_configured = true;
 }
 
 /** Available iproto configuration changes. */
@@ -2041,3 +2364,378 @@ iproto_set_msg_max(int new_iproto_msg_max)
 	iproto_do_cfg(&cfg_msg);
 	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
 }
+
+/**
+ * Search the proxy's instance list to find an
+ * instance capable of processing the request in msg->header.
+ */
+static struct proxy_instance *
+proxy_find_instance(struct iproto_msg *msg)
+{
+	bool request_is_ro;
+	uint8_t type = msg->header.type;
+
+	switch (type) {
+	case IPROTO_SELECT:
+	case IPROTO_AUTH:
+		request_is_ro = true;
+		break;
+	case IPROTO_INSERT:
+	case IPROTO_REPLACE:
+	case IPROTO_UPDATE:
+	case IPROTO_DELETE:
+	case IPROTO_UPSERT:
+	case IPROTO_EVAL:
+	case IPROTO_CALL:
+	case IPROTO_CALL_16:
+		request_is_ro = false;
+		break;
+	default:
+		/*
+		 * ping, join, subscribe,
+		 * vote, vote_deprecated go here
+		 */
+		if (local_instance != NULL)
+			return local_instance;
+		else
+			request_is_ro = false;
+	}
+
+	if (local_instance != NULL && (!local_instance->is_ro || request_is_ro))
+		return local_instance;
+
+	struct proxy_instance *instance;
+	rlist_foreach_entry(instance, &proxy_instances, link) {
+		/* Rules can be more complex in future. */
+		if (!instance->is_ro || request_is_ro) {
+			return instance;
+		}
+	}
+	diag_set(ClientError, ER_NO_SUCH_INSTANCE);
+	return NULL;
+}
+
+/**
+ * Find an established connection authenticated as specified user.
+ */
+static struct proxy_connection *
+proxy_find_connection(struct proxy_instance *instance, const char *username)
+{
+	static const char *guest = "guest";
+	if (username == NULL)
+		username = guest;
+	uint32_t hash = mh_strn_hash(username, strlen(username));
+	struct mh_strnptr_key_t key = {username, strlen(username), hash};
+	mh_int_t i = mh_strnptr_find(instance->fat_connections, &key, NULL);
+	if (i != mh_end(instance->fat_connections)) {
+		struct mh_strnptr_node_t *node = mh_strnptr_node(instance->fat_connections, i);
+		return (struct proxy_connection *)node->val;
+	} else
+		return NULL;
+}
+
+static void
+tx_process_proxy_auth(struct cmsg *m)
+{
+	struct proxy_auth_msg *msg = (struct proxy_auth_msg *)m;
+	struct iproto_connection *con = msg->iproto_msg->connection;
+	try {
+		box_process_auth(&msg->iproto_msg->auth, con->salt);
+	} catch (Exception *e) {
+		msg->success = false;
+		tx_reply_error(msg->iproto_msg);
+		return;
+	}
+	msg->success = true;
+
+	struct obuf *out = con->tx.p_obuf;
+	iproto_reply_ok_xc(out, msg->iproto_msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->iproto_msg->wpos, out);
+}
+
+static void
+proxy_finish_auth(struct cmsg *m)
+{
+	struct proxy_auth_msg *msg = (struct proxy_auth_msg *)m;
+	struct iproto_connection *con = msg->iproto_msg->connection;
+
+	con->proxy.authenticated = msg->success;
+
+	if(msg->success) {
+		const char *user_name = msg->iproto_msg->auth.user_name;
+		const char *scramble = msg->iproto_msg->auth.scramble;
+		uint32_t username_len = mp_decode_strl(&user_name);
+		con->proxy.user_name = (char *)calloc(1, 1 + username_len);
+		memcpy(con->proxy.user_name, user_name, username_len);
+		uint32_t part_count = mp_decode_array(&scramble);
+		/* These conditions were checked in box_process_auth() */
+		assert((part_count == 0 && strcmp(con->proxy.user_name, "guest") == 0) ||
+		       part_count == 2);
+		if (part_count < 2) {
+			memset(con->proxy.scramble, 0, SCRAMBLE_SIZE);
+		} else {
+			mp_next(&scramble);
+			uint32_t scramble_len;
+			if (mp_typeof(*scramble) == MP_STR) {
+				scramble = mp_decode_str(&scramble, &scramble_len);
+			} else {
+				/* type is MP_BIN, as checked in authenticate() */
+				scramble = mp_decode_bin(&scramble, &scramble_len);
+			}
+			assert(scramble_len == SCRAMBLE_SIZE);
+			strncpy(con->proxy.scramble, scramble,
+				scramble_len);
+		}
+	}
+
+	net_send_msg(&msg->iproto_msg->base);
+}
+
+/**
+ * Forward the request specified in msg->header
+ * to a specified instance connection.
+ */
+static void
+proxy_forward_request(struct iproto_connection *con, struct iproto_msg *msg,
+			struct proxy_connection *pcon)
+{
+	struct sync_hash_entry *cs = (struct sync_hash_entry *)malloc(sizeof(*cs));
+	cs->sync = msg->header.sync;
+	cs->con = con;
+	msg->header.sync = ++pcon->sync_ctr;
+	struct mh_i64ptr_node_t node = {pcon->sync_ctr, cs};
+	if (mh_i64ptr_put(pcon->sync_hash, &node, NULL, NULL) ==
+	    mh_end(pcon->sync_hash)) {
+		diag_set(OutOfMemory, sizeof(node), "malloc", "sync_hash");
+		diag_raise();
+	}
+	coio_write_xrow(&pcon->connection.io, &msg->header);
+
+	/*
+	 * After forwarding the request, mark it as read and
+	 * delete the msg.
+	 */
+	msg->p_ibuf->rpos += msg->len;
+	iproto_msg_delete(msg);
+}
+
+/**
+ * A message used to fetch user's hash2 from tx thread.
+ */
+struct proxy_hash2_msg {
+	struct cbus_call_msg base;
+	char *username;
+	size_t username_len;
+	char hash2[SCRAMBLE_SIZE];
+};
+
+static int
+proxy_get_hash2(struct cbus_call_msg *msg)
+{
+	struct proxy_hash2_msg *m = (struct proxy_hash2_msg *)msg;
+	struct user *user = user_find_by_name(m->username, m->username_len);
+	if (user == NULL)
+		return -1;
+	strncpy(m->hash2, user->def->hash2, SCRAMBLE_SIZE);
+	return 0;
+}
+
+/**
+ * Given a proxy_connection, establish an actual connection to a
+ * remote instance: connect and authenticate.
+ */
+static int
+proxy_establish_connection(struct proxy_connection *pcon, const char *scramble,
+			   const char *salt)
+{
+	coio_create(&pcon->connection.io, -1);
+	coio_connect(&pcon->connection.io, &pcon->instance->uri, NULL, NULL);
+
+	char greetingbuf[IPROTO_GREETING_SIZE];
+	struct greeting greeting;
+	coio_readn(&pcon->connection.io, greetingbuf, IPROTO_GREETING_SIZE);
+	if (greeting_decode(greetingbuf, &greeting) != 0) {
+		tnt_raise(LoggedError, ER_PROTOCOL, "Invalid greeting");
+	}
+	memcpy(pcon->salt, greeting.salt, IPROTO_SALT_SIZE);
+	if (strcmp(pcon->username, "guest")) {
+		size_t username_len = strlen(pcon->username);
+		struct xrow_header row;
+		memset(&row, 0, sizeof(row));
+		if (scramble != NULL) {
+			struct proxy_hash2_msg *msg = (struct proxy_hash2_msg *)
+			    calloc(1, sizeof(*msg));
+			msg->username = pcon->username;
+			msg->username_len = username_len;
+			if (cbus_call(&tx_pipe, &net_pipe, &msg->base,
+				      proxy_get_hash2, NULL, TIMEOUT_INFINITY) != 0)
+				diag_raise();
+
+			xrow_reencode_auth_xc(&row, salt, IPROTO_SALT_SIZE,
+					      greeting.salt,
+					      greeting.salt_len,  pcon->username,
+					      username_len,
+					      scramble, msg->hash2);
+			free(msg);
+		} else {
+			xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len,
+					    pcon->username, username_len, scramble, 0);
+		}
+
+		coio_write_xrow(&pcon->connection.io, &row);
+		coio_read_xrow(&pcon->connection.io, &pcon->connection.ibuf, &row);
+		if (row.type != IPROTO_OK) {
+			xrow_decode_error_xc(&row);
+		}
+	}
+	return 0;
+}
+
+static int
+proxy_forwarder_f(va_list ap)
+{
+	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
+	const char *scramble = va_arg(ap, char *);
+	const char *salt = va_arg(ap, char *);
+
+	proxy_establish_connection(pcon, scramble, salt);
+
+	char name[FIBER_NAME_MAX];
+	int pos = snprintf(name, sizeof(name), "replier/");
+	pos += snprintf(name + pos, sizeof(name) - pos, "@");
+	uri_format(name + pos, sizeof(name) - pos, &pcon->instance->uri, false);
+	pcon->replier = fiber_new(name, proxy_replier_f);
+	fiber_set_joinable(pcon->replier, true);
+	fiber_start(pcon->replier, pcon);
+
+	while (!fiber_is_cancelled()) {
+		struct rlist *head;
+		while ((head = rlist_first(&pcon->request_queue)) ==
+		       &pcon->request_queue) {
+			fiber_cond_wait(&pcon->forwarder_cond);
+		}
+
+		struct pqueue_entry *ent = rlist_shift_entry(&pcon->request_queue,
+							     struct pqueue_entry,
+							     link);
+		struct iproto_connection *con = ent->con;
+		struct iproto_msg * msg = ent->msg;
+		free(ent);
+
+		proxy_forward_request(con, msg, pcon);
+
+		fiber_gc();
+	}
+	return 0;
+}
+
+/**
+ * Delete all the instances from instance list.
+ */
+static void
+proxy_instance_list_purge()
+{
+	while (!rlist_empty(&proxy_instances)) {
+		struct proxy_instance *instance =
+		    rlist_shift_entry(&proxy_instances, struct proxy_instance, link);
+		proxy_instance_delete(instance);
+	}
+}
+
+/**
+ * A message used to pass proxy configuration parameters to iproto thread.
+ */
+struct proxy_cfg_msg: public cbus_call_msg
+{
+	struct proxy_cfg *config;
+};
+
+static int
+proxy_configure_f(struct cbus_call_msg *m)
+{
+	struct proxy_cfg_msg *msg = (struct proxy_cfg_msg *)m;
+	/*
+	 * If it is the second call to proxy_configure,
+	 * first remove all the old instances and close
+	 * connections, and do not init coio again.
+	 */
+	if (is_proxy_configured) {
+		proxy_instance_list_purge();
+	} else {
+		coio_init();
+		coio_enable();
+	}
+	for(size_t i = 0; msg->config[i].uri != NULL; ++i) {
+		/* Will do for now??? */
+		bool is_ro = !msg->config[i].is_master;
+		bool is_local = msg->config[i].is_local;
+		struct proxy_instance *instance =
+			proxy_instance_new(is_ro, is_local, msg->config[i].uri);
+		if (instance == NULL) {
+			return -1;
+		}
+		/*
+		 * Init a guest connection to an instance:
+		 * It will be the first connection in instance's
+		 * fat connections list.
+		 */
+		if (!is_local) {
+			struct proxy_connection *con =
+				proxy_connection_new(instance, NULL, NULL, NULL);
+			if (con == NULL) {
+				return -1;
+			}
+		}
+	}
+	return 0;
+}
+
+void
+proxy_configure(struct proxy_cfg *config)
+{
+	struct proxy_cfg_msg msg;
+	msg.config = config;
+	int rc = cbus_call(&net_pipe, &tx_pipe, &msg, proxy_configure_f, NULL,
+			   TIMEOUT_INFINITY);
+	free(config);
+	if (rc != 0)
+		diag_raise();
+	is_proxy_configured = true;
+}
+
+static int
+proxy_replier_f(va_list ap)
+{
+	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
+
+	struct ev_io io;
+	coio_create(&io, pcon->connection.io.fd);
+
+	while (!fiber_is_cancelled()) {
+		struct xrow_header row;
+		coio_read_xrow(&io, &pcon->connection.ibuf, &row);
+		uint64_t key = row.sync;
+		mh_int_t i = mh_i64ptr_find(pcon->sync_hash, key, NULL);
+		if (i == mh_end(pcon->sync_hash)) {
+			/*
+			 * Some error. We received a reply with a sync
+			 * not corresponding to any connection
+			 */
+			say_warn("sync received from remote instance is not in sync_hash");
+			continue;
+		}
+		struct mh_i64ptr_node_t *node = mh_i64ptr_node(pcon->sync_hash, i);
+		if (row.type != IPROTO_CHUNK) {
+			mh_i64ptr_remove(pcon->sync_hash, node, NULL);
+		}
+		struct sync_hash_entry * cs = (struct sync_hash_entry *)node->val;
+		struct iproto_connection *con = cs->con;
+		uint64_t sync = cs->sync;
+		row.sync = sync;
+		free(cs);
+		coio_write_iproto_response(&con->output, &row);
+
+		fiber_gc();
+	}
+	return 0;
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index b9a6cf8f7..6bb76ae2f 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -56,6 +56,12 @@ enum {
 
 extern unsigned iproto_readahead;
 
+bool
+iproto_is_configured(void);
+
+bool
+proxy_is_configured(void);
+
 /**
  * Return size of memory used for storing network buffers.
  */
@@ -68,6 +74,23 @@ iproto_mem_used(void);
 void
 iproto_reset_stat(void);
 
+struct proxy_cfg {
+	const char *uri;
+	bool is_master;
+	/*
+	 * A flag set to true for the instance
+	 * the proxy is being run on.
+	 */
+	bool is_local;
+};
+
+/*
+ * Configure the proxy: initiate guest connections to the
+ * instances from the config and start proxying requests.
+ */
+void
+proxy_configure(struct proxy_cfg *config);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index c3825591c..19f6ec0f6 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -69,7 +69,7 @@ static int
 lbox_cfg_set_listen(struct lua_State *L)
 {
 	try {
-		box_listen();
+		box_listen_xc(NULL);
 	} catch (Exception *) {
 		luaT_error(L);
 	}
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index ad47f724b..36b24716a 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -35,10 +35,12 @@
 #include <msgpuck.h> /* mp_store_u32() */
 #include "scramble.h"
 
+#include "box/box.h" /* box_listen() */
 #include "box/iproto_constants.h"
 #include "box/lua/tuple.h" /* luamp_convert_tuple() / luamp_convert_key() */
 #include "box/xrow.h"
 #include "box/tuple.h"
+#include "box/iproto.h" /* proxy_configure() / struct proxy_cfg */
 
 #include "lua/msgpack.h"
 #include "third_party/base64.h"
@@ -593,6 +595,62 @@ netbox_decode_body(struct lua_State *L)
 	return 2;
 }
 
+static int
+netbox_listen(struct lua_State *L)
+{
+	int opts_count = lua_gettop(L);
+	if (opts_count == 0 || (opts_count == 1 && !box_is_configured()))
+		return luaL_error(L, "Usage: netbox.listen(uri, {cluster="\
+				     "{{uri=uri1, is_master=true/false}, ...}})");
+
+	const char *uri = lua_tostring(L, 1);
+	box_init_iproto();
+
+	if (opts_count > 2) {
+		lua_settop(L, 2);
+	} else if (opts_count == 1) {
+		if (box_listen(uri) == -1)
+			luaT_error(L);
+		return 0;
+	}
+	luaL_checktype(L, 2, LUA_TTABLE);
+	lua_getfield(L, 2, "cluster");
+	luaL_checktype(L, -1, LUA_TTABLE);
+	int opts_len = lua_objlen(L, -1);
+	for (int i = 1; i <= opts_len; ++i) {
+		lua_rawgeti(L, 3, i);
+		luaL_checktype(L, -1, LUA_TTABLE);
+		lua_getfield(L, -1, "uri");
+		lua_getfield(L, -2, "is_master");
+	}
+	struct proxy_cfg *config = malloc((opts_len + 1) * sizeof(*config));
+	if (config == NULL) {
+		diag_set(OutOfMemory, (opts_len + 1) * sizeof(*config),
+			 "malloc", "struct proxy_cfg");
+		luaT_error(L);
+	}
+	size_t pos = 0;
+	for (int i = -opts_len * 3; i < 0; i += 3) {
+		config[pos].uri = lua_tostring(L, i + 1);
+		config[pos].is_master = lua_toboolean(L, i + 2);
+		config[pos].is_local = strcmp(uri, config[pos].uri) == 0;
+		if (config[pos].is_local && !box_is_configured()) {
+			free(config);
+			return luaL_error(L, "You cannot proxy to the local "\
+					     "instance before box is configured");
+		}
+		++pos;
+	}
+	config[pos].uri = NULL;
+
+	if (box_listen(uri) == -1)
+		luaT_error(L);
+
+	proxy_configure(config);
+
+	return 0;
+}
+
 int
 luaopen_net_box(struct lua_State *L)
 {
@@ -611,6 +669,7 @@ luaopen_net_box(struct lua_State *L)
 		{ "decode_greeting",netbox_decode_greeting },
 		{ "communicate",    netbox_communicate },
 		{ "decode_body",    netbox_decode_body },
+		{ "listen",	    netbox_listen },
 		{ NULL, NULL}
 	};
 	/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index fd6ebf9de..b1d220dde 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,7 @@ local communicate     = internal.communicate
 local encode_auth     = internal.encode_auth
 local encode_select   = internal.encode_select
 local decode_greeting = internal.decode_greeting
+local listen          = internal.listen
 
 local TIMEOUT_INFINITY = 500 * 365 * 86400
 local VSPACE_ID        = 281
@@ -1422,6 +1423,7 @@ local this_module = {
     new = connect, -- Tarantool < 1.7.1 compatibility,
     wrap = wrap,
     establish_connection = establish_connection,
+    listen = listen,
 }
 
 function this_module.timeout(timeout, ...)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 4473acfe3..2f7742138 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -275,6 +275,16 @@ iproto_header_encode(char *out, uint32_t type, uint64_t sync,
 	memcpy(out, &header, sizeof(header));
 }
 
+void
+iproto_header_to_iovec(const struct xrow_header *header, struct iovec *out,
+		       uint32_t bodylen)
+{
+	out->iov_base = region_alloc(&fiber()->gc, IPROTO_HEADER_LEN);
+	iproto_header_encode((char *)out->iov_base, header->type, header->sync,
+			     header->schema_version, bodylen);
+	out->iov_len = IPROTO_HEADER_LEN;
+}
+
 struct PACKED iproto_body_bin {
 	uint8_t m_body;                    /* MP_MAP */
 	uint8_t k_data;                    /* IPROTO_DATA or IPROTO_ERROR */
@@ -852,6 +862,42 @@ xrow_encode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
 	return 0;
 }
 
+int
+xrow_reencode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
+		   const char *msalt, size_t msalt_len, const char *login,
+		   size_t login_len, const char *scramble, const char *hash2)
+{
+	size_t buf_size = XROW_BODY_LEN_MAX + login_len + SCRAMBLE_SIZE;
+	char *buf = (char *) region_alloc(&fiber()->gc, buf_size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, buf_size, "region_alloc", "buf");
+		return -1;
+	}
+
+	char *d = buf;
+	d = mp_encode_map(d, 2);
+	d = mp_encode_uint(d, IPROTO_USER_NAME);
+	d = mp_encode_str(d, login, login_len);
+	assert(salt_len >= SCRAMBLE_SIZE);
+	assert(msalt_len >= SCRAMBLE_SIZE);
+	(void)salt_len;
+	(void)msalt_len;
+
+	char new_scramble[SCRAMBLE_SIZE];
+	scramble_reencode(new_scramble, scramble, salt, msalt, hash2);
+	d = mp_encode_uint(d, IPROTO_TUPLE);
+	d = mp_encode_array(d, 2);
+	d = mp_encode_str(d, "chap-sha1", strlen("chap-sha1"));
+	d = mp_encode_str(d, new_scramble, SCRAMBLE_SIZE);
+
+	assert(d <= buf + buf_size);
+	packet->body[0].iov_base = buf;
+	packet->body[0].iov_len = (d - buf);
+	packet->bodycnt = 1;
+	packet->type = IPROTO_AUTH;
+	return 0;
+}
+
 void
 xrow_decode_error(struct xrow_header *row)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8d..b70f16038 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -226,6 +226,27 @@ xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len,
 		 const char *login, size_t login_len, const char *password,
 		 size_t password_len);
 
+/**
+ * Reencode an AUTH request received from a client
+ * to send it to another instance.
+ * @param[out] packet Re-encoded request.
+ * @param salt Salt the proxy sent to the client.
+ * @param salt_len Length of @salt.
+ * @param msalt Salt the proxy received from an instance.
+ * @param msalt_len Length of @msalt.
+ * @param login User login.
+ * @param login_len Length of @login.
+ * @param scramble Scramble received from the client.
+ * @param hash2 hash2 of the user, i.e. sha1(sha1(password)).
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+int
+xrow_reencode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
+		   const char *msalt, size_t msalt_len, const char *login,
+		   size_t login_len, const char *scramble, const char *hash2);
+
 /** Reply to IPROTO_VOTE request. */
 struct ballot {
 	/** Set if the instance is running in read-only mode. */
@@ -515,6 +536,18 @@ greeting_decode(const char *greetingbuf, struct greeting *greeting);
 int
 xrow_to_iovec(const struct xrow_header *row, struct iovec *out);
 
+/**
+ * Create an iproto_header based on values from @row and encode
+ * it into the specified iovec.
+ *
+ * @param row Row to take values from.
+ * @param[out] out Encoded iproto_header based on @row values.
+ * @param bodylen Length of the row body to be encoded in the header.
+ */
+void
+iproto_header_to_iovec(const struct xrow_header *row, struct iovec *out,
+		       uint32_t bodylen);
+
 /**
  * Decode ERROR and set it to diagnostics area.
  * @param row Encoded error.
@@ -631,6 +664,17 @@ xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len,
 		diag_raise();
 }
 
+/** @copydoc xrow_reencode_auth. */
+static inline void
+xrow_reencode_auth_xc(struct xrow_header *packet, const char *salt, size_t salt_len,
+		   const char *msalt, size_t msalt_len, const char *login,
+		   size_t login_len, const char *scramble, const char *hash2)
+{
+	if (xrow_reencode_auth(packet, salt, salt_len, msalt, msalt_len, login,
+			       login_len, scramble, hash2) != 0)
+		diag_raise();
+}
+
 /** @copydoc xrow_decode_ballot. */
 static inline void
 xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot)
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index e9ee6b0c8..48fe6dc11 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -101,3 +101,20 @@ coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
 	coio_writev(coio, iov, iovcnt, 0);
 }
 
+void
+coio_write_iproto_response(struct ev_io *coio, const struct xrow_header *row)
+{
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = row->bodycnt + 1;
+	uint32_t bodylen = row->body[0].iov_len;
+	for (int i = 1; i < row->bodycnt; ++i) {
+		bodylen += row->body[i].iov_len;
+	}
+	assert(iovcnt <= XROW_IOVMAX);
+	iproto_header_to_iovec(row, &iov[0], bodylen);
+	for (int i = 0; i < row->bodycnt; ++i) {
+		iov[i + 1].iov_base = row->body[i].iov_base;
+		iov[i + 1].iov_len = row->body[i].iov_len;
+	}
+	coio_writev(coio, iov, iovcnt, 0);
+}
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..c199e838e 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -48,6 +48,12 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 void
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
+/**
+ * Just like coio_write_xrow(), but also writes schema_version
+ * from @row into the response header.
+ */
+void
+coio_write_iproto_response(struct ev_io *coio, const struct xrow_header *row);
 
 #if defined(__cplusplus)
 } /* extern "C" */
diff --git a/src/scramble.c b/src/scramble.c
index ca1f98793..6dc43932b 100644
--- a/src/scramble.c
+++ b/src/scramble.c
@@ -67,6 +67,29 @@ scramble_prepare(void *out, const void *salt, const void *password,
 	xor(out, hash1, out, SCRAMBLE_SIZE);
 }
 
+void
+scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
+		  const void *hash2)
+{
+	unsigned char hash1[SCRAMBLE_SIZE];
+	unsigned char sh[SCRAMBLE_SIZE];
+	SHA1_CTX ctx;
+
+	SHA1Init(&ctx);
+	SHA1Update(&ctx, salt, SCRAMBLE_SIZE);
+	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
+	SHA1Final(sh, &ctx);
+
+	xor(hash1, in, sh, SCRAMBLE_SIZE);
+
+	SHA1Init(&ctx);
+	SHA1Update(&ctx, msalt, SCRAMBLE_SIZE);
+	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
+	SHA1Final(out, &ctx);
+
+	xor(out, hash1, out, SCRAMBLE_SIZE);
+}
+
 int
 scramble_check(const void *scramble, const void *salt, const void *hash2)
 {
diff --git a/src/scramble.h b/src/scramble.h
index 7dee31483..2ca975bd7 100644
--- a/src/scramble.h
+++ b/src/scramble.h
@@ -89,6 +89,15 @@ scramble_check(const void *scramble, const void *salt, const void *hash2);
 void
 password_prepare(const char *password, int len, char *out, int out_len);
 
+/**
+ * Given a scramble received from a client, the salt sent to the client,
+ * the salt received from another instance and the user's hash2,
+ * recalculate the scramble to be sent to a remote instance for authentication.
+ */
+void
+scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
+		  const void *hash2);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif
-- 
2.17.1 (Apple Git-112)

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [tarantool-patches] [PATCH v2] iproto: introduce a proxy module.
  2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
@ 2018-10-03  8:49 ` Vladimir Davydov
  2018-10-04 11:54 ` [tarantool-patches] " Georgy Kirichenko
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 12+ messages in thread
From: Vladimir Davydov @ 2018-10-03  8:49 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches

On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> ---
> Sorry, forgot to attach issue and branch in the last letter:
> https://github.com/tarantool/tarantool/issues/2625
> https://github.com/tarantool/tarantool/tree/sp/gh-2625-proxy

Please don't send v2 in case the patch doesn't change at all or changes
are negligible. Instead reply to the original email.

> 
>  src/box/box.cc          |  68 +++-
>  src/box/box.h           |  13 +-
>  src/box/errcode.h       |   2 +-
>  src/box/iproto.cc       | 708 +++++++++++++++++++++++++++++++++++++++-
>  src/box/iproto.h        |  23 ++
>  src/box/lua/cfg.cc      |   2 +-
>  src/box/lua/net_box.c   |  59 ++++
>  src/box/lua/net_box.lua |   2 +
>  src/box/xrow.c          |  46 +++
>  src/box/xrow.h          |  44 +++
>  src/box/xrow_io.cc      |  17 +
>  src/box/xrow_io.h       |   6 +
>  src/scramble.c          |  23 ++
>  src/scramble.h          |   9 +
>  14 files changed, 999 insertions(+), 23 deletions(-)

I haven't looked through the patch yet, but I can see from the diff that
a test is missing. Please add.


* [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
  2018-10-03  8:49 ` Vladimir Davydov
@ 2018-10-04 11:54 ` Georgy Kirichenko
  2018-10-08 10:44 ` [tarantool-patches] " Vladimir Davydov
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 12+ messages in thread
From: Georgy Kirichenko @ 2018-10-04 11:54 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Serge Petrenko


Hi and thank you for the patch! Overall it looks good but I have some 
comments.
 - I would like it if you created a dedicated fiber pool and cbus endpoint
   to perform the IPROTO<->TX interconnection. In that case iproto and tx
   initialization could be split more cleanly.
 - I think we could have only one hash table to locate a corresponding
   connection with "username@uri" instead of two-level routing.
 - It would be good to introduce some error injections.
 - If you allocate and free a lot of same-size objects, then a mempool
   is a better option than (m/c)alloc/free.

Also I have some comments inline.
> +
> +	instance->is_ro = is_ro;
> +	instance->is_local = is_local;
> +	if (is_local) {
> +		/* There can be only one local instance. */
> +		assert(local_instance == NULL);
> +		assert(box_is_configured());
This function works in the IPROTO thread, and calling a TX-thread
function may lead to undefined behavior.
> +		local_instance = instance;
> +	} else if (uri_parse(&instance->uri, uri) || !instance->uri.service) {
> +			free(instance);
> +			diag_set(ClientError, ER_CFG, "uri",
> +				 "expected host:service or /unix.socket");
> +			return NULL;
> +	}
> +
> +	rlist_add_tail(&proxy_instances, &instance->link);
> +

> +
> +		if (!is_proxy_configured) {
> +			/*
> +			 * This can't throw, but should not be
> +			 * done in case of exception.
> +			 */
> +process_locally:
> +			cpipe_push_input(&tx_pipe, &msg->base);
> +			continue;
> +		}
> +
> +		struct proxy_instance *instance = proxy_find_instance(msg);
> +		if (instance == NULL) {
I think some cleanup of the request should be done here, at least
reclaiming the input buffer space.
> +			diag_log();
> +			continue;
> +		}
> +		if (instance == local_instance) {
> +			if (msg->header.type != IPROTO_AUTH)
> +				goto process_locally;
> +			struct proxy_auth_msg *m = (struct proxy_auth_msg 



* Re: [tarantool-patches] [PATCH v2] iproto: introduce a proxy module.
  2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
  2018-10-03  8:49 ` Vladimir Davydov
  2018-10-04 11:54 ` [tarantool-patches] " Georgy Kirichenko
@ 2018-10-08 10:44 ` Vladimir Davydov
  2018-10-16 18:35   ` [tarantool-patches] " Konstantin Osipov
  2018-10-08 16:48 ` [tarantool-patches] " Vladimir Davydov
                   ` (2 subsequent siblings)
  5 siblings, 1 reply; 12+ messages in thread
From: Vladimir Davydov @ 2018-10-08 10:44 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches

On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> By default, upon new client connection, all requests from the client are
> forwarded through "guest" connection. Upon recieving an AUTH request,
> proxy processes it on a local instance (this can be done, since proxy is
> being run on one of the cluster instances and has access to user data).
> If auth is successful, new client requests are forwarded through
> corresponding user connection, otherwise, proxy keeps forwarding request
> through guest connection.

Before diving in the code, I'd like to inquire why you think that a
proxy should forward all user connections with the same credentials
through a single connection to the master. AFAIU this complicates the
implementation quite a bit and may negatively affect performance as
this design implies juggling with sync sequences.

What if the user deliberately created several connections so as to
avoid throttling certain transactions on hitting the network buffer
limit? I don't think that it's right to push all requests through
the same queue then.


* Re: [tarantool-patches] [PATCH v2] iproto: introduce a proxy module.
  2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
                   ` (2 preceding siblings ...)
  2018-10-08 10:44 ` [tarantool-patches] " Vladimir Davydov
@ 2018-10-08 16:48 ` Vladimir Davydov
  2018-10-16 18:39   ` [tarantool-patches] " Konstantin Osipov
  2018-10-08 19:45 ` [tarantool-patches] " Vladimir Davydov
  2018-10-23 17:26 ` Konstantin Osipov
  5 siblings, 1 reply; 12+ messages in thread
From: Vladimir Davydov @ 2018-10-08 16:48 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches

On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> Since salt proxy sends to a client differs from the salt it recieves
> from a remote instance, forwarding auth requests to establish non-guest
> connections is a little bit tricky:
> let hash1 = sha1(password),
>     hash2 = sha1(hash1)
> then upon auth proxy recieves such a string from the client:
>     reply = xor(hash1, sha1(proxy_salt, hash2))
> proxy has to send an auth request of such form to an instance:
>     request = xor(hash1, sha1(instance_salt, hash2))
> proxy fetches hash2 via a special message to tx thread (again, it is
> accessible, since proxy is run on one of the cluster instances).
> Then proxy computes hash1 = xor(reply, sha1(proxy_salt, hash2)) and
> computes the request using hash1, hash2 and instance_salt.
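
The xor/sha1 algebra quoted above can be exercised end to end with a short
standalone sketch (Python here purely for illustration; all names are ad
hoc, not the patch's C identifiers):

```python
import hashlib
from os import urandom

SCRAMBLE_SIZE = 20  # sha1 digest size, same constant as in src/scramble.h

def sha1(*parts):
    h = hashlib.sha1()
    for p in parts:
        h.update(p)
    return h.digest()

def xor(a, b):
    return bytes(x ^ y for x, y in zip(a, b))

password = b"secret"
hash1 = sha1(password)   # known only to the client
hash2 = sha1(hash1)      # stored on the server, fetched by the proxy

proxy_salt = urandom(SCRAMBLE_SIZE)     # salt the proxy sent to the client
instance_salt = urandom(SCRAMBLE_SIZE)  # salt the instance sent to the proxy

# What the client sends to the proxy:
reply = xor(hash1, sha1(proxy_salt, hash2))

# Proxy side (cf. scramble_reencode): recover hash1, then rebuild the
# scramble against the instance's salt.
recovered_hash1 = xor(reply, sha1(proxy_salt, hash2))
request = xor(recovered_hash1, sha1(instance_salt, hash2))

# The instance's usual check (cf. scramble_check) passes:
candidate_hash1 = xor(request, sha1(instance_salt, hash2))
assert recovered_hash1 == hash1
assert sha1(candidate_hash1) == hash2
```

Note that the proxy ends up holding hash1, which is password-equivalent,
so this re-encoding only makes sense within a trusted cluster.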

So unless the user is fine with guest access (which is rather unlikely
AFAIU), it doesn't make sense to run a proxy on a standalone instance,
does it?

If so, may be we could simplify both configuration and the code by
requiring a proxy to be a part of the replica set?

I mean instead of netbox.listen(), we could add a knob to box.cfg, say
box.cfg.proxy_enable = true|false. If this knob was set, the instance
would automatically forward all incoming iproto requests to members of
the replica set (including self). What do you think?

> Proxy may be configured like this:
> ```
> netbox = require("net.box")
> netbox.listen(uri_to_listen, {cluster={
> 	{uri=uri1, is_master=false},
> 	{uri=uri2, is_master=true},
> 	...
> 	}})
> ```

I don't like that the user has to explicitly configure which participant
is rw and which is ro. How will it work when box.ctl.promote is finally
implemented?


* Re: [tarantool-patches] [PATCH v2] iproto: introduce a proxy module.
  2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
                   ` (3 preceding siblings ...)
  2018-10-08 16:48 ` [tarantool-patches] " Vladimir Davydov
@ 2018-10-08 19:45 ` Vladimir Davydov
  2018-10-16 18:42   ` [tarantool-patches] " Konstantin Osipov
  2018-10-23 17:26 ` Konstantin Osipov
  5 siblings, 1 reply; 12+ messages in thread
From: Vladimir Davydov @ 2018-10-08 19:45 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: tarantool-patches

On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> +static void
> +proxy_forward_request(struct iproto_connection *con, struct iproto_msg *msg,
> +			struct proxy_connection *pcon)
> +{
> +	struct sync_hash_entry *cs = (struct sync_hash_entry *)malloc(sizeof(*cs));
> +	cs->sync = msg->header.sync;
> +	cs->con = con;
> +	msg->header.sync = ++pcon->sync_ctr;
> +	struct mh_i64ptr_node_t node = {pcon->sync_ctr, cs};
> +	if (mh_i64ptr_put(pcon->sync_hash, &node, NULL, NULL) ==
> +	    mh_end(pcon->sync_hash)) {
> +		diag_set(OutOfMemory, sizeof(node), "malloc", "sync_hash");
> +		diag_raise();
> +	}
> +	coio_write_xrow(&pcon->connection.io, &msg->header);
> +
> +	/*
> +	 * After forwarding the request, mark it as read and
> +	 * delete the msg.
> +	 */
> +	msg->p_ibuf->rpos += msg->len;
> +	iproto_msg_delete(msg);
> +}
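
The forwarding path quoted above, together with the replier fiber quoted
below, boils down to the following bookkeeping, modeled here in a few
lines of Python (a dict standing in for mh_i64ptr; names are illustrative,
not the patch's):

```python
import itertools

class ProxyConnection:
    """Toy model of sync_ctr + sync_hash from the patch."""

    def __init__(self):
        self._next_sync = itertools.count(1)   # sync_ctr
        self.sync_hash = {}  # upstream sync -> (client conn, client sync)

    def forward(self, client_conn, client_sync):
        """Remap the client's sync before writing the row upstream."""
        upstream_sync = next(self._next_sync)
        self.sync_hash[upstream_sync] = (client_conn, client_sync)
        return upstream_sync

    def on_reply(self, upstream_sync, is_chunk=False):
        """Route an upstream reply back; restore the original sync."""
        entry = self.sync_hash.get(upstream_sync)
        if entry is None:
            return None  # unknown sync: log a warning and skip
        if not is_chunk:
            # IPROTO_CHUNK replies keep the entry for further chunks.
            del self.sync_hash[upstream_sync]
        return entry

pcon = ProxyConnection()
s1 = pcon.forward("client A", 7)
s2 = pcon.forward("client B", 7)   # same client sync, no collision upstream
assert s1 != s2
assert pcon.on_reply(s2) == ("client B", 7)
assert pcon.on_reply(s1, is_chunk=True) == ("client A", 7)  # entry kept
assert pcon.on_reply(s1) == ("client A", 7)                 # final reply
assert pcon.on_reply(s1) is None                            # already removed
```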

> +static int
> +proxy_replier_f(va_list ap)
> +{
> +	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
> +
> +	struct ev_io io;
> +	coio_create(&io, pcon->connection.io.fd);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header row;
> +		coio_read_xrow(&io, &pcon->connection.ibuf, &row);
> +		uint64_t key = row.sync;
> +		mh_int_t i = mh_i64ptr_find(pcon->sync_hash, key, NULL);
> +		if (i == mh_end(pcon->sync_hash)) {
> +			/*
> +			 * Some error. We recieved a reply with sync
> +			 * not corresponding to any connection
> +			 */
> +			say_warn("sync recieved from remote instance is not in sync_hash");
> +			continue;
> +		}
> +		struct mh_i64ptr_node_t *node = mh_i64ptr_node(pcon->sync_hash, i);
> +		if (row.type != IPROTO_CHUNK) {
> +			mh_i64ptr_remove(pcon->sync_hash, node, NULL);
> +		}
> +		struct sync_hash_entry * cs = (struct sync_hash_entry *)node->val;
> +		struct iproto_connection *con = cs->con;
> +		uint64_t sync = cs->sync;
> +		row.sync = sync;
> +		free(cs);

I may be wrong, but I think that using a hash table for mapping client
connection syncs to server connection syncs, like you do here, may hurt
performance (think of thousands requests in flight). If we really want
to multiplex, I'd rather consider extending xrow_header with something
like connection_id. This might need some benchmarking though.
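
For comparison, one possible shape of the connection_id idea — not part of
the patch, just a sketch under assumed bit widths — is to pack the client
identifier into the 64-bit sync itself, which avoids the hash lookup
entirely as long as the server echoes sync verbatim:

```python
CLIENT_SYNC_BITS = 40  # arbitrary split of the 64-bit sync; an assumption

def pack_sync(conn_id, client_sync):
    """Embed the client connection id in the sync sent upstream."""
    assert client_sync < (1 << CLIENT_SYNC_BITS)
    return (conn_id << CLIENT_SYNC_BITS) | client_sync

def unpack_sync(upstream_sync):
    """Split an echoed sync back into (conn_id, original client sync)."""
    return (upstream_sync >> CLIENT_SYNC_BITS,
            upstream_sync & ((1 << CLIENT_SYNC_BITS) - 1))

assert pack_sync(0, 5) == 5
assert unpack_sync(pack_sync(3, 12345)) == (3, 12345)
```

The trade-off is that client syncs that do not fit into the reserved bits
would need a fallback table anyway, so the hash-based scheme is the more
general of the two.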

> +		coio_write_iproto_response(&con->output, &row);
> +
> +		fiber_gc();
> +	}
> +	return 0;
> +}


* [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-08 10:44 ` [tarantool-patches] " Vladimir Davydov
@ 2018-10-16 18:35   ` Konstantin Osipov
  0 siblings, 0 replies; 12+ messages in thread
From: Konstantin Osipov @ 2018-10-16 18:35 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Serge Petrenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/10/08 13:54]:
> On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> > By default, upon new client connection, all requests from the client are
> > forwarded through "guest" connection. Upon recieving an AUTH request,
> > proxy processes it on a local instance (this can be done, since proxy is
> > being run on one of the cluster instances and has access to user data).
> > If auth is successful, new client requests are forwarded through
> > corresponding user connection, otherwise, proxy keeps forwarding request
> > through guest connection.
> 
> Before diving in the code, I'd like to inquire why you think that a
> proxy should forward all user connections with the same credentials
> through a single connection to the master. AFAIU this complicates the
> implementation quite a bit and may negatively affect performance as
> this design implies joggling with sync sequences.
> 
> What if the user deliberately created several connections so as to
> avoid throttling certain transactions on hitting the network buffer
> limit? I don't think that it's right to push all requests through
> the same queue then.

It's a good question and we need to bench it.

Generally using fewer sockets to marshal requests is cheaper on
the server side, since it has to perform fewer syscalls to read
data from network.

But it depends on the actual load. For example, if all your
clients are already multiplexing requests, you're simply
performing useless work. But if your typical client is
"simple, stupid", e.g. request/response style, then you're
saving quite a bit of server CPU time. This can easily be
seen in a nosqlbench test.

I'd say a typical client is simple, stupid, so multiplexing for it is
useful. If you want to be real smart, you could turn multiplexing
on/off depending on request/response rate (turn it off for highly
loaded connections). But that's an optimization, as a general rule
the proxy has to do multiplexing.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


* [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-08 16:48 ` [tarantool-patches] " Vladimir Davydov
@ 2018-10-16 18:39   ` Konstantin Osipov
  2018-10-17  8:35     ` Vladimir Davydov
  0 siblings, 1 reply; 12+ messages in thread
From: Konstantin Osipov @ 2018-10-16 18:39 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Serge Petrenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/10/11 09:55]:
> On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> > Since salt proxy sends to a client differs from the salt it recieves
> > from a remote instance, forwarding auth requests to establish non-guest
> > connections is a little bit tricky:
> > let hash1 = sha1(password),
> >     hash2 = sha1(hash1)
> > then upon auth proxy recieves such a string from the client:
> >     reply = xor(hash1, sha1(proxy_salt, hash2))
> > proxy has to send an auth request of such form to an instance:
> >     request = xor(hash1, sha1(instance_salt, hash2))
> > proxy fetches hash2 via a special message to tx thread (again, it is
> > accessible, since proxy is run on one of the cluster instances).
> > Then proxy computes hash1 = xor(reply, sha1(proxy_salt, hash2)) and
> > computes the request using hash1, hash2 and instance_salt.
> 
> So unless the user is fine with guest access (which is rather unlikely
> AFAIU), it doesn't make sense to run a proxy on a standalone instance,
> does it?
> 
> If so, may be we could simplify both configuration and the code by
> requiring a proxy to be a part of the replica set?

It's OK to simplify the patch now and only make it work within a
replica set. In future we will have replication groups, remember,
and this way we'll get a standalone proxy for free.

> I mean instead of netbox.listen(), we could add a knob to box.cfg, say
> box.cfg.proxy_enable = true|false. If this knob was set, the instance
> would automatically forward all incoming iproto requests to members of
> the replica set (including self). What do you think?

We need a new knob for two reasons:
- we need to be able to turn on the listen port before box.cfg; it is,
  albeit a separate issue, a popular user request.
- we need backward compatibility, i.e. the old listen should work the
  old way, so as not to surprise users.
> 
> > Proxy may be configured like this:
> > ```
> > netbox = require("net.box")
> > netbox.listen(uri_to_listen, {cluster={
> > 	{uri=uri1, is_master=false},
> > 	{uri=uri2, is_master=true},
> > 	...
> > 	}})
> > ```
> 
> I don't like that the user has to explicitly configure which participant
> is rw and which is ro. How will it work when box.ctl.promote is finally
> implemented?

Can we please implement a proxy in which there is no
{cluster=} option? As we were discussing with Vlad, proxy should
be integrated with box.ctl.promote() and learn about ro/rw masters
from it.

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


* [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-08 19:45 ` [tarantool-patches] " Vladimir Davydov
@ 2018-10-16 18:42   ` Konstantin Osipov
  0 siblings, 0 replies; 12+ messages in thread
From: Konstantin Osipov @ 2018-10-16 18:42 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Serge Petrenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/10/11 09:55]:
> I may be wrong, but I think that using a hash table for mapping client
> connection syncs to server connection syncs, like you do here, may hurt
> performance (think of thousands requests in flight). If we really want
> to multiplex, I'd rather consider extending xrow_header with something
> like connection_id. This might need some benchmarking though.

This is a typical approach used in other proxies. My guess is that
hashing an integer sync has very low overhead compared to the
overall cost of sending/receiving it. Besides, let's keep in mind
that a proxy is usually stateless so you can have many of them,
and CPU on a proxy is cheap. Basically, the principle should be
that as much work as possible should be done on a proxy.

> > +		coio_write_iproto_response(&con->output, &row);
> > +
> > +		fiber_gc();
> > +	}
> > +	return 0;
> > +}

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


* Re: [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-16 18:39   ` [tarantool-patches] " Konstantin Osipov
@ 2018-10-17  8:35     ` Vladimir Davydov
  2018-10-17 15:31       ` Konstantin Osipov
  0 siblings, 1 reply; 12+ messages in thread
From: Vladimir Davydov @ 2018-10-17  8:35 UTC (permalink / raw)
  To: Konstantin Osipov; +Cc: tarantool-patches, Serge Petrenko

On Tue, Oct 16, 2018 at 09:39:58PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [18/10/11 09:55]:
> > On Tue, Oct 02, 2018 at 09:05:54PM +0300, Serge Petrenko wrote:
> > > Since salt proxy sends to a client differs from the salt it recieves
> > > from a remote instance, forwarding auth requests to establish non-guest
> > > connections is a little bit tricky:
> > > let hash1 = sha1(password),
> > >     hash2 = sha1(hash1)
> > > then upon auth proxy recieves such a string from the client:
> > >     reply = xor(hash1, sha1(proxy_salt, hash2))
> > > proxy has to send an auth request of such form to an instance:
> > >     request = xor(hash1, sha1(instance_salt, hash2))
> > > proxy fetches hash2 via a special message to tx thread (again, it is
> > > accessible, since proxy is run on one of the cluster instances).
> > > Then proxy computes hash1 = xor(reply, sha1(proxy_salt, hash2)) and
> > > computes the request using hash1, hash2 and instance_salt.
> > 
> > So unless the user is fine with guest access (which is rather unlikely
> > AFAIU), it doesn't make sense to run a proxy on a standalone instance,
> > does it?
> > 
> > If so, may be we could simplify both configuration and the code by
> > requiring a proxy to be a part of the replica set?
> 
> It's OK to simplify the patch now and only make it work within a
> replica set. In future we will have replication groups, remember,
> and this way we'll get a standalone proxy for free.
> 
> > I mean instead of netbox.listen(), we could add a knob to box.cfg, say
> > box.cfg.proxy_enable = true|false. If this knob was set, the instance
> > would automatically forward all incoming iproto requests to members of
> > the replica set (including self). What do you think?
> 
> We need a new knob for two reasons:
> - we need to be able to turn on the listen port before box.cfg; it's
>   a separate but popular user request.

Then it should be done in a separate patch before introducing proxy.

> - we need backward compatibility, i.e. the old listen should work the
>   old way, so as not to surprise users

How? Two iproto threads?

> > 
> > > Proxy may be configured like this:
> > > ```
> > > netbox = require("net.box")
> > > netbox.listen(uri_to_listen, {cluster={
> > > 	{uri=uri1, is_master=false},
> > > 	{uri=uri2, is_master=true},
> > > 	...
> > > 	}})
> > > ```
> > 
> > I don't like that the user has to explicitly configure which participant
> > is rw and which is ro. How will it work when box.ctl.promote is finally
> > implemented?
> 
> Can we please implement a proxy in which there is no
> {cluster=} option? As we were discussing with Vlad, proxy should
> be integrated with box.ctl.promote() and learn about ro/rw masters
> from it.

Agree, but how can we implement the proxy before promote then? Maybe
we could use IPROTO_VOTE to inquire which replica is rw and which is ro?

^ permalink raw reply	[flat|nested] 12+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-17  8:35     ` Vladimir Davydov
@ 2018-10-17 15:31       ` Konstantin Osipov
  0 siblings, 0 replies; 12+ messages in thread
From: Konstantin Osipov @ 2018-10-17 15:31 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches, Serge Petrenko

* Vladimir Davydov <vdavydov.dev@gmail.com> [18/10/17 12:14]:

> > We need a new knob for two reasons:
> > - we need to be able to turn on the listen port before box.cfg; it's
> >   a separate but popular user request.
> 
> Then it should be done in a separate patch before introducing proxy.

Okay, but I was going to make the new knob behave differently
right from the start.
> 
> > - we need backward compatibility, i.e. the old listen should work the
> >   old way, so as not to surprise users
> 
> How? Two iproto threads?

Why not run two event handlers in the same thread?

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


* [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
  2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
                   ` (4 preceding siblings ...)
  2018-10-08 19:45 ` [tarantool-patches] " Vladimir Davydov
@ 2018-10-23 17:26 ` Konstantin Osipov
  5 siblings, 0 replies; 12+ messages in thread
From: Konstantin Osipov @ 2018-10-23 17:26 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Serge Petrenko

* Serge Petrenko <sergepetrenko@tarantool.org> [18/10/02 21:07]:

Some comments related to the patch itself, in addition to the
general comments which I sent out earlier.

First of all, the proxy has to be put into a separate module, with
a clear API. To work with iproto, we need to define an iproto API
which the proxy can use to instantiate itself in the iproto thread.

In other words, please try to separate clearly iproto.cc and
proxy.cc.

This will also ensure there is a clear API between proxy and tx,
as today there is between iproto and tx.

The iproto thread is simply a host/container thread for the proxy.
You could even put the proxy in its own thread and move it to
iproto in a separate patch. The code should be written so that
moving it to a different thread requires no changes.

Second, the proxy does not need to replace box.cfg{listen=}; the
latter can keep working without changes until it is deprecated and
removed. So there should be no need to change the bootstrap
procedure.

> +void
> +box_init_iproto()
> +{
> +	if (!iproto_is_configured()) {
> +		/*
> +		 * Since iproto may be initialized in a call to netbox.listen()
> +		 * before box is configured, we need to first establish
> +		 * a tx endpoint to write cbus messages.
> +		 */
> +		/* Join the cord interconnect as "tx" endpoint. */
> +		fiber_pool_create(&tx_fiber_pool, "tx",
> +				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
> +				  FIBER_POOL_IDLE_TIMEOUT);
> +		/* Add an extra endpoint for WAL wake up/rollback messages. */
> +		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
> +		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
> +		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
> +
> +		iproto_init();
> +	}
> +}

We have a terminology mess here. There is "net" thread which is
hosting "iproto" protocol. To run proxy, you need to initialize
the networking subsystem, and yes, since you don't know whether
netbox.listen() was called before box.cfg{}, you need to make sure
it is initialized on demand. So you could put a refactoring which
ensures on-demand initialization of the net thread (the thread is
started, joins cbus, all the communication pipes between this
thread and tx are established) into a separate patch.

Please keep in mind that in future we will have a lot of work done
in the network thread - sharding, proxying, etc. So you could
create net.[hc] to separate the net thread logic from iproto/proxy
logic right away.

> -	iproto_init();
> +	box_init_iproto();

Don't understand the reason for this rename. 
>  	wal_thread_start();
>  
> +static bool is_iproto_configured = false;
> +static bool is_proxy_configured = false;

Why not allow multiple proxy instances? In other words, why not
have a struct proxy encapsulating the entire proxy state
(one listen uri = one proxy)?

>  
> +struct proxy_instance {

Missing comment. I could only guess what this data structure is
for. The name is tautological. Why not simply call it
struct proxy?

Turns out it stands for one of the proxy destinations. 

How do you deal with possible configuration loops? Imagine proxy 1
believes proxy 2 is rw, and proxy 2 believes proxy 1 is rw. It
could quickly bounce packets ad nauseam until routing tables at
all nodes are full. Do you have a test for this? 

We could establish a separate connection for the "peer" route and
ensure it never feeds requests directly to the tx thread - but
this could be dangerous when proxy configuration is changed.


> +	bool is_ro;

What is the API to change the local destination status? 

Let's imagine box.ctl.promote() is in place. It should begin by
suspending the routing at all nodes, and end by resuming it again.
The suspension should not affect control/status messages, which
should be passed directly to the destination node.

Did you implement a patch which adds an RO/RW flag to CALL? 

Finally, since we're adding SQL, and switching to top-down merges, 
we need to enable routing for SQL.

I understand these comments are more related to the scope of the
task, rather than the patch itself. You could address most of them
in separate patches.

> +	/** A map of all established connections to the instance. */
> +	struct mh_strnptr_t *fat_connections;

Why are they fat?

> +/**
> + * Search the proxy's instance list to find the one
> + * instance capable of processing the request in msg->header.
> + */
> +static struct proxy_instance *
> +proxy_find_instance(struct iproto_msg *msg)

The lookup should happen only when the client connection is
established, changes its credentials, or there is an error/timeout.
Each proxy destination should maintain a separate connection for
each authenticated username, and the lookup should simply match
the source connection to the destination one based on the user
name.

Once the destination connections are "attached" to the source
ones, the messages should use the same connections again and
again.

In the future, users will want to set a policy for RO forwarding.
For now, read-only requests should always be forwarded to the
local instance.
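The attach-on-auth scheme can be sketched with a small pool keyed by (destination, user); the names below are illustrative, not from the patch:

```python
class DestPool:
    """Per-destination connection pool keyed by authenticated user.

    Sketch of the lookup-on-auth scheme: the route is resolved once,
    when the client connects or re-authenticates, and subsequent
    requests reuse the attached destination connection.
    """
    def __init__(self, connect):
        self._connect = connect   # callback: (dest, user) -> connection
        self._conns = {}          # (dest, user) -> connection

    def attach(self, dest, user):
        # Open at most one connection per (destination, user) pair.
        key = (dest, user)
        if key not in self._conns:
            self._conns[key] = self._connect(dest, user)
        return self._conns[key]
```

With this in place, the per-request path is a dictionary hit, not a scan of the instance list.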

> +static void
> +proxy_forward_request(struct iproto_connection *con, struct iproto_msg *msg,
> +			struct proxy_connection *pcon)
> +{
> +	struct sync_hash_entry *cs = (struct sync_hash_entry *)malloc(sizeof(*cs));
> +	cs->sync = msg->header.sync;
> +	cs->con = con;
> +	msg->header.sync = ++pcon->sync_ctr;
> +	struct mh_i64ptr_node_t node = {pcon->sync_ctr, cs};
> +	if (mh_i64ptr_put(pcon->sync_hash, &node, NULL, NULL) ==
> +	    mh_end(pcon->sync_hash)) {
> +		diag_set(OutOfMemory, sizeof(node), "malloc", "sync_hash");
> +		diag_raise();
> +	}
> +	coio_write_xrow(&pcon->connection.io, &msg->header);

Writing rows to the output one by one is too slow.
The proxy client worker should work as follows:
Until there is available input:
 - read "readahead" bytes of input
 - populate output buffers in destination connections
 - flush output buffers in the destination connections
   in a non-blocking manner
 - check whether there is more input and yield until
   either input or output is ready.

I doubt it is easy to do with fibers. I'd use callbacks, in the
same way as iproto does. In fact, perhaps the iproto code could be
refactored to implement such I/O.
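A minimal sketch of the populate-then-flush step, assuming per-destination byte buffers (illustrative names, in-memory only):

```python
from collections import defaultdict

def route_batch(requests, route, buffers):
    """Append each request in a batch to its destination's output
    buffer; return the set of destinations that need a flush.

    `route` maps a request to a destination id. The point of the
    scheme above: buffers are flushed once per wakeup, instead of
    issuing one write() per forwarded row.
    """
    dirty = set()
    for req in requests:
        dst = route(req)
        buffers[dst] += req
        dirty.add(dst)
    return dirty
```

The caller would then flush each dirty buffer in a non-blocking manner and go back to waiting for readiness events.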

> +static int
> +proxy_forwarder_f(va_list ap)
> +	pcon->replier = fiber_new(name, proxy_replier_f);
> +	fiber_set_joinable(pcon->replier, true);
> +	fiber_start(pcon->replier, pcon);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct rlist *head;
> +		while ((head = rlist_first(&pcon->request_queue)) ==
> +		       &pcon->request_queue) {
> +			fiber_cond_wait(&pcon->forwarder_cond);
> +		}
> +
> +		struct pqueue_entry *ent = rlist_shift_entry(&pcon->request_queue,
> +							     struct pqueue_entry,
> +							     link);
> +		struct iproto_connection *con = ent->con;
> +		struct iproto_msg * msg = ent->msg;
> +		free(ent);
> +
> +		proxy_forward_request(con, msg, pcon);
> +
> +		fiber_gc();
> +	}
> +	return 0;
> +}
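The forwarder loop above is a classic producer/consumer. With threads standing in for fibers and `queue.Queue` replacing the fiber_cond + rlist pair, it reduces to this sketch (illustrative names):

```python
import queue
import threading

def forwarder(q, forward, stop):
    """Consume queued (con, msg) pairs and forward each one.

    The timeout lets the loop notice the stop flag, playing the
    role of fiber cancellation in the original code.
    """
    while not stop.is_set():
        try:
            con, msg = q.get(timeout=0.1)
        except queue.Empty:
            continue
        forward(con, msg)
```

A single consumer per destination connection preserves request ordering, which the patch relies on.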
> +	while (!rlist_empty(&proxy_instances)) {
> +		struct proxy_instance *instance =
> +		    rlist_shift_entry(&proxy_instances, struct proxy_instance, link);
> +		proxy_instance_delete(instance);
> +	}
> +}
> +static int
> +proxy_configure_f(struct cbus_call_msg *m)
> +{
> +	struct proxy_cfg_msg *msg = (struct proxy_cfg_msg *)m;
> +	/*
> +	 * If it is the second call to proxy_configure,
> +	 * first remove all the old instances and close
> +	 * connections, and do not init coio again.
> +	 */
> +	if (is_proxy_configured) {
> +		proxy_instance_list_purge();

Looks like you're dropping packets. This is not OK.

> +proxy_replier_f(va_list ap)
> +{
> +	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
> +
> +	struct ev_io io;
> +	coio_create(&io, pcon->connection.io.fd);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header row;
> +		coio_read_xrow(&io, &pcon->connection.ibuf, &row);
> +		uint64_t key = row.sync;
> +		mh_int_t i = mh_i64ptr_find(pcon->sync_hash, key, NULL);
> +		if (i == mh_end(pcon->sync_hash)) {
> +			/*
> +			 * Some error. We received a reply with a sync
> +			 * not corresponding to any connection.
> +			 */
> +			say_warn("sync received from remote instance is not in sync_hash");
> +			continue;
> +		}
> +		struct mh_i64ptr_node_t *node = mh_i64ptr_node(pcon->sync_hash, i);
> +		if (row.type != IPROTO_CHUNK) {
> +			mh_i64ptr_remove(pcon->sync_hash, node, NULL);
> +		}
> +		struct sync_hash_entry * cs = (struct sync_hash_entry *)node->val;
> +		struct iproto_connection *con = cs->con;
> +		uint64_t sync = cs->sync;
> +		row.sync = sync;
> +		free(cs);
> +		coio_write_iproto_response(&con->output, &row);
> +
> +		fiber_gc();

I think this should also be event-driven. Please think about the
underlying system calls and try to minimize their number.
> +	}
> +	return 0;
> +}
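The sync bookkeeping shared by proxy_forward_request() and proxy_replier_f() can be modeled independently of fibers (a sketch; names are illustrative):

```python
import itertools

class SyncRouter:
    """Map proxy-generated syncs back to (client, original sync).

    Mirrors the sync_hash logic in the patch: each forwarded request
    gets a fresh, monotonically increasing sync; a reply is matched
    by that sync and rewritten with the client's original sync.
    """
    def __init__(self):
        self._ctr = itertools.count(1)
        self._pending = {}  # proxy sync -> (client_id, client_sync)

    def forward(self, client_id, client_sync):
        proxy_sync = next(self._ctr)
        self._pending[proxy_sync] = (client_id, client_sync)
        return proxy_sync

    def reply(self, proxy_sync, is_chunk=False):
        # An IPROTO_CHUNK reply keeps the entry: more replies follow.
        if is_chunk:
            return self._pending[proxy_sync]
        return self._pending.pop(proxy_sync)
```

The two-fiber split then falls out naturally: forward() runs in the forwarder, reply() in the replier, and the map is the only shared state.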
> --- a/src/scramble.c
> +++ b/src/scramble.c
> @@ -67,6 +67,29 @@ scramble_prepare(void *out, const void *salt, const void *password,
>  	xor(out, hash1, out, SCRAMBLE_SIZE);
>  }
>  
> +void
> +scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
> +		  const void *hash2)
> +{

Could easily go in a separate patch.

> +	unsigned char hash1[SCRAMBLE_SIZE];
> +	unsigned char sh[SCRAMBLE_SIZE];
> +	SHA1_CTX ctx;
> +
> +	SHA1Init(&ctx);
> +	SHA1Update(&ctx, salt, SCRAMBLE_SIZE);
> +	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
> +	SHA1Final(sh, &ctx);
> +
> +	xor(hash1, in, sh, SCRAMBLE_SIZE);
> +
> +	SHA1Init(&ctx);
> +	SHA1Update(&ctx, msalt, SCRAMBLE_SIZE);
> +	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
> +	SHA1Final(out, &ctx);
> +
> +	xor(out, hash1, out, SCRAMBLE_SIZE);
> +}
> +
>  int
>  scramble_check(const void *scramble, const void *salt, const void *hash2)
>  {
> diff --git a/src/scramble.h b/src/scramble.h
> index 7dee31483..2ca975bd7 100644

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov


end of thread, other threads:[~2018-10-23 17:26 UTC | newest]

Thread overview: 12+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-10-02 18:05 [tarantool-patches] [PATCH v2] iproto: introduce a proxy module Serge Petrenko
2018-10-03  8:49 ` Vladimir Davydov
2018-10-04 11:54 ` [tarantool-patches] " Georgy Kirichenko
2018-10-08 10:44 ` [tarantool-patches] " Vladimir Davydov
2018-10-16 18:35   ` [tarantool-patches] " Konstantin Osipov
2018-10-08 16:48 ` [tarantool-patches] " Vladimir Davydov
2018-10-16 18:39   ` [tarantool-patches] " Konstantin Osipov
2018-10-17  8:35     ` Vladimir Davydov
2018-10-17 15:31       ` Konstantin Osipov
2018-10-08 19:45 ` [tarantool-patches] " Vladimir Davydov
2018-10-16 18:42   ` [tarantool-patches] " Konstantin Osipov
2018-10-23 17:26 ` Konstantin Osipov
