[tarantool-patches] [PATCH] iproto: init coio watcher before join/subscribe

Alexander Turenko alexander.turenko at tarantool.org
Thu May 16 14:46:53 MSK 2019


On Thu, May 16, 2019 at 11:38:17AM +0300, Konstantin Osipov wrote:
> * Alexander Turenko <alexander.turenko at tarantool.org> [19/05/16 09:53]:
> > box_process_join() and box_process_subscribe() use coio_write_xrow(),
> > which calls coio_writev_timeout() under hood. If a socket will block at
> > write() the function calls ev_io_start() to wake the fiber up when the
> > socket will be ready to write. This code assumes that the watcher
> > (struct ev_io) is initialized as coio watcher, i.e. coio_create() has
> > been called.
> 
> the same comment is missing from the source code.

Looks like it is the contract of box_process_* functions, so I added comments
to them. After that the change in iproto.cc is obvious and I think don't
requires a comment.

See the full patch below.

diff --git a/src/box/box.h b/src/box/box.h
index 53d88ab71..ddcfbe2e5 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -172,9 +172,25 @@ box_reset_stat(void);
 void
 box_process_auth(struct auth_request *request, const char *salt);
 
+/**
+ * Join a replica.
+ *
+ * Register a replica and feed it with data.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param JOIN packet header
+ */
 void
 box_process_join(struct ev_io *io, struct xrow_header *header);
 
+/**
+ * Subscribe a replica.
+ *
+ * Perform necessary checks and start a relay thread.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param SUBSCRIBE packet header
+ */
 void
 box_process_subscribe(struct ev_io *io, struct xrow_header *header);
 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 02b558ede..8f899fed8 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1693,6 +1693,8 @@ tx_process_join_subscribe(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
+	struct ev_io io;
+	coio_create(&io, con->input.fd);
 	try {
 		switch (msg->header.type) {
 		case IPROTO_JOIN:
@@ -1701,7 +1703,7 @@ tx_process_join_subscribe(struct cmsg *m)
 			 * the lambda in the beginning of the block
 			 * will re-activate the watchers for us.
 			 */
-			box_process_join(&con->input, &msg->header);
+			box_process_join(&io, &msg->header);
 			break;
 		case IPROTO_SUBSCRIBE:
 			/*
@@ -1710,7 +1712,7 @@ tx_process_join_subscribe(struct cmsg *m)
 			 * the write watcher will be re-activated
 			 * the same way as for JOIN.
 			 */
-			box_process_subscribe(&con->input, &msg->header);
+			box_process_subscribe(&io, &msg->header);
 			break;
 		default:
 			unreachable();



More information about the Tarantool-patches mailing list