From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Date: Thu, 16 May 2019 14:46:53 +0300
From: Alexander Turenko
Subject: Re: [tarantool-patches] [PATCH] iproto: init coio watcher before join/subscribe
Message-ID: <20190516114652.t3vn3tke37cm6wjr@tkn_work_nb>
References: <26ee4f4b5fcc01bba7fb5b33fd97983fef1ac6f0.1557952229.git.alexander.turenko@tarantool.org> <20190516083817.GN26670@atlas>
MIME-Version: 1.0
Content-Type: text/plain; charset=utf-8
Content-Disposition: inline
In-Reply-To: <20190516083817.GN26670@atlas>
To: Konstantin Osipov
Cc: tarantool-patches@freelists.org, Vladimir Davydov
List-ID:

On Thu, May 16, 2019 at 11:38:17AM +0300, Konstantin Osipov wrote:
> * Alexander Turenko [19/05/16 09:53]:
> > box_process_join() and box_process_subscribe() use coio_write_xrow(),
> > which calls coio_writev_timeout() under hood. If a socket will block at
> > write() the function calls ev_io_start() to wake the fiber up when the
> > socket will be ready to write. This code assumes that the watcher
> > (struct ev_io) is initialized as coio watcher, i.e. coio_create() has
> > been called.
>
> the same comment is missing from the source code.

It looks like this is the contract of the box_process_*() functions, so
I added comments to them. With those comments in place the change in
iproto.cc is obvious and, I think, doesn't require a comment of its own.

See the full patch below.

diff --git a/src/box/box.h b/src/box/box.h
index 53d88ab71..ddcfbe2e5 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -172,9 +172,25 @@ box_reset_stat(void);
 void
 box_process_auth(struct auth_request *request, const char *salt);
 
+/**
+ * Join a replica.
+ *
+ * Register a replica and feed it with data.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param JOIN packet header
+ */
 void
 box_process_join(struct ev_io *io, struct xrow_header *header);
 
+/**
+ * Subscribe a replica.
+ *
+ * Perform necessary checks and start a relay thread.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param SUBSCRIBE packet header
+ */
 void
 box_process_subscribe(struct ev_io *io, struct xrow_header *header);
 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 02b558ede..8f899fed8 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1693,6 +1693,8 @@ tx_process_join_subscribe(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
+	struct ev_io io;
+	coio_create(&io, con->input.fd);
 	try {
 		switch (msg->header.type) {
 		case IPROTO_JOIN:
@@ -1701,7 +1703,7 @@ tx_process_join_subscribe(struct cmsg *m)
 			 * the lambda in the beginning of the block
 			 * will re-activate the watchers for us.
 			 */
-			box_process_join(&con->input, &msg->header);
+			box_process_join(&io, &msg->header);
 			break;
 		case IPROTO_SUBSCRIBE:
 			/*
@@ -1710,7 +1712,7 @@ tx_process_join_subscribe(struct cmsg *m)
 			 * the write watcher will be re-activated
 			 * the same way as for JOIN.
 			 */
-			box_process_subscribe(&con->input, &msg->header);
+			box_process_subscribe(&io, &msg->header);
 			break;
 		default:
 			unreachable();
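
To illustrate the contract that the new box.h comments describe, here is a
minimal, self-contained sketch in C. It is not Tarantool code and does not use
libev: every toy_* name is hypothetical, and the model rests on the assumption
that a coio watcher carries a callback that wakes the waiting fiber, while a
watcher set up by another subsystem carries a callback of its own. Under that
assumption, waiting on a foreign watcher runs the wrong callback and the fiber
is never woken.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a fiber: only records whether it was woken. */
struct toy_fiber {
	int woken;
};

/* Hypothetical stand-in for struct ev_io: fd, callback and user data. */
struct toy_watcher {
	int fd;
	void (*cb)(struct toy_watcher *w);
	void *data;
};

/* Callback a coio-style watcher is expected to carry: wake the fiber. */
static void
toy_wakeup_cb(struct toy_watcher *w)
{
	struct toy_fiber *f = (struct toy_fiber *)w->data;
	f->woken = 1;
}

/* Callback owned by some other subsystem; it only counts its calls. */
static void
toy_foreign_cb(struct toy_watcher *w)
{
	int *calls = (int *)w->data;
	(*calls)++;
}

/* Assumed analogue of coio_create(): bind the watcher to the waiting fiber. */
static void
toy_coio_create(struct toy_watcher *w, int fd, struct toy_fiber *current)
{
	w->fd = fd;
	w->cb = toy_wakeup_cb;
	w->data = current;
}

/*
 * Model of the "write would block" path: pretend the event loop reports
 * that the fd became writable and fires the watcher callback.
 * Returns 1 if the waiting fiber was woken, 0 otherwise.
 */
static int
toy_wait_writable(struct toy_watcher *w, struct toy_fiber *current)
{
	assert(w->cb != NULL);
	current->woken = 0;
	w->cb(w);
	return current->woken;
}

/* Returns 1 if both scenarios behave as the model predicts. */
static int
toy_demo(void)
{
	struct toy_fiber fiber = {0};

	/* Properly initialized watcher: the fiber is woken. */
	struct toy_watcher good;
	toy_coio_create(&good, 3, &fiber);
	int good_woke = toy_wait_writable(&good, &fiber);

	/* Watcher borrowed from elsewhere: the foreign callback runs instead. */
	int calls = 0;
	struct toy_watcher bad = {3, toy_foreign_cb, &calls};
	int bad_woke = toy_wait_writable(&bad, &fiber);

	return good_woke == 1 && bad_woke == 0 && calls == 1;
}
```

Calling toy_demo() from a test or from main() exercises both cases. The patch
takes the same route as the first case: it creates a fresh watcher on the same
file descriptor instead of reusing one that belongs to another subsystem.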