[tarantool-patches] [PATCH] iproto: init coio watcher before join/subscribe
Alexander Turenko
alexander.turenko at tarantool.org
Thu May 16 14:46:53 MSK 2019
On Thu, May 16, 2019 at 11:38:17AM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko at tarantool.org> [19/05/16 09:53]:
> > box_process_join() and box_process_subscribe() use coio_write_xrow(),
> > which calls coio_writev_timeout() under the hood. If a socket would block on
> > write(), the function calls ev_io_start() to wake the fiber up when the
> > socket becomes ready for writing. This code assumes that the watcher
> > (struct ev_io) is initialized as coio watcher, i.e. coio_create() has
> > been called.
>
> the same comment is missing from the source code.
Looks like it is the contract of the box_process_* functions, so I added comments
to them. After that, the change in iproto.cc is obvious and I think it doesn't
require a comment.
See the full patch below.
diff --git a/src/box/box.h b/src/box/box.h
index 53d88ab71..ddcfbe2e5 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -172,9 +172,25 @@ box_reset_stat(void);
void
box_process_auth(struct auth_request *request, const char *salt);
+/**
+ * Join a replica.
+ *
+ * Register a replica and feed it with data.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param header JOIN packet header
+ */
void
box_process_join(struct ev_io *io, struct xrow_header *header);
+/**
+ * Subscribe a replica.
+ *
+ * Perform necessary checks and start a relay thread.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param header SUBSCRIBE packet header
+ */
void
box_process_subscribe(struct ev_io *io, struct xrow_header *header);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 02b558ede..8f899fed8 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1693,6 +1693,8 @@ tx_process_join_subscribe(struct cmsg *m)
{
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
+ struct ev_io io;
+ coio_create(&io, con->input.fd);
try {
switch (msg->header.type) {
case IPROTO_JOIN:
@@ -1701,7 +1703,7 @@ tx_process_join_subscribe(struct cmsg *m)
* the lambda in the beginning of the block
* will re-activate the watchers for us.
*/
- box_process_join(&con->input, &msg->header);
+ box_process_join(&io, &msg->header);
break;
case IPROTO_SUBSCRIBE:
/*
@@ -1710,7 +1712,7 @@ tx_process_join_subscribe(struct cmsg *m)
* the write watcher will be re-activated
* the same way as for JOIN.
*/
- box_process_subscribe(&con->input, &msg->header);
+ box_process_subscribe(&io, &msg->header);
break;
default:
unreachable();
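
As a toy model of why the patch passes a dedicated local watcher instead of the connection's own one, consider the sketch below. It does not use libev or Tarantool: struct watcher, watcher_create() and simulate_ready() are hypothetical stand-ins, and it assumes (this is not stated in the message above) that the trouble with a foreign watcher is that it carries a callback meant for another purpose.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for struct ev_io; not the libev definition. */
struct watcher {
	void (*cb)(struct watcher *w);
	int fd;
	bool woken;
};

/* Callback that wakes the waiting writer (the role of a coio callback). */
static void
wakeup_cb(struct watcher *w)
{
	w->woken = true;
}

/* Callback of some other owner, e.g. a connection's input handler. */
static void
other_cb(struct watcher *w)
{
	(void)w; /* does its own work, never wakes the writer */
}

/* Stand-in for coio_create(): bind the fd, install the wake-up callback. */
static void
watcher_create(struct watcher *w, int fd)
{
	w->cb = wakeup_cb;
	w->fd = fd;
	w->woken = false;
}

/*
 * Simulate "the socket became writable": the event loop invokes
 * whatever callback the watcher carries. Returns true only if the
 * waiting writer was actually woken up.
 */
static bool
simulate_ready(struct watcher *w)
{
	w->cb(w);
	return w->woken;
}
```

In this model, a watcher prepared by watcher_create() wakes the writer, while a watcher borrowed from another owner does not, and the borrowed watcher's state is left untouched when a separate local one is used.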