[tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.

Konstantin Osipov kostja at tarantool.org
Tue Oct 23 20:26:28 MSK 2018


* Serge Petrenko <sergepetrenko at tarantool.org> [18/10/02 21:07]:

Some comments related to the patch itself, in addition to the
general comments which I sent out earlier.

First of all, the proxy has to be put into a separate module with
a clear API. To work with iproto, we need to define an iproto API
which the proxy can use to instantiate itself in the iproto thread.

In other words, please try to clearly separate iproto.cc and
proxy.cc.

This will also ensure there is a clear API between proxy and tx,
as today there is between iproto and tx.

The iproto thread is simply a host/container thread for the proxy.
You could even put the proxy in its own thread and move it to
iproto in a separate patch. The code should be written in such a
way that it doesn't change when it is moved to a different thread.
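
A rough sketch of the kind of boundary I mean. All the names
below (struct proxy_host, proxy_create(), proxy_start(),
proxy_stop(), struct counting_host) are hypothetical and do not
exist in the tree. The point is only that the proxy sees its
hosting thread through a narrow interface and never includes
iproto internals:

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical interface the hosting thread offers to the proxy. */
struct proxy_host {
	/* Start accepting clients on uri; returns 0 on success. */
	int (*listen)(struct proxy_host *host, const char *uri);
	/* Stop accepting clients. */
	void (*stop)(struct proxy_host *host);
};

/* The proxy refers to its host only through struct proxy_host. */
struct proxy {
	struct proxy_host *host;
	const char *uri;
	int is_listening;
};

void
proxy_create(struct proxy *proxy, struct proxy_host *host, const char *uri)
{
	assert(host != NULL && uri != NULL);
	proxy->host = host;
	proxy->uri = uri;
	proxy->is_listening = 0;
}

/* Idempotent: a second call does not touch the host again. */
int
proxy_start(struct proxy *proxy)
{
	if (proxy->is_listening)
		return 0;
	if (proxy->host->listen(proxy->host, proxy->uri) != 0)
		return -1;
	proxy->is_listening = 1;
	return 0;
}

void
proxy_stop(struct proxy *proxy)
{
	if (!proxy->is_listening)
		return;
	proxy->host->stop(proxy->host);
	proxy->is_listening = 0;
}

/* A trivial host which only counts calls, to exercise the sketch. */
struct counting_host {
	struct proxy_host base;
	int listen_calls;
	int stop_calls;
};

static int
counting_listen(struct proxy_host *host, const char *uri)
{
	(void)uri;
	((struct counting_host *)host)->listen_calls++;
	return 0;
}

static void
counting_stop(struct proxy_host *host)
{
	((struct counting_host *)host)->stop_calls++;
}
```

With something like this, moving the proxy from its own thread
into the iproto thread would mean supplying a different
struct proxy_host, with proxy.cc left untouched.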

Second, the proxy does not need to replace box.cfg{listen=}; the
latter can work without changes until it is deprecated and
removed. So there should be no need to change the bootstrap
procedure.

> +void
> +box_init_iproto()
> +{
> +	if (!iproto_is_configured()) {
> +		/*
> +		 * Since iproto may be initted in a call to netbox.listen()
> +		 * before box is configured, we need to first establish
> +		 * a tx endpoint to write cbus messages.
> +		 */
> +		/* Join the cord interconnect as "tx" endpoint. */
> +		fiber_pool_create(&tx_fiber_pool, "tx",
> +				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
> +				  FIBER_POOL_IDLE_TIMEOUT);
> +		/* Add an extra endpoint for WAL wake up/rollback messages. */
> +		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
> +		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
> +		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
> +
> +		iproto_init();
> +	}
> +}

We have a terminology mess here. There is the "net" thread, which
hosts the "iproto" protocol. To run the proxy, you need to
initialize the networking subsystem, and yes, since you don't know
whether netbox.listen() was called before box.cfg{}, you need to
make sure it is initialized on demand. So you could put a
refactoring which ensures on-demand initialization of the net
thread (the thread is started, joins cbus, all the communication
pipes between this thread and tx are established) into a separate
patch.
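
On-demand initialization could be as simple as the sketch below.
net_init_once(), net_do_start() and the two variables are
hypothetical names, and net_do_start() is only a stand-in for the
real work (starting the thread, joining cbus, creating the pipes):

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical state of the net subsystem. */
static bool net_is_started = false;
/* Counts real initializations; only here to make the sketch testable. */
static int net_start_count = 0;

/* Stand-in for: start the thread, join cbus, set up pipes to tx. */
static void
net_do_start(void)
{
	assert(!net_is_started);
	net_start_count++;
}

/*
 * Safe to call from both box.cfg{} and netbox.listen(),
 * in any order: only the first call does the work.
 * Assumes all callers run in the tx thread, so no locking.
 */
void
net_init_once(void)
{
	if (net_is_started)
		return;
	net_do_start();
	net_is_started = true;
}
```

Both box.cfg{} and netbox.listen() would then call
net_init_once() unconditionally, and the bootstrap order stops
mattering.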

Please keep in mind that in the future we will have a lot of work
done in the network thread: sharding, proxying, etc. So you could
create net.[hc] to separate the net thread logic from the
iproto/proxy logic right away.

> -	iproto_init();
> +	box_init_iproto();

I don't understand the reason for this rename.

>  	wal_thread_start();
>  
> +static bool is_iproto_configured = false;
> +static bool is_proxy_configured = false;

Why not allow multiple proxy instances? In other words, why not
have a struct proxy encapsulating the entire proxy state (one
listen URI = one proxy)?

>  
> +struct proxy_instance {

Missing comment. I could only guess what this data structure is
for. The name is tautological. Why not simply call it
struct proxy?

Turns out it stands for one of the proxy destinations. 
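
To illustrate the naming I have in mind, here is a sketch in
which struct proxy holds the whole state of one proxy and
struct proxy_dest is one destination. The field names, the fixed
size table and the helper functions are assumptions made for
brevity, not a proposal for the final layout:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

enum { PROXY_DEST_MAX = 8 };

/** One destination a proxy can forward requests to. */
struct proxy_dest {
	const char *uri;
	bool is_ro;
};

/** The entire state of one proxy: one listen URI = one proxy. */
struct proxy {
	const char *listen_uri;
	struct proxy_dest dests[PROXY_DEST_MAX];
	size_t dest_count;
};

void
proxy_create(struct proxy *proxy, const char *listen_uri)
{
	assert(listen_uri != NULL);
	proxy->listen_uri = listen_uri;
	proxy->dest_count = 0;
}

/** Returns 0 on success, -1 if the destination table is full. */
int
proxy_add_dest(struct proxy *proxy, const char *uri, bool is_ro)
{
	if (proxy->dest_count == PROXY_DEST_MAX)
		return -1;
	struct proxy_dest *dest = &proxy->dests[proxy->dest_count++];
	dest->uri = uri;
	dest->is_ro = is_ro;
	return 0;
}

/** Returns the first read-write destination, or NULL if none. */
struct proxy_dest *
proxy_find_rw(struct proxy *proxy)
{
	for (size_t i = 0; i < proxy->dest_count; i++) {
		if (!proxy->dests[i].is_ro)
			return &proxy->dests[i];
	}
	return NULL;
}
```

Since nothing here is global, two proxies on two listen URIs can
coexist, and flags like is_proxy_configured become unnecessary.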

How do you deal with possible configuration loops? Imagine proxy 1
believes proxy 2 is rw, and proxy 2 believes proxy 1 is rw. They
could quickly bounce packets back and forth ad nauseam until the
routing tables at all nodes are full. Do you have a test for this?

We could establish a separate connection for the "peer" route and
ensure it never feeds requests directly to the tx thread - but
this could be dangerous when proxy configuration is changed.


> +	bool is_ro;

What is the API to change the local destination status? 

Let's imagine box.ctl.promote() is in place. It should begin by
suspending the routing at all nodes, and end by resuming it again.
The suspension should not affect control/status messages, which
should be passed directly to the destination node.
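
A sketch of the suspend/resume behaviour, under the assumption
that suspended data requests are queued rather than rejected
(that choice is mine, it is open for discussion). struct router,
enum msg_class and the router_*() functions are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>

enum msg_class { MSG_DATA, MSG_CONTROL };
enum route_action { ROUTE_FORWARD, ROUTE_QUEUE };

/* Hypothetical routing state of one node. */
struct router {
	bool is_suspended;
	/* Number of data requests held back while suspended. */
	int queued;
};

/* Called at the beginning of promote(); must not nest. */
void
router_suspend(struct router *router)
{
	assert(!router->is_suspended);
	router->is_suspended = true;
}

/* Control/status messages always pass; data waits while suspended. */
enum route_action
router_dispatch(struct router *router, enum msg_class cls)
{
	if (cls == MSG_CONTROL || !router->is_suspended)
		return ROUTE_FORWARD;
	router->queued++;
	return ROUTE_QUEUE;
}

/*
 * Called at the end of promote(). Returns the number of
 * queued requests which the caller now has to forward.
 */
int
router_resume(struct router *router)
{
	int to_forward = router->queued;
	router->is_suspended = false;
	router->queued = 0;
	return to_forward;
}
```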

Did you implement a patch which adds an RO/RW flag to CALL? 

Finally, since we're adding SQL, and switching to top-down merges, 
we need to enable routing for SQL.

I understand these comments are more related to the scope of the
task, rather than the patch itself. You could address most of them
in separate patches.

> +	/** A map of all established connections to the instance. */
> +	struct mh_strnptr_t *fat_connections;

Why are they fat?

> +/**
> + * Search the proxie's instance list to find the one
> + * instance capable of processing the request in msg->header.
> + */
> +static struct proxy_instance *
> +proxy_find_instance(struct iproto_msg *msg)

The lookup should happen only when the client connection is
established, changes its credentials, or hits an error/timeout.
Each proxy destination should maintain a separate connection for
each authenticated user name, and the lookup should simply match
the source connection to the destination one based on the user
name.

Once the destination connections are "attached" to the source
ones, the messages should use the same connections again and
again.
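
Roughly like the sketch below. The structures, the fixed size
table and the linear search are simplifications of mine, and
lookup_count exists only to show that the per-request path does
no lookup:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

enum { DEST_CONN_MAX = 16, USER_NAME_MAX = 32 };

/* Hypothetical: one outgoing connection per authenticated user. */
struct dest_conn {
	char user[USER_NAME_MAX];
};

struct dest {
	struct dest_conn conns[DEST_CONN_MAX];
	size_t conn_count;
	/* How many times the slow path ran; for illustration only. */
	size_t lookup_count;
};

struct client_conn {
	/* Destination connection this client is attached to. */
	struct dest_conn *attached;
};

/* Slow path: find or create the connection for the given user. */
static struct dest_conn *
dest_conn_for_user(struct dest *dest, const char *user)
{
	dest->lookup_count++;
	for (size_t i = 0; i < dest->conn_count; i++) {
		if (strcmp(dest->conns[i].user, user) == 0)
			return &dest->conns[i];
	}
	if (dest->conn_count == DEST_CONN_MAX)
		return NULL;
	struct dest_conn *conn = &dest->conns[dest->conn_count++];
	strcpy(conn->user, user);
	return conn;
}

/* Called on connect, on re-authentication, and after error/timeout. */
int
client_attach(struct client_conn *client, struct dest *dest,
	      const char *user)
{
	assert(client != NULL && dest != NULL);
	if (strlen(user) >= USER_NAME_MAX)
		return -1;
	struct dest_conn *conn = dest_conn_for_user(dest, user);
	if (conn == NULL)
		return -1;
	client->attached = conn;
	return 0;
}

/* Fast path, taken for every request: no lookup at all. */
struct dest_conn *
client_route(struct client_conn *client)
{
	return client->attached;
}
```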

In the future, users will want to set a policy for RO forwarding.
For now, read-only requests should always be forwarded to the
local instance.

> +static void
> +proxy_forward_request(struct iproto_connection *con, struct iproto_msg *msg,
> +			struct proxy_connection *pcon)
> +{
> +	struct sync_hash_entry *cs = (struct sync_hash_entry *)malloc(sizeof(*cs));
> +	cs->sync = msg->header.sync;
> +	cs->con = con;
> +	msg->header.sync = ++pcon->sync_ctr;
> +	struct mh_i64ptr_node_t node = {pcon->sync_ctr, cs};
> +	if (mh_i64ptr_put(pcon->sync_hash, &node, NULL, NULL) ==
> +	    mh_end(pcon->sync_hash)) {
> +		diag_set(OutOfMemory, sizeof(node), "malloc", "sync_hash");
> +		diag_raise();
> +	}
> +	coio_write_xrow(&pcon->connection.io, &msg->header);

Writing rows to the output one by one is too slow.
The proxy client worker should work as follows.
While there is available input:
 - read "readahead" bytes of input
 - populate output buffers in the destination connections
 - flush output buffers in the destination connections
   in a non-blocking manner
 - check whether there is more input and yield until
   either input or output is ready.

I doubt it is easy to do with fibers. I'd use callbacks, in the
same way as iproto does. In fact, perhaps the iproto code could
be refactored to implement such I/O.
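
The batching part of that loop could look like the sketch below.
It is deliberately simplified: requests are fixed size, the
routing rule is made up, and obuf_flush() only counts where the
real code would issue one non-blocking write()/writev(). All the
names are hypothetical:

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

enum { OBUF_SIZE = 4096, DEST_COUNT = 2, REQ_SIZE = 8 };

/* Output buffer of one destination connection. */
struct obuf {
	char data[OBUF_SIZE];
	size_t used;
	/* Each flush stands for one system call. */
	size_t flush_count;
	size_t flushed_bytes;
};

/* Stand-in for one non-blocking write of the whole buffer. */
static void
obuf_flush(struct obuf *out)
{
	if (out->used == 0)
		return;
	out->flush_count++;
	out->flushed_bytes += out->used;
	out->used = 0;
}

/* Append one request; flush early only if the buffer is full. */
static void
obuf_append(struct obuf *out, const char *req, size_t len)
{
	assert(len <= OBUF_SIZE);
	if (out->used + len > OBUF_SIZE)
		obuf_flush(out);
	memcpy(out->data + out->used, req, len);
	out->used += len;
}

/*
 * Handle one readahead chunk: route every complete request
 * into its destination buffer, then flush each buffer once.
 * Returns the number of input bytes consumed; a trailing
 * partial request is left for the next read.
 */
size_t
proxy_process_input(const char *in, size_t len, struct obuf *dests)
{
	size_t pos = 0;
	while (len - pos >= REQ_SIZE) {
		/* Made-up rule: the first byte selects the destination. */
		size_t d = (unsigned char)in[pos] % DEST_COUNT;
		obuf_append(&dests[d], in + pos, REQ_SIZE);
		pos += REQ_SIZE;
	}
	for (size_t d = 0; d < DEST_COUNT; d++)
		obuf_flush(&dests[d]);
	return pos;
}
```

With ten requests in one chunk split evenly between two
destinations, this does two flushes instead of ten row-by-row
writes.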

> +static int
> +proxy_forwarder_f(va_list ap)
> +	pcon->replier = fiber_new(name, proxy_replier_f);
> +	fiber_set_joinable(pcon->replier, true);
> +	fiber_start(pcon->replier, pcon);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct rlist *head;
> +		while ((head = rlist_first(&pcon->request_queue)) ==
> +		       &pcon->request_queue) {
> +			fiber_cond_wait(&pcon->forwarder_cond);
> +		}
> +
> +		struct pqueue_entry *ent = rlist_shift_entry(&pcon->request_queue,
> +							     struct pqueue_entry,
> +							     link);
> +		struct iproto_connection *con = ent->con;
> +		struct iproto_msg * msg = ent->msg;
> +		free(ent);
> +
> +		proxy_forward_request(con, msg, pcon);
> +
> +		fiber_gc();
> +	}
> +	return 0;
> +}
> +	while (!rlist_empty(&proxy_instances)) {
> +		struct proxy_instance *instance =
> +		    rlist_shift_entry(&proxy_instances, struct proxy_instance, link);
> +		proxy_instance_delete(instance);
> +	}
> +}
> +static int
> +proxy_configure_f(struct cbus_call_msg *m)
> +{
> +	struct proxy_cfg_msg *msg = (struct proxy_cfg_msg *)m;
> +	/*
> +	 * If it is the second call to proxy_configure,
> +	 * first remove all the old instances and close
> +	 * connections, and do not init coio again.
> +	 */
> +	if (is_proxy_configured) {
> +		proxy_instance_list_purge();

Looks like you're dropping packets. This is not OK.
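
One way to avoid it, as a sketch: on reconfiguration, retire the
old instances instead of purging them, stop sending them new
requests, and delete each one only after its last reply has
arrived. The structure and function names below are hypothetical,
and is_deleted stands in for the actual proxy_instance_delete():

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical lifecycle state of one old proxy destination. */
struct instance_state {
	/* Requests forwarded but not yet answered. */
	int in_flight;
	bool is_draining;
	bool is_deleted;
};

/* New requests must not go to an instance which is draining. */
bool
instance_accepts(const struct instance_state *inst)
{
	return !inst->is_draining;
}

void
instance_request_sent(struct instance_state *inst)
{
	assert(!inst->is_draining);
	inst->in_flight++;
}

/* Delete only when draining and nothing is in flight. */
static void
instance_try_delete(struct instance_state *inst)
{
	if (inst->is_draining && inst->in_flight == 0)
		inst->is_deleted = true;
}

void
instance_reply_received(struct instance_state *inst)
{
	assert(inst->in_flight > 0);
	inst->in_flight--;
	instance_try_delete(inst);
}

/* Called on reconfiguration instead of an immediate purge. */
void
instance_retire(struct instance_state *inst)
{
	inst->is_draining = true;
	instance_try_delete(inst);
}
```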

> +proxy_replier_f(va_list ap)
> +{
> +	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
> +
> +	struct ev_io io;
> +	coio_create(&io, pcon->connection.io.fd);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header row;
> +		coio_read_xrow(&io, &pcon->connection.ibuf, &row);
> +		uint64_t key = row.sync;
> +		mh_int_t i = mh_i64ptr_find(pcon->sync_hash, key, NULL);
> +		if (i == mh_end(pcon->sync_hash)) {
> +			/*
> +			 * Some error. We recieved a reply with sync
> +			 * not corresponding to any connection
> +			 */
> +			say_warn("sync recieved from remote instance is not in sync_hash");
> +			continue;
> +		}
> +		struct mh_i64ptr_node_t *node = mh_i64ptr_node(pcon->sync_hash, i);
> +		if (row.type != IPROTO_CHUNK) {
> +			mh_i64ptr_remove(pcon->sync_hash, node, NULL);
> +		}
> +		struct sync_hash_entry * cs = (struct sync_hash_entry *)node->val;
> +		struct iproto_connection *con = cs->con;
> +		uint64_t sync = cs->sync;
> +		row.sync = sync;
> +		free(cs);
> +		coio_write_iproto_response(&con->output, &row);
> +
> +		fiber_gc();

I think this should also be event driven. Please think about the
underlying system calls and try to minimize their number.

> +	}
> +	return 0;
> +}
> --- a/src/scramble.c
> +++ b/src/scramble.c
> @@ -67,6 +67,29 @@ scramble_prepare(void *out, const void *salt, const void *password,
>  	xor(out, hash1, out, SCRAMBLE_SIZE);
>  }
>  
> +void
> +scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
> +		  const void *hash2)
> +{

Could easily go in a separate patch.

> +	unsigned char hash1[SCRAMBLE_SIZE];
> +	unsigned char sh[SCRAMBLE_SIZE];
> +	SHA1_CTX ctx;
> +
> +	SHA1Init(&ctx);
> +	SHA1Update(&ctx, salt, SCRAMBLE_SIZE);
> +	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
> +	SHA1Final(sh, &ctx);
> +
> +	xor(hash1, in, sh, SCRAMBLE_SIZE);
> +
> +	SHA1Init(&ctx);
> +	SHA1Update(&ctx, msalt, SCRAMBLE_SIZE);
> +	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
> +	SHA1Final(out, &ctx);
> +
> +	xor(out, hash1, out, SCRAMBLE_SIZE);
> +}
> +
>  int
>  scramble_check(const void *scramble, const void *salt, const void *hash2)
>  {
> diff --git a/src/scramble.h b/src/scramble.h
> index 7dee31483..2ca975bd7 100644

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov



