[tarantool-patches] [PATCH v2] iproto: introduce a proxy module.

Serge Petrenko sergepetrenko at tarantool.org
Tue Oct 2 21:05:54 MSK 2018


This patch introduces a proxy module which can be used to hide cluster
details from the user.
A proxy runs on one of the cluster instances in the iproto thread and
forwards rw requests, such as insert and replace, to the master and ro
requests to any suitable instance. The proxy also forwards requests to
the instance it is configured on, provided that instance is suitable
for performing such requests and the proxy is configured accordingly.
When forwarding to the local instance, the proxy doesn't use the network
and sends requests directly to the tx thread through cbus.
The proxy holds a single connection to each remote instance per cluster
user.
By default, upon a new client connection, all requests from the client
are forwarded through the "guest" connection. Upon receiving an AUTH
request, the proxy processes it on the local instance (this is possible
because the proxy runs on one of the cluster instances and has access
to user data). If auth is successful, new client requests are forwarded
through the corresponding user connection; otherwise, the proxy keeps
forwarding requests through the guest connection.

Each instance connection has two corresponding fibers, a writer
("forwarder") and a reader ("replier"), as well as a queue of requests
to be forwarded, a monotonically increasing sync counter, and a map
from translated syncs to the original client syncs and the client
connections to reply to. Upon receiving a new request, the proxy
determines which instance is to process it by decoding the request type.
Upon finding an instance and an instance connection, the proxy adds the
request and the corresponding client connection to the forwarder queue
and signals the forwarder that a new request is ready. The forwarder
replaces the request sync with the counter value and remembers the old
sync and the connection to reply to once the reply is received. Then
the forwarder sends the modified request to the instance and proceeds
to the next request.
The replier waits for input from the instance connection. Upon receiving
some, it looks up the old sync value and the corresponding client
connection to write the reply to. Then it sends the reply to the client
and continues waiting for input from the instance.
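
The sync translation described above can be sketched as follows. This is
only an illustration under stated assumptions, not the code from the
patch: the names instance_conn, sync_entry, remap_sync and restore_sync
are hypothetical, a fixed-size array stands in for the hash map, and an
integer client_id stands in for the client connection pointer.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

enum { MAX_IN_FLIGHT = 64 };

/* What has to be remembered for each forwarded request. */
struct sync_entry {
	uint64_t proxy_sync;  /* sync sent to the instance, 0 = free slot */
	uint64_t client_sync; /* sync the client originally used */
	int client_id;        /* hypothetical stand-in for the connection */
};

struct instance_conn {
	uint64_t sync_ctr; /* monotonically increasing counter */
	struct sync_entry in_flight[MAX_IN_FLIGHT];
};

/*
 * Forwarder side: pick a fresh sync for the request and remember
 * where the reply has to go. Returns the new sync, or 0 if the
 * table is full.
 */
static uint64_t
remap_sync(struct instance_conn *c, uint64_t client_sync, int client_id)
{
	assert(c != NULL);
	for (size_t i = 0; i < MAX_IN_FLIGHT; i++) {
		struct sync_entry *e = &c->in_flight[i];
		if (e->proxy_sync != 0)
			continue;
		e->proxy_sync = ++c->sync_ctr;
		e->client_sync = client_sync;
		e->client_id = client_id;
		return e->proxy_sync;
	}
	return 0;
}

/*
 * Replier side: translate the sync of a reply back. Returns 0 and
 * fills the out parameters on success, -1 for an unknown sync.
 */
static int
restore_sync(struct instance_conn *c, uint64_t proxy_sync,
	     uint64_t *client_sync, int *client_id)
{
	assert(c != NULL);
	if (proxy_sync == 0)
		return -1;
	for (size_t i = 0; i < MAX_IN_FLIGHT; i++) {
		struct sync_entry *e = &c->in_flight[i];
		if (e->proxy_sync != proxy_sync)
			continue;
		*client_sync = e->client_sync;
		*client_id = e->client_id;
		e->proxy_sync = 0; /* free the slot */
		return 0;
	}
	return -1;
}
```

Two clients that happen to use the same sync value get distinct syncs on
the instance connection, so their replies cannot be confused.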

Since the salt the proxy sends to a client differs from the salt it
receives from a remote instance, forwarding auth requests to establish
non-guest connections is a little bit tricky:
let hash1 = sha1(password),
    hash2 = sha1(hash1)
then upon auth the proxy receives the following string from the client:
    reply = xor(hash1, sha1(proxy_salt, hash2))
and the proxy has to send an auth request of the following form to an
instance:
    request = xor(hash1, sha1(instance_salt, hash2))
The proxy fetches hash2 via a special message to the tx thread (again,
it is accessible, since the proxy runs on one of the cluster instances).
Then the proxy computes hash1 = xor(reply, sha1(proxy_salt, hash2)) and
computes the request using hash1, hash2 and instance_salt.
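
The algebra above can be checked with a small sketch. It is an
illustration only: toy_digest() is a hypothetical, NOT cryptographically
secure stand-in for sha1 so that the block stays self-contained, and the
names client_scramble and proxy_reencode are assumptions that do not
appear in the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

enum { DIGEST_SIZE = 20 };

/*
 * ASSUMPTION: toy stand-in for sha1(a, b) based on FNV-1a style
 * mixing. Not secure; it only makes the example runnable.
 */
static void
toy_digest(const uint8_t *a, size_t a_len, const uint8_t *b, size_t b_len,
	   uint8_t out[DIGEST_SIZE])
{
	uint32_t h = 2166136261u;
	for (size_t i = 0; i < a_len; i++)
		h = (h ^ a[i]) * 16777619u;
	for (size_t i = 0; i < b_len; i++)
		h = (h ^ b[i]) * 16777619u;
	for (size_t i = 0; i < DIGEST_SIZE; i++) {
		h = (h ^ (uint32_t)i) * 16777619u;
		out[i] = (uint8_t)(h >> 24);
	}
}

static void
xor_bytes(uint8_t *dst, const uint8_t *a, const uint8_t *b)
{
	for (size_t i = 0; i < DIGEST_SIZE; i++)
		dst[i] = a[i] ^ b[i];
}

/* What a client computes for a salt: xor(hash1, digest(salt, hash2)). */
static void
client_scramble(const uint8_t *hash1, const uint8_t *hash2,
		const uint8_t *salt, size_t salt_len, uint8_t *out)
{
	assert(salt_len > 0);
	uint8_t mask[DIGEST_SIZE];
	toy_digest(salt, salt_len, hash2, DIGEST_SIZE, mask);
	xor_bytes(out, hash1, mask);
}

/*
 * What the proxy does: recover hash1 from the client's reply using
 * proxy_salt and hash2, then re-encode it against instance_salt.
 */
static void
proxy_reencode(const uint8_t *reply, const uint8_t *hash2,
	       const uint8_t *proxy_salt, const uint8_t *instance_salt,
	       size_t salt_len, uint8_t *request)
{
	uint8_t mask[DIGEST_SIZE], hash1[DIGEST_SIZE];
	toy_digest(proxy_salt, salt_len, hash2, DIGEST_SIZE, mask);
	xor_bytes(hash1, reply, mask);
	client_scramble(hash1, hash2, instance_salt, salt_len, request);
}
```

The re-encoded request is byte-for-byte what the client would have sent
had it talked to the instance directly, and the proxy never needs the
plain-text password.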

Part of #2625

@TarantoolBot document
Title: introduce a proxy module.

A proxy runs on one of the cluster instances in the iproto thread and
forwards rw requests, such as insert and replace, to the master and ro
requests to any suitable instance. The proxy also forwards requests to
the instance it is configured on, provided that instance is suitable
for performing such requests and the proxy is configured accordingly.
In that case the proxy doesn't use the network and processes requests
right away.

Proxy may be configured like this:
```
netbox = require("net.box")
netbox.listen(uri_to_listen, {cluster={
	{uri=uri1, is_master=false},
	{uri=uri2, is_master=true},
	...
	}})
```
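
How a request might be routed over such a cluster list can be sketched
as below. This is a simplified illustration, not the patch code: the
struct instance_cfg and the function pick_instance are hypothetical
names, and the rule shown (prefer the local instance when it is
suitable, otherwise take the first suitable one) is an assumption
modelled on the description above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of one entry of the cluster table. */
struct instance_cfg {
	const char *uri;
	bool is_master;
	bool is_local;
};

/*
 * Return the index of the instance that should serve the request,
 * or -1 if no instance is suitable. A master can serve anything,
 * a non-master can serve only read-only requests.
 */
static int
pick_instance(const struct instance_cfg *cluster, size_t count,
	      bool request_is_ro)
{
	assert(cluster != NULL || count == 0);
	int first_suitable = -1;
	for (size_t i = 0; i < count; i++) {
		bool suitable = cluster[i].is_master || request_is_ro;
		if (!suitable)
			continue;
		/* The local instance avoids a network round trip. */
		if (cluster[i].is_local)
			return (int)i;
		if (first_suitable < 0)
			first_suitable = (int)i;
	}
	return first_suitable;
}
```

With a cluster of a remote replica, a remote master and a local replica,
a write goes to the master while a read is served locally.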
---
Sorry, I forgot to attach the issue and branch links in the previous email:
https://github.com/tarantool/tarantool/issues/2625
https://github.com/tarantool/tarantool/tree/sp/gh-2625-proxy

 src/box/box.cc          |  68 +++-
 src/box/box.h           |  13 +-
 src/box/errcode.h       |   2 +-
 src/box/iproto.cc       | 708 +++++++++++++++++++++++++++++++++++++++-
 src/box/iproto.h        |  23 ++
 src/box/lua/cfg.cc      |   2 +-
 src/box/lua/net_box.c   |  59 ++++
 src/box/lua/net_box.lua |   2 +
 src/box/xrow.c          |  46 +++
 src/box/xrow.h          |  44 +++
 src/box/xrow_io.cc      |  17 +
 src/box/xrow_io.h       |   6 +
 src/scramble.c          |  23 ++
 src/scramble.h          |   9 +
 14 files changed, 999 insertions(+), 23 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 804fc00e5..5a7f89930 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -741,13 +741,25 @@ box_set_replication_skip_conflict(void)
 }
 
 void
-box_listen(void)
+box_listen_xc(const char *uri)
 {
-	const char *uri = cfg_gets("listen");
+	if (uri == NULL)
+		uri = cfg_gets("listen");
 	box_check_uri(uri, "listen");
 	iproto_listen(uri);
 }
 
+int
+box_listen(const char *uri)
+{
+	try {
+		box_listen_xc(uri);
+	} catch (Exception * e) {
+		return -1;
+	}
+	return 0;
+}
+
 void
 box_set_log_level(void)
 {
@@ -1810,7 +1822,7 @@ bootstrap(const struct tt_uuid *instance_uuid,
 	 * Begin listening on the socket to enable
 	 * master-master replication leader election.
 	 */
-	box_listen();
+	box_listen_xc(NULL);
 	/*
 	 * Wait for the cluster to start up.
 	 *
@@ -1881,7 +1893,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 	recovery_scan(recovery, &replicaset.vclock);
 
 	if (wal_dir_lock >= 0) {
-		box_listen();
+		box_listen_xc(NULL);
 		box_sync_replication(false);
 
 		struct replica *master;
@@ -1944,7 +1956,7 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		 * applied in hot standby mode.
 		 */
 		vclock_copy(&replicaset.vclock, &recovery->vclock);
-		box_listen();
+		box_listen_xc(NULL);
 		box_sync_replication(false);
 	}
 	recovery_finalize(recovery);
@@ -1971,6 +1983,28 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	cbus_process(endpoint);
 }
 
+void
+box_init_iproto()
+{
+	if (!iproto_is_configured()) {
+		/*
+		 * Since iproto may be initted in a call to netbox.listen()
+		 * before box is configured, we need to first establish
+		 * a tx endpoint to write cbus messages.
+		 */
+		/* Join the cord interconnect as "tx" endpoint. */
+		fiber_pool_create(&tx_fiber_pool, "tx",
+				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
+				  FIBER_POOL_IDLE_TIMEOUT);
+		/* Add an extra endpoint for WAL wake up/rollback messages. */
+		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
+		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
+		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
+
+		iproto_init();
+	}
+}
+
 void
 box_init(void)
 {
@@ -1999,16 +2033,20 @@ box_is_configured(void)
 static inline void
 box_cfg_xc(void)
 {
-	/* Join the cord interconnect as "tx" endpoint. */
-	fiber_pool_create(&tx_fiber_pool, "tx",
-			  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
-			  FIBER_POOL_IDLE_TIMEOUT);
-	/* Add an extra endpoint for WAL wake up/rollback messages. */
-	cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
-
-	rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
-	rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
+	/*
+	 * We may have already done this in box_init_iproto()
+	 */
+	if (!iproto_is_configured()) {
+		/* Join the cord interconnect as "tx" endpoint. */
+		fiber_pool_create(&tx_fiber_pool, "tx",
+				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
+				  FIBER_POOL_IDLE_TIMEOUT);
+		/* Add an extra endpoint for WAL wake up/rollback messages. */
+		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
 
+		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
+		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
+	}
 	gc_init();
 	engine_init();
 	if (module_init() != 0)
@@ -2016,7 +2054,7 @@ box_cfg_xc(void)
 	schema_init();
 	replication_init();
 	port_init();
-	iproto_init();
+	box_init_iproto();
 	wal_thread_start();
 
 	title("loading");
diff --git a/src/box/box.h b/src/box/box.h
index 9930d4a1a..cfbcc033e 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -51,6 +51,12 @@ struct obuf;
 struct ev_io;
 struct auth_request;
 
+/*
+ * Init iproto thread if it wasn't initted yet.
+ */
+void
+box_init_iproto(void);
+
 /*
  * Initialize box library
  * @throws C++ exception
@@ -155,6 +161,12 @@ const char *box_status(void);
 void
 box_reset_stat(void);
 
+void
+box_listen_xc(const char *uri);
+
+int
+box_listen(const char *uri);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
@@ -177,7 +189,6 @@ box_process_vote(struct ballot *ballot);
 void
 box_check_config();
 
-void box_listen(void);
 void box_set_replication(void);
 void box_set_log_level(void);
 void box_set_log_format(void);
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 4115e6b65..c49018c5e 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -205,7 +205,7 @@ struct errcode_record {
 	/*150 */_(ER_CANT_CREATE_COLLATION,	"Failed to initialize collation: %s.") \
 	/*151 */_(ER_WRONG_COLLATION_OPTIONS,	"Wrong collation options (field %u): %s") \
 	/*152 */_(ER_NULLABLE_PRIMARY,		"Primary index of the space '%s' can not contain nullable parts") \
-	/*153 */_(ER_UNUSED,			"") \
+	/*153 */_(ER_NO_SUCH_INSTANCE,		"No instance found to proxy request to") \
 	/*154 */_(ER_TRANSACTION_YIELD,		"Transaction has been aborted by a fiber yield") \
 	/*155 */_(ER_NO_SUCH_GROUP,		"Replication group '%s' does not exist") \
 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3d13bad2f..0bce58a02 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -46,6 +46,7 @@
 #include "sio.h"
 #include "evio.h"
 #include "coio.h"
+#include "coio_task.h"
 #include "scoped_guard.h"
 #include "memory.h"
 #include "random.h"
@@ -56,11 +57,14 @@
 #include "tuple_convert.h"
 #include "session.h"
 #include "xrow.h"
+#include "xrow_io.h"
 #include "schema.h" /* schema_version */
 #include "replication.h" /* instance_uuid */
 #include "iproto_constants.h"
 #include "rmean.h"
 #include "errinj.h"
+#include "assoc.h"
+#include "uri.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -127,6 +131,22 @@ unsigned iproto_readahead = 16320;
 /* The maximal number of iproto messages in fly. */
 static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
 
+
+static bool is_iproto_configured = false;
+static bool is_proxy_configured = false;
+
+bool
+iproto_is_configured(void)
+{
+	return is_iproto_configured;
+}
+
+bool
+proxy_is_configured(void)
+{
+	return is_proxy_configured;
+}
+
 /**
  * How big is a buffer which needs to be shrunk before
  * it is put back into buffer cache.
@@ -154,6 +174,224 @@ iproto_reset_input(struct ibuf *ibuf)
 	}
 }
 
+struct proxy_instance {
+	bool is_ro;
+	/**
+	 * A flag indicating this instance refers to a local
+	 * instance the proxy is being run on. Needed to remember
+	 * not to establish connections to such an instance,
+	 * because all the forwarding will be done directly to tx
+	 * thread over cbus.
+	 */
+	bool is_local;
+	struct uri uri;
+
+	/** Instance's link in a list of all known instances. */
+	struct rlist link;
+	/** A map of all established connections to the instance. */
+	struct mh_strnptr_t *fat_connections;
+};
+
+struct sync_hash_entry {
+	uint64_t sync;
+	struct iproto_connection *con;
+};
+
+/** A single connection to a remote instance. */
+struct proxy_connection {
+	/** The instance we hold a connection to. */
+	struct proxy_instance *instance;
+	char * username;
+	/**
+	 * Monotonically increasing sync value for all requests
+	 * forwarded to instance through this connection.
+	 */
+	uint64_t sync_ctr;
+	/** Connection to the instance. */
+	struct {
+		struct ev_io io;
+		struct ibuf ibuf;
+	} connection;
+	/**
+	 * A map containing pairs of translated syncs plus
+	 * corresponding connections and original syncs.
+	 */
+	struct mh_i64ptr_t *sync_hash;
+	/**
+	 * A fiber receiving responses from the instance and
+	 * forwarding them back to clients.
+	 */
+	struct fiber *replier;
+	/** A fiber forwarding requests to the instance. */
+	struct fiber *forwarder;
+	/**
+	 * A cond to be signaled upon adding new entries to the request queue.
+	 */
+	struct fiber_cond forwarder_cond;
+	/*
+	 * A queue of all the requests to be forwarded through
+	 * the connection.
+	 */
+	struct rlist request_queue;
+	/** Salt received in instance's greeting. */
+	uint8_t salt[IPROTO_SALT_SIZE];
+};
+
+/** A single entry in proxy_connection request queue. */
+struct pqueue_entry {
+	/** Entry link in queue. */
+	struct rlist link;
+	/** The iproto connection a request was received from. */
+	struct iproto_connection *con;
+	/**
+	 * An iproto_msg corresponding to the request, as parsed
+	 * in iproto_msg_decode().
+	 */
+	struct iproto_msg *msg;
+};
+
+/**
+ * The local instance. Set only if proxy is run on one of the
+ * cluster nodes.
+ */
+struct proxy_instance *local_instance = NULL;
+
+/** A list of all instances known to proxy. */
+static RLIST_HEAD(proxy_instances);
+
+static struct proxy_instance *
+proxy_instance_new(bool is_ro, bool is_local, const char *uri)
+{
+	struct proxy_instance *instance = (struct proxy_instance *)
+		calloc(1, sizeof(*instance));
+	if (instance == NULL) {
+		diag_set(OutOfMemory, sizeof(*instance), "malloc",
+			 "struct proxy_instance");
+		return NULL;
+	}
+
+	instance->fat_connections = mh_strnptr_new();
+	if (instance->fat_connections == NULL) {
+		free(instance);
+		diag_set(OutOfMemory, sizeof(*instance->fat_connections), "malloc",
+			 "struct mh_strnptr_t");
+		return NULL;
+	}
+
+	instance->is_ro = is_ro;
+	instance->is_local = is_local;
+	if (is_local) {
+		/* There can be only one local instance. */
+		assert(local_instance == NULL);
+		assert(box_is_configured());
+		local_instance = instance;
+	} else if (uri_parse(&instance->uri, uri) || !instance->uri.service) {
+			free(instance);
+			diag_set(ClientError, ER_CFG, "uri",
+				 "expected host:service or /unix.socket");
+			return NULL;
+	}
+
+	rlist_add_tail(&proxy_instances, &instance->link);
+
+	return instance;
+}
+
+static void
+proxy_connection_delete(struct proxy_connection *pcon);
+
+static void
+proxy_instance_delete(struct proxy_instance *instance)
+{
+	if (instance->is_local) {
+		assert(local_instance != NULL);
+		local_instance = NULL;
+	}
+	mh_int_t i;
+	mh_foreach(instance->fat_connections, i) {
+		struct proxy_connection *pcon = (struct proxy_connection *) mh_strnptr_node(instance->fat_connections, i)->val;
+		mh_strnptr_del(instance->fat_connections, i, 0);
+		proxy_connection_delete(pcon);
+	}
+	free(instance);
+}
+
+static int
+proxy_replier_f(va_list ap);
+
+static int
+proxy_forwarder_f(va_list ap);
+
+/**
+ * Initiate a new connection to a remote instance.
+ */
+static struct proxy_connection *
+proxy_connection_new(struct proxy_instance *instance, const char *username,
+		     const char *scramble, const char *salt)
+{
+	static const char *guest = "guest";
+	if (username == NULL)
+		username = guest;
+	/* We do not establish connections to a local instance. */
+	assert(!instance->is_local && instance != local_instance);
+	struct proxy_connection *pcon = (struct proxy_connection *)
+		calloc(1, sizeof(*pcon));
+	if (pcon == NULL) {
+		diag_set(OutOfMemory, sizeof(*pcon), "malloc",
+			 "struct proxy_connection");
+		return NULL;
+	}
+	pcon->instance = instance;
+	pcon->username = (char *)malloc(strlen(username) + 1);
+	strcpy(pcon->username, username);
+	pcon->sync_ctr = 0;
+	ibuf_create(&pcon->connection.ibuf, cord_slab_cache(), iproto_readahead);
+
+	pcon->sync_hash = mh_i64ptr_new();
+	rlist_create(&pcon->request_queue);
+	fiber_cond_create(&pcon->forwarder_cond);
+
+	char name[FIBER_NAME_MAX];
+	int pos = snprintf(name, sizeof(name), "forwarder/");
+	pos += snprintf(name + pos, sizeof(name) - pos, "%s", username);
+	pos += snprintf(name + pos, sizeof(name) - pos, "@");
+	uri_format(name + pos, sizeof(name) - pos, &instance->uri, false);
+	pcon->forwarder = fiber_new(name, proxy_forwarder_f);
+	fiber_set_joinable(pcon->forwarder, true);
+	fiber_start(pcon->forwarder, pcon, scramble, salt);
+
+	uint32_t hash = mh_strn_hash(username, strlen(username));
+	struct mh_strnptr_node_t node = {username, strlen(username), hash, pcon};
+	struct mh_strnptr_node_t *ret;
+	mh_strnptr_put(instance->fat_connections, &node, &ret, NULL);
+	/*
+	 * Make sure we didn't have a connection under the same
+	 * user previously.
+	 */
+	assert(ret == NULL);
+	return pcon;
+}
+
+static void
+proxy_connection_delete(struct proxy_connection *pcon)
+{
+	free(pcon->username);
+	ibuf_destroy(&pcon->connection.ibuf);
+	fiber_cancel(pcon->forwarder);
+	fiber_cancel(pcon->replier);
+	fiber_cond_destroy(&pcon->forwarder_cond);
+	mh_int_t i;
+	mh_foreach(pcon->sync_hash, i) {
+		struct sync_hash_entry *cs = (struct sync_hash_entry *)
+					     mh_i64ptr_node(pcon->sync_hash,
+							    i)->val;
+		mh_i64ptr_del(pcon->sync_hash, i, 0);
+		free(cs);
+	}
+
+	free(pcon);
+}
+
 /* {{{ iproto_msg - declaration */
 
 /**
@@ -303,6 +541,30 @@ static const struct cmsg_hop push_route[] = {
 	{ tx_end_push, NULL }
 };
 
+struct proxy_auth_msg {
+	struct cmsg base;
+	struct iproto_msg *iproto_msg;
+	bool success;
+};
+
+/**
+ * Process an auth locally remembering whether it was successful or not.
+ */
+static void
+tx_process_proxy_auth(struct cmsg *m);
+
+/**
+ * Send a reply with auth result to the client.
+ * Also upon a successful auth remember scramble
+ * received from a client.
+ */
+static void
+proxy_finish_auth(struct cmsg *m);
+
+static const struct cmsg_hop proxy_auth_route[] = {
+	{ tx_process_proxy_auth, &net_pipe },
+	{ proxy_finish_auth, NULL },
+};
 
 /* }}} */
 
@@ -449,6 +711,11 @@ struct iproto_connection
 		 */
 		bool is_push_pending;
 	} tx;
+	struct {
+		char *user_name;
+		bool authenticated;
+		char scramble[SCRAMBLE_SIZE];
+	} proxy;
 	/** Authentication salt. */
 	char salt[IPROTO_SALT_SIZE];
 };
@@ -672,6 +939,12 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	return new_ibuf;
 }
 
+static struct proxy_instance *
+proxy_find_instance(struct iproto_msg *msg);
+
+static struct proxy_connection *
+proxy_find_connection(struct proxy_instance *instance, const char *username);
+
 /**
  * Enqueue all requests which were read up. If a request limit is
  * reached - stop the connection input even if not the whole batch
@@ -734,16 +1007,63 @@ err_msgpack:
 		msg->len = reqend - reqstart; /* total request length */
 
 		iproto_msg_decode(msg, &pos, reqend, &stop_input);
-		/*
-		 * This can't throw, but should not be
-		 * done in case of exception.
-		 */
-		cpipe_push_input(&tx_pipe, &msg->base);
+
 		n_requests++;
 		/* Request is parsed */
 		assert(reqend > reqstart);
 		assert(con->parse_size >= (size_t) (reqend - reqstart));
 		con->parse_size -= reqend - reqstart;
+
+		if (!is_proxy_configured) {
+			/*
+			 * This can't throw, but should not be
+			 * done in case of exception.
+			 */
+process_locally:
+			cpipe_push_input(&tx_pipe, &msg->base);
+			continue;
+		}
+
+		struct proxy_instance *instance = proxy_find_instance(msg);
+		if (instance == NULL) {
+			diag_log();
+			continue;
+		}
+		if (instance == local_instance) {
+			if (msg->header.type != IPROTO_AUTH)
+				goto process_locally;
+			struct proxy_auth_msg *m = (struct proxy_auth_msg *)malloc(sizeof(*m));
+			m->iproto_msg = msg;
+			cmsg_init(&m->base, proxy_auth_route);
+			cpipe_push_input(&tx_pipe, &m->base);
+			continue;
+		}
+		const char *username = NULL;
+		if (con->proxy.authenticated) {
+			username = con->proxy.user_name;
+		}
+		struct proxy_connection *pcon = proxy_find_connection(instance,
+								      username);
+
+		if (pcon == NULL) {
+			pcon = proxy_connection_new(instance, username,
+						    con->proxy.scramble,
+						    con->salt);
+			if (pcon == NULL)
+				diag_raise();
+		}
+
+		struct pqueue_entry *ent = (struct pqueue_entry *)
+					   calloc(1, sizeof(*ent));
+		if (ent == NULL) {
+			diag_set(OutOfMemory, sizeof(*ent), "malloc",
+				 "struct pqueue_entry");
+			diag_raise();
+		}
+		ent->msg = msg;
+		ent->con = con;
+		rlist_add_tail(&pcon->request_queue, &ent->link);
+		fiber_cond_signal(&pcon->forwarder_cond);
 	}
 	if (stop_input) {
 		/**
@@ -996,6 +1316,8 @@ iproto_connection_new(int fd)
 	con->is_disconnected = false;
 	con->tx.is_push_pending = false;
 	con->tx.is_push_sent = false;
+	con->proxy.user_name = NULL;
+	con->proxy.authenticated = false;
 	return con;
 }
 
@@ -1932,6 +2254,7 @@ iproto_init()
 		/* .sync = */ iproto_session_sync,
 	};
 	session_vtab_registry[SESSION_TYPE_BINARY] = iproto_session_vtab;
+	is_iproto_configured = true;
 }
 
 /** Available iproto configuration changes. */
@@ -2041,3 +2364,378 @@ iproto_set_msg_max(int new_iproto_msg_max)
 	iproto_do_cfg(&cfg_msg);
 	cpipe_set_max_input(&net_pipe, new_iproto_msg_max / 2);
 }
+
+/**
+ * Search the proxy's instance list to find the one
+ * instance capable of processing the request in msg->header.
+ */
+static struct proxy_instance *
+proxy_find_instance(struct iproto_msg *msg)
+{
+	bool request_is_ro;
+	uint8_t type = msg->header.type;
+
+	switch (type) {
+	case IPROTO_SELECT:
+	case IPROTO_AUTH:
+		request_is_ro = true;
+		break;
+	case IPROTO_INSERT:
+	case IPROTO_REPLACE:
+	case IPROTO_UPDATE:
+	case IPROTO_DELETE:
+	case IPROTO_UPSERT:
+	case IPROTO_EVAL:
+	case IPROTO_CALL:
+	case IPROTO_CALL_16:
+		request_is_ro = false;
+		break;
+	default:
+		/*
+		 * ping, join, subscribe,
+		 * vote, vote_deprecated go here
+		 */
+		if (local_instance != NULL)
+			return local_instance;
+		else
+			request_is_ro = false;
+	}
+
+	if (local_instance != NULL && (!local_instance->is_ro || request_is_ro))
+		return local_instance;
+
+	struct proxy_instance *instance;
+	rlist_foreach_entry(instance, &proxy_instances, link) {
+		/* Rules can be more complex in future. */
+		if (!instance->is_ro || request_is_ro) {
+			return instance;
+		}
+	}
+	diag_set(ClientError, ER_NO_SUCH_INSTANCE);
+	return NULL;
+}
+
+/**
+ * Find an established connection authenticated as specified user.
+ */
+static struct proxy_connection *
+proxy_find_connection(struct proxy_instance *instance, const char *username)
+{
+	static const char *guest = "guest";
+	if (username == NULL)
+		username = guest;
+	uint32_t hash = mh_strn_hash(username, strlen(username));
+	struct mh_strnptr_key_t key = {username, strlen(username), hash};
+	mh_int_t i = mh_strnptr_find(instance->fat_connections, &key, NULL);
+	if (i != mh_end(instance->fat_connections)) {
+		struct mh_strnptr_node_t *node = mh_strnptr_node(instance->fat_connections, i);
+		return (struct proxy_connection *)node->val;
+	} else
+		return NULL;
+}
+
+static void
+tx_process_proxy_auth(struct cmsg *m)
+{
+	struct proxy_auth_msg *msg = (struct proxy_auth_msg *)m;
+	struct iproto_connection *con = msg->iproto_msg->connection;
+	try {
+		box_process_auth(&msg->iproto_msg->auth, con->salt);
+	} catch (Exception *e) {
+		msg->success = false;
+		tx_reply_error(msg->iproto_msg);
+		return;
+	}
+	msg->success = true;
+
+	struct obuf *out = con->tx.p_obuf;
+	iproto_reply_ok_xc(out, msg->iproto_msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->iproto_msg->wpos, out);
+}
+
+static void
+proxy_finish_auth(struct cmsg *m)
+{
+	struct proxy_auth_msg *msg = (struct proxy_auth_msg *)m;
+	struct iproto_connection *con = msg->iproto_msg->connection;
+
+	con->proxy.authenticated = msg->success;
+
+	if(msg->success) {
+		const char *user_name = msg->iproto_msg->auth.user_name;
+		const char *scramble = msg->iproto_msg->auth.scramble;
+		uint32_t username_len = mp_decode_strl(&user_name);
+		con->proxy.user_name = (char *)calloc(1, 1 + username_len);
+		strncpy(con->proxy.user_name, user_name, username_len);
+		uint32_t part_count = mp_decode_array(&scramble);
+		/* These conditions were checked in box_process_auth() */
+		assert(part_count == 0 && strcmp(con->proxy.user_name, "guest") == 0 ||
+		       part_count == 2);
+		if (part_count < 2) {
+			memset(con->proxy.scramble, 0, SCRAMBLE_SIZE);
+		} else {
+			mp_next(&scramble);
+			uint32_t scramble_len;
+			if (mp_typeof(*scramble) == MP_STR) {
+				scramble = mp_decode_str(&scramble, &scramble_len);
+			} else {
+				/* type is MP_BIN, as checked in authenticate() */
+				scramble = mp_decode_bin(&scramble, &scramble_len);
+			}
+			assert(scramble_len == SCRAMBLE_SIZE);
+			memcpy(con->proxy.scramble, scramble,
+			       scramble_len);
+		}
+	}
+
+	net_send_msg(&msg->iproto_msg->base);
+}
+
+/**
+ * Forward the request specified in msg->header
+ * to a specified instance connection.
+ */
+static void
+proxy_forward_request(struct iproto_connection *con, struct iproto_msg *msg,
+			struct proxy_connection *pcon)
+{
+	struct sync_hash_entry *cs = (struct sync_hash_entry *)malloc(sizeof(*cs));
+	cs->sync = msg->header.sync;
+	cs->con = con;
+	msg->header.sync = ++pcon->sync_ctr;
+	struct mh_i64ptr_node_t node = {pcon->sync_ctr, cs};
+	if (mh_i64ptr_put(pcon->sync_hash, &node, NULL, NULL) ==
+	    mh_end(pcon->sync_hash)) {
+		diag_set(OutOfMemory, sizeof(node), "malloc", "sync_hash");
+		diag_raise();
+	}
+	coio_write_xrow(&pcon->connection.io, &msg->header);
+
+	/*
+	 * After forwarding the request, mark it as read and
+	 * delete the msg.
+	 */
+	msg->p_ibuf->rpos += msg->len;
+	iproto_msg_delete(msg);
+}
+
+/**
+ * A message used to fetch user's hash2 from tx thread.
+ */
+struct proxy_hash2_msg {
+	struct cbus_call_msg base;
+	char *username;
+	size_t username_len;
+	char hash2[SCRAMBLE_SIZE];
+};
+
+static int
+proxy_get_hash2(struct cbus_call_msg *msg)
+{
+	struct proxy_hash2_msg *m = (struct proxy_hash2_msg *)msg;
+	struct user *user = user_find_by_name(m->username, m->username_len);
+	if (user == NULL)
+		return -1;
+	memcpy(m->hash2, user->def->hash2, SCRAMBLE_SIZE);
+	return 0;
+}
+
+/**
+ * Given a proxy_connection, establish an actual connection to a
+ * remote instance: connect and authenticate.
+ */
+static int
+proxy_establish_connection(struct proxy_connection *pcon, const char *scramble,
+			   const char *salt)
+{
+	coio_create(&pcon->connection.io, -1);
+	coio_connect(&pcon->connection.io, &pcon->instance->uri, NULL, NULL);
+
+	char greetingbuf[IPROTO_GREETING_SIZE];
+	struct greeting greeting;
+	coio_readn(&pcon->connection.io, greetingbuf, IPROTO_GREETING_SIZE);
+	if (greeting_decode(greetingbuf, &greeting) != 0) {
+		tnt_raise(LoggedError, ER_PROTOCOL, "Invalid greeting");
+	}
+	memcpy(pcon->salt, greeting.salt, IPROTO_SALT_SIZE);
+	if (strcmp(pcon->username, "guest")) {
+		size_t username_len = strlen(pcon->username);
+		struct xrow_header row;
+		memset(&row, 0, sizeof(row));
+		if (scramble != NULL) {
+			struct proxy_hash2_msg *msg = (struct proxy_hash2_msg *)
+			    calloc(1, sizeof(*msg));
+			msg->username = pcon->username;
+			msg->username_len = username_len;
+			if (cbus_call(&tx_pipe, &net_pipe, &msg->base,
+				      proxy_get_hash2, NULL, TIMEOUT_INFINITY) != 0)
+				diag_raise();
+
+			xrow_reencode_auth_xc(&row, salt, IPROTO_SALT_SIZE,
+					      greeting.salt,
+					      greeting.salt_len,  pcon->username,
+					      username_len,
+					      scramble, msg->hash2);
+			free(msg);
+		} else {
+			xrow_encode_auth_xc(&row, greeting.salt, greeting.salt_len,
+					    pcon->username, username_len, scramble, 0);
+		}
+
+		coio_write_xrow(&pcon->connection.io, &row);
+		coio_read_xrow(&pcon->connection.io, &pcon->connection.ibuf, &row);
+		if (row.type != IPROTO_OK) {
+			xrow_decode_error_xc(&row);
+		}
+	}
+	return 0;
+}
+
+static int
+proxy_forwarder_f(va_list ap)
+{
+	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
+	const char *scramble = va_arg(ap, char *);
+	const char *salt = va_arg(ap, char *);
+
+	proxy_establish_connection(pcon, scramble, salt);
+
+	char name[FIBER_NAME_MAX];
+	int pos = snprintf(name, sizeof(name), "replier/");
+	pos += snprintf(name + pos, sizeof(name) - pos, "@");
+	uri_format(name + pos, sizeof(name) - pos, &pcon->instance->uri, false);
+	pcon->replier = fiber_new(name, proxy_replier_f);
+	fiber_set_joinable(pcon->replier, true);
+	fiber_start(pcon->replier, pcon);
+
+	while (!fiber_is_cancelled()) {
+		struct rlist *head;
+		while ((head = rlist_first(&pcon->request_queue)) ==
+		       &pcon->request_queue) {
+			fiber_cond_wait(&pcon->forwarder_cond);
+		}
+
+		struct pqueue_entry *ent = rlist_shift_entry(&pcon->request_queue,
+							     struct pqueue_entry,
+							     link);
+		struct iproto_connection *con = ent->con;
+		struct iproto_msg * msg = ent->msg;
+		free(ent);
+
+		proxy_forward_request(con, msg, pcon);
+
+		fiber_gc();
+	}
+	return 0;
+}
+
+/**
+ * Delete all the instances from instance list.
+ */
+static void
+proxy_instance_list_purge()
+{
+	while (!rlist_empty(&proxy_instances)) {
+		struct proxy_instance *instance =
+		    rlist_shift_entry(&proxy_instances, struct proxy_instance, link);
+		proxy_instance_delete(instance);
+	}
+}
+
+/**
+ * A message used to pass proxy configuration parameters to iproto thread.
+ */
+struct proxy_cfg_msg: public cbus_call_msg
+{
+	struct proxy_cfg *config;
+};
+
+static int
+proxy_configure_f(struct cbus_call_msg *m)
+{
+	struct proxy_cfg_msg *msg = (struct proxy_cfg_msg *)m;
+	/*
+	 * If it is the second call to proxy_configure,
+	 * first remove all the old instances and close
+	 * connections, and do not init coio again.
+	 */
+	if (is_proxy_configured) {
+		proxy_instance_list_purge();
+	} else {
+		coio_init();
+		coio_enable();
+	}
+	for(size_t i = 0; msg->config[i].uri != NULL; ++i) {
+		/* Will do for now??? */
+		bool is_ro = !msg->config[i].is_master;
+		bool is_local = msg->config[i].is_local;
+		struct proxy_instance *instance =
+			proxy_instance_new(is_ro, is_local, msg->config[i].uri);
+		if (instance == NULL) {
+			return -1;
+		}
+		/*
+		 * Init a guest connection to an instance:
+		 * It will be the first connection in instance's
+		 * fat connections list.
+		 */
+		if (!is_local) {
+			struct proxy_connection *con =
+				proxy_connection_new(instance, NULL, NULL, NULL);
+			if (con == NULL) {
+				return -1;
+			}
+		}
+	}
+	return 0;
+}
+
+void
+proxy_configure(struct proxy_cfg *config)
+{
+	struct proxy_cfg_msg msg;
+	msg.config = config;
+	int rc = cbus_call(&net_pipe, &tx_pipe, &msg, proxy_configure_f, NULL,
+			   TIMEOUT_INFINITY);
+	free(config);
+	if (rc != 0)
+		diag_raise();
+	is_proxy_configured = true;
+}
+
+static int
+proxy_replier_f(va_list ap)
+{
+	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
+
+	struct ev_io io;
+	coio_create(&io, pcon->connection.io.fd);
+
+	while (!fiber_is_cancelled()) {
+		struct xrow_header row;
+		coio_read_xrow(&io, &pcon->connection.ibuf, &row);
+		uint64_t key = row.sync;
+		mh_int_t i = mh_i64ptr_find(pcon->sync_hash, key, NULL);
+		if (i == mh_end(pcon->sync_hash)) {
+			/*
+			 * Some error. We received a reply with sync
+			 * not corresponding to any connection
+			 */
+			say_warn("sync received from remote instance is not in sync_hash");
+			continue;
+		}
+		struct mh_i64ptr_node_t *node = mh_i64ptr_node(pcon->sync_hash, i);
+		if (row.type != IPROTO_CHUNK) {
+			mh_i64ptr_remove(pcon->sync_hash, node, NULL);
+		}
+		struct sync_hash_entry * cs = (struct sync_hash_entry *)node->val;
+		struct iproto_connection *con = cs->con;
+		uint64_t sync = cs->sync;
+		row.sync = sync;
+		free(cs);
+		coio_write_iproto_response(&con->output, &row);
+
+		fiber_gc();
+	}
+	return 0;
+}
diff --git a/src/box/iproto.h b/src/box/iproto.h
index b9a6cf8f7..6bb76ae2f 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -56,6 +56,12 @@ enum {
 
 extern unsigned iproto_readahead;
 
+bool
+iproto_is_configured(void);
+
+bool
+proxy_is_configured(void);
+
 /**
  * Return size of memory used for storing network buffers.
  */
@@ -68,6 +74,23 @@ iproto_mem_used(void);
 void
 iproto_reset_stat(void);
 
+struct proxy_cfg {
+	const char *uri;
+	bool is_master;
+	/*
+	 * A flag set to true for the instance
+	 * proxy is being run on.
+	 */
+	bool is_local;
+};
+
+/*
+ * Configure proxy. Initiate guest connections to instances from
+ * config and start proxying requests.
+ */
+void
+proxy_configure(struct proxy_cfg *config);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index c3825591c..19f6ec0f6 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -69,7 +69,7 @@ static int
 lbox_cfg_set_listen(struct lua_State *L)
 {
 	try {
-		box_listen();
+		box_listen_xc(NULL);
 	} catch (Exception *) {
 		luaT_error(L);
 	}
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index ad47f724b..36b24716a 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -35,10 +35,12 @@
 #include <msgpuck.h> /* mp_store_u32() */
 #include "scramble.h"
 
+#include "box/box.h" /* box_listen() */
 #include "box/iproto_constants.h"
 #include "box/lua/tuple.h" /* luamp_convert_tuple() / luamp_convert_key() */
 #include "box/xrow.h"
 #include "box/tuple.h"
+#include "box/iproto.h" /* proxy_configure() / struct proxy_cfg */
 
 #include "lua/msgpack.h"
 #include "third_party/base64.h"
@@ -593,6 +595,62 @@ netbox_decode_body(struct lua_State *L)
 	return 2;
 }
 
+static int
+netbox_listen(struct lua_State *L)
+{
+	int opts_count = lua_gettop(L);
+	if (opts_count == 0 || (opts_count == 1 && !box_is_configured()))
+		return luaL_error(L, "Usage: netbox.listen(uri, {cluster="\
+				     "{{uri=uri1, is_master=true/false}, ...}})");
+
+	const char *uri = lua_tostring(L, 1);
+	box_init_iproto();
+
+	if (opts_count > 2) {
+		lua_settop(L, 2);
+	} else if (opts_count == 1) {
+		if (box_listen(uri) == -1)
+			luaT_error(L);
+		return 0;
+	}
+	luaL_checktype(L, 2, LUA_TTABLE);
+	lua_getfield(L, 2, "cluster");
+	luaL_checktype(L, -1, LUA_TTABLE);
+	int opts_len = lua_objlen(L, -1);
+	for (int i = 1; i <= opts_len; ++i) {
+		lua_rawgeti(L, 3, i);
+		luaL_checktype(L, -1, LUA_TTABLE);
+		lua_getfield(L, -1, "uri");
+		lua_getfield(L, -2, "is_master");
+	}
+	struct proxy_cfg *config = malloc((opts_len + 1) * sizeof(*config));
+	if (config == NULL) {
+		diag_set(OutOfMemory, (opts_len + 1) * sizeof(*config),
+			 "malloc", "struct proxy_cfg");
+		luaT_error(L);
+	}
+	size_t pos = 0;
+	for (int i = -opts_len * 3; i < 0; i += 3) {
+		config[pos].uri = lua_tostring(L, i + 1);
+		config[pos].is_master = lua_toboolean(L, i + 2);
+		config[pos].is_local = strcmp(uri, config[pos].uri) == 0;
+		if (config[pos].is_local && !box_is_configured()) {
+			free(config);
+			return luaL_error(L, "You cannot proxy to local instance "\
+					     "before box is configured");
+		}
+		++pos;
+	}
+	config[pos].uri = NULL;
+
+	if (box_listen(uri) == -1)
+		luaT_error(L);
+
+	proxy_configure(config);
+
+	return 0;
+}
+
 int
 luaopen_net_box(struct lua_State *L)
 {
@@ -611,6 +669,7 @@ luaopen_net_box(struct lua_State *L)
 		{ "decode_greeting",netbox_decode_greeting },
 		{ "communicate",    netbox_communicate },
 		{ "decode_body",    netbox_decode_body },
+		{ "listen",	    netbox_listen },
 		{ NULL, NULL}
 	};
 	/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index fd6ebf9de..b1d220dde 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,7 @@ local communicate     = internal.communicate
 local encode_auth     = internal.encode_auth
 local encode_select   = internal.encode_select
 local decode_greeting = internal.decode_greeting
+local listen          = internal.listen
 
 local TIMEOUT_INFINITY = 500 * 365 * 86400
 local VSPACE_ID        = 281
@@ -1422,6 +1423,7 @@ local this_module = {
     new = connect, -- Tarantool < 1.7.1 compatibility,
     wrap = wrap,
     establish_connection = establish_connection,
+    listen = listen,
 }
 
 function this_module.timeout(timeout, ...)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 4473acfe3..2f7742138 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -275,6 +275,16 @@ iproto_header_encode(char *out, uint32_t type, uint64_t sync,
 	memcpy(out, &header, sizeof(header));
 }
 
+void
+iproto_header_to_iovec(const struct xrow_header *header, struct iovec *out,
+		       uint32_t bodylen)
+{
+	out->iov_base = region_alloc(&fiber()->gc, IPROTO_HEADER_LEN);
+	iproto_header_encode((char *)out->iov_base, header->type, header->sync,
+			     header->schema_version, bodylen);
+	out->iov_len = IPROTO_HEADER_LEN;
+}
+
 struct PACKED iproto_body_bin {
 	uint8_t m_body;                    /* MP_MAP */
 	uint8_t k_data;                    /* IPROTO_DATA or IPROTO_ERROR */
@@ -852,6 +862,42 @@ xrow_encode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
 	return 0;
 }
 
+int
+xrow_reencode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
+		   const char *msalt, size_t msalt_len, const char *login,
+		   size_t login_len, const char *scramble, const char *hash2)
+{
+	size_t buf_size = XROW_BODY_LEN_MAX + login_len + SCRAMBLE_SIZE;
+	char *buf = (char *) region_alloc(&fiber()->gc, buf_size);
+	if (buf == NULL) {
+		diag_set(OutOfMemory, buf_size, "region_alloc", "buf");
+		return -1;
+	}
+
+	char *d = buf;
+	d = mp_encode_map(d, 2);
+	d = mp_encode_uint(d, IPROTO_USER_NAME);
+	d = mp_encode_str(d, login, login_len);
+	assert(salt_len >= SCRAMBLE_SIZE);
+	assert(msalt_len >= SCRAMBLE_SIZE);
+	(void)salt_len;
+	(void)msalt_len;
+
+	char new_scramble[SCRAMBLE_SIZE];
+	scramble_reencode(new_scramble, scramble, salt, msalt, hash2);
+	d = mp_encode_uint(d, IPROTO_TUPLE);
+	d = mp_encode_array(d, 2);
+	d = mp_encode_str(d, "chap-sha1", strlen("chap-sha1"));
+	d = mp_encode_str(d, new_scramble, SCRAMBLE_SIZE);
+
+	assert(d <= buf + buf_size);
+	packet->body[0].iov_base = buf;
+	packet->body[0].iov_len = (d - buf);
+	packet->bodycnt = 1;
+	packet->type = IPROTO_AUTH;
+	return 0;
+}
+
 void
 xrow_decode_error(struct xrow_header *row)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8d..b70f16038 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -226,6 +226,27 @@ xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len,
 		 const char *login, size_t login_len, const char *password,
 		 size_t password_len);
 
+/**
+ * Reencode an AUTH request received from a client
+ * to send it to another instance.
+ * @param[out] packet.
+ * @param salt Salt a proxy sent to client.
+ * @param salt_len Length of @salt.
+ * @param msalt Salt a proxy received from an instance.
+ * @param msalt_len Length of @msalt.
+ * @param login User login.
+ * @param login_len Length of @login.
+ * @param scramble Scramble received from client.
+ * @param hash2 hash2 of the user, i.e. sha1(sha1(password))
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+int
+xrow_reencode_auth(struct xrow_header *packet, const char *salt, size_t salt_len,
+		   const char *msalt, size_t msalt_len, const char *login,
+		   size_t login_len, const char *scramble, const char *hash2);
+
 /** Reply to IPROTO_VOTE request. */
 struct ballot {
 	/** Set if the instance is running in read-only mode. */
@@ -515,6 +536,18 @@ greeting_decode(const char *greetingbuf, struct greeting *greeting);
 int
 xrow_to_iovec(const struct xrow_header *row, struct iovec *out);
 
+/**
+ * Create an iproto_header based on values from @row and encode it
+ * to specified iovec.
+ *
+ * @param row Row to take values from.
+ * @param[out] out Encoded iproto_header based on @row values.
+ * @param bodylen Length of row body to be encoded in header.
+ */
+void
+iproto_header_to_iovec(const struct xrow_header *row, struct iovec *out,
+		       uint32_t bodylen);
+
 /**
  * Decode ERROR and set it to diagnostics area.
  * @param row Encoded error.
@@ -631,6 +664,17 @@ xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len,
 		diag_raise();
 }
 
+/** @copydoc xrow_reencode_auth. */
+static inline void
+xrow_reencode_auth_xc(struct xrow_header *packet, const char *salt, size_t salt_len,
+		   const char *msalt, size_t msalt_len, const char *login,
+		   size_t login_len, const char *scramble, const char *hash2)
+{
+	if (xrow_reencode_auth(packet, salt, salt_len, msalt, msalt_len, login,
+			       login_len, scramble, hash2) != 0)
+		diag_raise();
+}
+
 /** @copydoc xrow_decode_ballot. */
 static inline void
 xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot)
diff --git a/src/box/xrow_io.cc b/src/box/xrow_io.cc
index e9ee6b0c8..48fe6dc11 100644
--- a/src/box/xrow_io.cc
+++ b/src/box/xrow_io.cc
@@ -101,3 +101,20 @@ coio_write_xrow(struct ev_io *coio, const struct xrow_header *row)
 	coio_writev(coio, iov, iovcnt, 0);
 }
 
+void
+coio_write_iproto_response(struct ev_io *coio, const struct xrow_header *row)
+{
+	struct iovec iov[XROW_IOVMAX];
+	int iovcnt = 0;
+	uint32_t bodylen = 0;
+	for (int i = 0; i < row->bodycnt; ++i) {
+		bodylen += row->body[i].iov_len;
+	}
+	iproto_header_to_iovec(row, iov + iovcnt++, bodylen);
+	assert(row->bodycnt + 1 <= XROW_IOVMAX);
+	for (int i = 0; i < row->bodycnt; ++i) {
+		iov[i+1].iov_base = row->body[i].iov_base;
+		iov[i+1].iov_len = row->body[i].iov_len;
+	}
+	coio_writev(coio, iov, row->bodycnt + 1, 0);
+}
diff --git a/src/box/xrow_io.h b/src/box/xrow_io.h
index 0eb7a8ace..c199e838e 100644
--- a/src/box/xrow_io.h
+++ b/src/box/xrow_io.h
@@ -48,6 +48,12 @@ coio_read_xrow_timeout_xc(struct ev_io *coio, struct ibuf *in,
 void
 coio_write_xrow(struct ev_io *coio, const struct xrow_header *row);
 
+/**
+ * Just like coio_write_xrow, but also write schema_version from
+ * row in the response header.
+ */
+void
+coio_write_iproto_response(struct ev_io *coio, const struct xrow_header *row);
 
 #if defined(__cplusplus)
 } /* extern "C" */
diff --git a/src/scramble.c b/src/scramble.c
index ca1f98793..6dc43932b 100644
--- a/src/scramble.c
+++ b/src/scramble.c
@@ -67,6 +67,29 @@ scramble_prepare(void *out, const void *salt, const void *password,
 	xor(out, hash1, out, SCRAMBLE_SIZE);
 }
 
+void
+scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
+		  const void *hash2)
+{
+	unsigned char hash1[SCRAMBLE_SIZE];
+	unsigned char sh[SCRAMBLE_SIZE];
+	SHA1_CTX ctx;
+
+	SHA1Init(&ctx);
+	SHA1Update(&ctx, salt, SCRAMBLE_SIZE);
+	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
+	SHA1Final(sh, &ctx);
+
+	xor(hash1, in, sh, SCRAMBLE_SIZE);
+
+	SHA1Init(&ctx);
+	SHA1Update(&ctx, msalt, SCRAMBLE_SIZE);
+	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
+	SHA1Final(out, &ctx);
+
+	xor(out, hash1, out, SCRAMBLE_SIZE);
+}
+
 int
 scramble_check(const void *scramble, const void *salt, const void *hash2)
 {
diff --git a/src/scramble.h b/src/scramble.h
index 7dee31483..2ca975bd7 100644
--- a/src/scramble.h
+++ b/src/scramble.h
@@ -89,6 +89,15 @@ scramble_check(const void *scramble, const void *salt, const void *hash2);
 void
 password_prepare(const char *password, int len, char *out, int out_len);
 
+/**
+ * Given a scramble received from a client, salt sent to client,
+ * salt received from another instance and user hash2, recalculate
+ * a scramble to be sent to a remote instance for authentication.
+ */
+void
+scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
+		  const void *hash2);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif
-- 
2.17.1 (Apple Git-112)
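
A note on scramble_reencode() in the patch above: it relies on the XOR
mask cancelling out. The client sends hash1 XOR H(salt, hash2); the proxy
XORs that with H(salt, hash2) to recover hash1, then XORs hash1 with
H(msalt, hash2) to get the scramble the remote instance expects. The
sketch below only illustrates that algebra and is not part of the patch.
Every demo_* name is hypothetical, demo_digest() is a toy FNV-1a style
mix standing in for SHA-1 (it is not SHA-1), and hash1/hash2 are
arbitrary test bytes rather than real password hashes.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

enum { DEMO_SIZE = 20 }; /* assumed to match SCRAMBLE_SIZE */

/* Toy stand-in for SHA1(a || b). NOT a real hash function. */
static void
demo_digest(uint8_t *out, const uint8_t *a, const uint8_t *b)
{
	uint32_t h = 2166136261u;
	for (int i = 0; i < DEMO_SIZE; i++)
		h = (h ^ a[i]) * 16777619u;
	for (int i = 0; i < DEMO_SIZE; i++)
		h = (h ^ b[i]) * 16777619u;
	for (int i = 0; i < DEMO_SIZE; i++) {
		h = (h ^ (uint32_t)i) * 16777619u;
		out[i] = (uint8_t)(h >> 24);
	}
}

static void
demo_xor(uint8_t *out, const uint8_t *x, const uint8_t *y)
{
	for (int i = 0; i < DEMO_SIZE; i++)
		out[i] = x[i] ^ y[i];
}

/* Client side: scramble = hash1 XOR digest(salt, hash2). */
static void
demo_prepare(uint8_t *out, const uint8_t *salt, const uint8_t *hash1,
	     const uint8_t *hash2)
{
	uint8_t mask[DEMO_SIZE];
	demo_digest(mask, salt, hash2);
	demo_xor(out, hash1, mask);
}

/* Proxy side: same two steps as scramble_reencode() in the patch. */
static void
demo_reencode(uint8_t *out, const uint8_t *in, const uint8_t *salt,
	      const uint8_t *msalt, const uint8_t *hash2)
{
	uint8_t mask[DEMO_SIZE], hash1[DEMO_SIZE];
	demo_digest(mask, salt, hash2);
	demo_xor(hash1, in, mask);	/* recover hash1 */
	demo_digest(mask, msalt, hash2);
	demo_xor(out, hash1, mask);	/* mask it with the remote salt */
}

/*
 * Return 1 if the re-encoded scramble equals the one a client would
 * have produced had it talked to the remote instance directly.
 */
int
demo_roundtrip_ok(void)
{
	uint8_t hash1[DEMO_SIZE], hash2[DEMO_SIZE];
	uint8_t salt[DEMO_SIZE], msalt[DEMO_SIZE];
	for (int i = 0; i < DEMO_SIZE; i++) {
		hash1[i] = (uint8_t)(i * 7 + 1);
		hash2[i] = (uint8_t)(i * 13 + 5);
		salt[i] = (uint8_t)(i * 3 + 11);
		msalt[i] = (uint8_t)(i * 5 + 17);
	}
	/* The check is only interesting when the two salts differ. */
	assert(memcmp(salt, msalt, DEMO_SIZE) != 0);

	uint8_t from_client[DEMO_SIZE], reencoded[DEMO_SIZE];
	uint8_t direct[DEMO_SIZE];
	demo_prepare(from_client, salt, hash1, hash2);
	demo_reencode(reencoded, from_client, salt, msalt, hash2);
	demo_prepare(direct, msalt, hash1, hash2);
	return memcmp(reencoded, direct, DEMO_SIZE) == 0;
}
```

Calling demo_roundtrip_ok() from a small main() is enough to try it. The
equality holds for any digest function, since only the XOR cancellation
matters. It also shows that the proxy ends up holding hash1 for the
duration of the call.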






