Date: Tue, 23 Oct 2018 20:26:28 +0300
From: Konstantin Osipov
Subject: [tarantool-patches] Re: [PATCH v2] iproto: introduce a proxy module.
To: tarantool-patches@freelists.org
Cc: Serge Petrenko
Message-ID: <20181023172628.GA10044@chai>
In-Reply-To: <20181002180554.1142-1-sergepetrenko@tarantool.org>

* Serge Petrenko [18/10/02 21:07]:

Some comments on the patch itself, in addition to the general comments
which I sent out earlier.

First of all, the proxy has to be put into a separate module with a
clear API. To work with iproto, we need to define an iproto API which
the proxy can use to instantiate itself in the iproto thread. In other
words, please try to separate iproto.cc and proxy.cc cleanly. This will
also ensure there is a clear API between the proxy and tx, just as there
is today between iproto and tx. The iproto thread is simply a
host/container thread for the proxy. You could even put the proxy into
its own thread first and move it into iproto in a separate patch. The
code should be written so that it does not need to change when moved to
a different thread.

Second, the proxy does not need to replace box.cfg{listen=}; the latter
can work without changes until it is deprecated and removed. So there
should be no need to change the bootstrap procedure.

> +void
> +box_init_iproto()
> +{
> +	if (!iproto_is_configured()) {
> +		/*
> +		 * Since iproto may be initted in a call to netbox.listen()
> +		 * before box is configured, we need to first establish
> +		 * a tx endpoint to write cbus messages.
> +		 */
> +		/* Join the cord interconnect as "tx" endpoint. */
> +		fiber_pool_create(&tx_fiber_pool, "tx",
> +				  IPROTO_MSG_MAX_MIN * IPROTO_FIBER_POOL_SIZE_FACTOR,
> +				  FIBER_POOL_IDLE_TIMEOUT);
> +		/* Add an extra endpoint for WAL wake up/rollback messages. */
> +		cbus_endpoint_create(&tx_prio_endpoint, "tx_prio", tx_prio_cb, &tx_prio_endpoint);
> +		rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX);
> +		rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST);
> +
> +		iproto_init();
> +	}
> +}

We have a terminology mess here. There is a "net" thread which hosts the
"iproto" protocol. To run the proxy, you need to initialize the
networking subsystem, and yes, since you don't know whether
netbox.listen() was called before box.cfg{}, you need to make sure it is
initialized on demand.
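For illustration only, the on-demand part can be as simple as an
idempotent start function that both box.cfg{} and netbox.listen() call
before doing anything network-related (a rough sketch; net_start() and
is_net_started are made-up names, not the actual tarantool API):

    #include <stdbool.h>

    /* Hypothetical net.c: bring up the networking subsystem on first use. */
    static bool is_net_started = false;

    void
    net_start(void)
    {
            if (is_net_started)
                    return;
            /*
             * Start the net thread, join the cbus and create the
             * net <-> tx pipes here; everything that today happens
             * unconditionally during box.cfg{} moves behind this check.
             */
            is_net_started = true;
    }

With such a function the order in which box.cfg{} and netbox.listen()
are invoked stops mattering, and neither caller needs to know whether
the other has already run.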
So you could put a refactoring which ensures on-demand initialization of
the net thread (the thread is started, joins the cbus, and all the
communication pipes between this thread and tx are established) into a
separate patch. Please keep in mind that in the future a lot of work
will be done in the network thread - sharding, proxying, etc. So you
could create net.[hc] right away to separate the net thread logic from
the iproto/proxy logic.

> -	iproto_init();
> +	box_init_iproto();

I don't understand the reason for this rename.

> 	wal_thread_start();
>
> +static bool is_iproto_configured = false;
> +static bool is_proxy_configured = false;

Why not allow multiple proxy instances? In other words, why not have a
struct proxy encapsulating the entire proxy state (one listen uri = one
proxy)?

>
> +struct proxy_instance {

Missing comment. I could only guess what this data structure is for. The
name is tautological - why not simply call it struct proxy? It turns out
it stands for one of the proxy destinations.

How do you deal with possible configuration loops? Imagine proxy 1
believes proxy 2 is rw, and proxy 2 believes proxy 1 is rw. They could
quickly bounce packets back and forth ad nauseam until the routing
tables at all nodes are full. Do you have a test for this? We could
establish a separate connection for the "peer" route and ensure it never
feeds requests directly to the tx thread - but this could be dangerous
when the proxy configuration is changed.

> +	bool is_ro;

What is the API to change the local destination status? Let's imagine
box.ctl.promote() is in place. It should begin by suspending routing at
all nodes, and end by resuming it again. The suspension should not
affect control/status messages, which should be passed directly to the
destination node.

Did you implement a patch which adds an RO/RW flag to CALL?

Finally, since we are adding SQL and switching to top-down merges, we
need to enable routing for SQL as well.

I understand these comments are more related to the scope of the task
than to the patch itself. You could address most of them in separate
patches.

> +	/** A map of all established connections to the instance. */
> +	struct mh_strnptr_t *fat_connections;

Why are they fat?

> +/**
> + * Search the proxie's instance list to find the one
> + * instance capable of processing the request in msg->header.
> + */
> +static struct proxy_instance *
> +proxy_find_instance(struct iproto_msg *msg)

The lookup should happen only when the client connection is established,
changes its credentials, or there is an error/timeout. Each proxy
destination should maintain a separate connection for each authenticated
user name, and the lookup should simply match the source connection to
the destination one based on the user name. Once the destination
connections are "attached" to the source ones, the messages should use
the same connections again and again (see the sketch below).

In the future, users will want to set a policy for RO forwarding. For
now, read-only requests should always be forwarded to the local
instance.
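To make the "attach once, reuse" idea concrete, here is a rough
standalone sketch (all names are illustrative, not taken from the
patch): a destination keeps one outgoing connection per authenticated
user name, and a client connection looks its destination connection up
once and then keeps the pointer:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    struct dest_connection {
            char user[64];                  /* authenticated user name */
            int fd;                         /* socket to the destination */
            struct dest_connection *next;
    };

    struct proxy_dest {
            struct dest_connection *connections; /* one per user name */
    };

    struct client_connection {
            const char *user;               /* user of the incoming connection */
            struct dest_connection *attached; /* cached after the first lookup */
    };

    static struct dest_connection *
    proxy_dest_attach(struct proxy_dest *dest, struct client_connection *client)
    {
            if (client->attached != NULL)
                    return client->attached;  /* reuse, no lookup */
            struct dest_connection *c;
            for (c = dest->connections; c != NULL; c = c->next) {
                    if (strcmp(c->user, client->user) == 0)
                            break;
            }
            if (c == NULL) {
                    /*
                     * First request by this user: open and register a
                     * destination connection (connect/auth omitted).
                     */
                    c = calloc(1, sizeof(*c));
                    if (c == NULL)
                            return NULL;
                    snprintf(c->user, sizeof(c->user), "%s", client->user);
                    c->next = dest->connections;
                    dest->connections = c;
            }
            client->attached = c;
            return c;
    }

The attachment has to be invalidated when the client re-authenticates or
when the destination connection sees an error or a timeout - exactly the
three cases listed above.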
> +static void
> +proxy_forward_request(struct iproto_connection *con, struct iproto_msg *msg,
> +		       struct proxy_connection *pcon)
> +{
> +	struct sync_hash_entry *cs = (struct sync_hash_entry *)malloc(sizeof(*cs));
> +	cs->sync = msg->header.sync;
> +	cs->con = con;
> +	msg->header.sync = ++pcon->sync_ctr;
> +	struct mh_i64ptr_node_t node = {pcon->sync_ctr, cs};
> +	if (mh_i64ptr_put(pcon->sync_hash, &node, NULL, NULL) ==
> +	    mh_end(pcon->sync_hash)) {
> +		diag_set(OutOfMemory, sizeof(node), "malloc", "sync_hash");
> +		diag_raise();
> +	}
> +	coio_write_xrow(&pcon->connection.io, &msg->header);

Writing rows to the output one by one is too slow. The proxy client
worker should work as follows. While there is available input:

- read "readahead" bytes of input;
- populate the output buffers of the destination connections;
- flush the output buffers of the destination connections in a
  non-blocking manner;
- check whether there is more input, and yield until either input or
  output is ready.

I doubt it is easy to do with fibers. I'd use callbacks, in the same way
as iproto does. In fact, perhaps the iproto code could be refactored to
implement such I/O. I sketch the buffered, callback-driven output
further below.

> +static int
> +proxy_forwarder_f(va_list ap)
> +	pcon->replier = fiber_new(name, proxy_replier_f);
> +	fiber_set_joinable(pcon->replier, true);
> +	fiber_start(pcon->replier, pcon);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct rlist *head;
> +		while ((head = rlist_first(&pcon->request_queue)) ==
> +		       &pcon->request_queue) {
> +			fiber_cond_wait(&pcon->forwarder_cond);
> +		}
> +
> +		struct pqueue_entry *ent = rlist_shift_entry(&pcon->request_queue,
> +							     struct pqueue_entry,
> +							     link);
> +		struct iproto_connection *con = ent->con;
> +		struct iproto_msg * msg = ent->msg;
> +		free(ent);
> +
> +		proxy_forward_request(con, msg, pcon);
> +
> +		fiber_gc();
> +	}
> +	return 0;
> +}

> +	while (!rlist_empty(&proxy_instances)) {
> +		struct proxy_instance *instance =
> +			rlist_shift_entry(&proxy_instances, struct proxy_instance, link);
> +		proxy_instance_delete(instance);
> +	}
> +}
> +static int
> +proxy_configure_f(struct cbus_call_msg *m)
> +{
> +	struct proxy_cfg_msg *msg = (struct proxy_cfg_msg *)m;
> +	/*
> +	 * If it is the second call to proxy_configure,
> +	 * first remove all the old instances and close
> +	 * connections, and do not init coio again.
> +	 */
> +	if (is_proxy_configured) {
> +		proxy_instance_list_purge();

It looks like you're dropping packets here. This is not OK.

> +proxy_replier_f(va_list ap)
> +{
> +	struct proxy_connection *pcon = va_arg(ap, struct proxy_connection *);
> +
> +	struct ev_io io;
> +	coio_create(&io, pcon->connection.io.fd);
> +
> +	while (!fiber_is_cancelled()) {
> +		struct xrow_header row;
> +		coio_read_xrow(&io, &pcon->connection.ibuf, &row);
> +		uint64_t key = row.sync;
> +		mh_int_t i = mh_i64ptr_find(pcon->sync_hash, key, NULL);
> +		if (i == mh_end(pcon->sync_hash)) {
> +			/*
> +			 * Some error. We recieved a reply with sync
> +			 * not corresponding to any connection
> +			 */
> +			say_warn("sync recieved from remote instance is not in sync_hash");
> +			continue;
> +		}
> +		struct mh_i64ptr_node_t *node = mh_i64ptr_node(pcon->sync_hash, i);
> +		if (row.type != IPROTO_CHUNK) {
> +			mh_i64ptr_remove(pcon->sync_hash, node, NULL);
> +		}
> +		struct sync_hash_entry * cs = (struct sync_hash_entry *)node->val;
> +		struct iproto_connection *con = cs->con;
> +		uint64_t sync = cs->sync;
> +		row.sync = sync;
> +		free(cs);
> +		coio_write_iproto_response(&con->output, &row);
> +
> +		fiber_gc();
> +	}
> +	return 0;
> +}

I think this should also be event driven. Please think about the
underlying system calls and try to minimize their number.
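To illustrate the kind of I/O I have in mind, here is a rough libev
sketch (struct out_batch and the function names are mine, not the
patch's, and it uses raw libev rather than our coio wrappers): rows are
appended to a per-socket buffer, and a single non-blocking write(2) is
issued from an EV_WRITE callback only while there is pending data,
instead of one blocking write per row:

    #include <ev.h>
    #include <errno.h>
    #include <string.h>
    #include <unistd.h>

    /* Pending output for one destination/client socket. */
    struct out_batch {
            ev_io io;               /* write watcher; must stay first for the cast */
            char buf[16 * 1024];    /* encoded rows waiting to be flushed */
            size_t used;
    };

    static void
    out_batch_flush_cb(struct ev_loop *loop, ev_io *w, int revents)
    {
            (void)revents;
            struct out_batch *b = (struct out_batch *)w;
            /* One write(2) for everything accumulated since the last flush. */
            ssize_t n = write(w->fd, b->buf, b->used);
            if (n < 0) {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                            return;         /* not writable yet, keep the watcher */
                    ev_io_stop(loop, w);    /* real code would close the connection */
                    return;
            }
            memmove(b->buf, b->buf + n, b->used - n);
            b->used -= n;
            if (b->used == 0)
                    ev_io_stop(loop, w);    /* nothing pending: stop polling for EV_WRITE */
    }

    static void
    out_batch_create(struct out_batch *b, int fd)
    {
            b->used = 0;
            ev_io_init(&b->io, out_batch_flush_cb, fd, EV_WRITE);
    }

    /* Append one encoded row; arm the watcher only when data is pending. */
    static void
    out_batch_append(struct ev_loop *loop, struct out_batch *b,
                     const char *row, size_t len)
    {
            if (b->used + len > sizeof(b->buf))
                    return; /* real code would grow the buffer or apply back pressure */
            memcpy(b->buf + b->used, row, len);
            b->used += len;
            if (!ev_is_active(&b->io))
                    ev_io_start(loop, &b->io);
    }

The same pattern applies to the forwarder: requests from many client
connections accumulate in the destination connection's buffer and are
flushed together, so the number of system calls is bounded by socket
readiness events rather than by the number of rows.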
> --- a/src/scramble.c
> +++ b/src/scramble.c
> @@ -67,6 +67,29 @@ scramble_prepare(void *out, const void *salt, const void *password,
>  	xor(out, hash1, out, SCRAMBLE_SIZE);
>  }
>  
> +void
> +scramble_reencode(void *out, const void *in, const void *salt, const void *msalt,
> +		  const void *hash2)
> +{

Could easily go in a separate patch.

> +	unsigned char hash1[SCRAMBLE_SIZE];
> +	unsigned char sh[SCRAMBLE_SIZE];
> +	SHA1_CTX ctx;
> +
> +	SHA1Init(&ctx);
> +	SHA1Update(&ctx, salt, SCRAMBLE_SIZE);
> +	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
> +	SHA1Final(sh, &ctx);
> +
> +	xor(hash1, in, sh, SCRAMBLE_SIZE);
> +
> +	SHA1Init(&ctx);
> +	SHA1Update(&ctx, msalt, SCRAMBLE_SIZE);
> +	SHA1Update(&ctx, hash2, SCRAMBLE_SIZE);
> +	SHA1Final(out, &ctx);
> +
> +	xor(out, hash1, out, SCRAMBLE_SIZE);
> +}
> +
>  int
>  scramble_check(const void *scramble, const void *salt, const void *hash2)
>  {
> diff --git a/src/scramble.h b/src/scramble.h
> index 7dee31483..2ca975bd7 100644

-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov