From: Leonid Vasiliev <lvasiliev@tarantool.org>
To: Ilya Kosarev <i.kosarev@tarantool.org>, alyapunov@tarantool.org
Cc: tarantool-patches@dev.tarantool.org
Subject: Re: [Tarantool-patches] [PATCH v2] iproto: make iproto thread more independent from tx
Date: Mon, 21 Sep 2020 18:51:29 +0300
Message-ID: <280d8634-36ce-611c-faab-c8e887c5f74c@tarantool.org>
In-Reply-To: <20200918130942.16546-1-i.kosarev@tarantool.org>

Hi! Thank you for the patch.
LGTM.
All comments below are at your discretion:

On 9/18/20 4:09 PM, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index). There are some other
> cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
> This patch allows iproto to accept connection and send greeting by
> itself. The connection is initialized in tx thread when the real
> request comes through iproto_msg_decode(). In case request type was not
> recognized we can also send reply with an error without using tx. It is
> planned to add more iproto logic to prevent extra interaction with
> tx thread. This patch already to some extent solves descriptors leakage
> problem as far as connection establishes and stays in fetch_schema
> state while tx thread is unresponsive.
> The other user visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.
>
> Part of #3776
>
> @TarantoolBot document
> Title: iproto: on_connect triggers execution
> Update the documentation for on_connect triggers to reflect that they
> are now being executed only with the first request. Though the triggers
> are still the first thing to be executed on a new connection. While it
> is quite a synthetic case to establish a connection without making
> any requests it is technically possible and now your triggers won't be
> executed in this case. Some request is required to start their
> execution.
> ---
> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
> Issue: https://github.com/tarantool/tarantool/issues/3776
>
> @ChangeLog:
> * Make iproto thread more independent from tx (gh-3776).

- The changelog is about changes visible to the user. I would suggest
  writing about on_connect triggers here. As I understand it,
  on_disconnect triggers also will not run if no message has been sent.

>
> Changes in v2:
> - docbot request provided
> - ChangeLog provided
> - net_send_msg() used instead of net_send_error() where needed
> - error cases made clear in iproto_msg_decode()
> - replaced net_check_connection() with iproto_connection_fail() predicate
> - improved tx session initialization fail processing to avoid extra tries
> - fixed obuf_dup() fail processing in iproto_process_connect()
> - added some comments
> - some comments style fixed
>
>  src/box/iproto.cc               | 304 +++++++++++++++++++++-----------
>  test/box-py/bad_trigger.result  |   1 +
>  test/box-py/bad_trigger.test.py |  22 ++-
>  3 files changed, 223 insertions(+), 104 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..67bbd357a3 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
>  static void
>  iproto_resume(void);
>
> -static void
> +static int
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		  bool *stop_input);
>
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
>  	iproto_resume();
>  }
>
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
> +
>  /**
>   * A single global queue for all requests in all connections. All
>   * requests from all connections are processed concurrently.
> @@ -280,6 +283,11 @@ static struct cord net_cord;
>   * in the tx thread.
>   */
>  static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;
>
>  struct rmean *rmean_net;
>
> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
>  	"REQUESTS",
>  };
>
> +static int
> +tx_init_connect(struct iproto_msg *msg);
> +
>  static void
>  tx_process_destroy(struct cmsg *m);
>
> @@ -463,6 +474,7 @@ struct iproto_connection
>  	struct ev_io output;
>  	/** Logical session. */
>  	struct session *session;
> +	bool init_failed;
>  	ev_loop *loop;
>  	/**
>  	 * Pre-allocated disconnect msg. Is sent right after
> @@ -650,7 +662,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
>  	 * other parts of the connection.
>  	 */
>  	con->state = IPROTO_CONNECTION_DESTROYED;
> -	cpipe_push(&tx_pipe, &con->destroy_msg);
> +	if (con->session != NULL)
> +		cpipe_push(&tx_pipe, &con->destroy_msg);
> +	else {
> +		/*
> +		 * In case session was not created we can safely destroy
> +		 * not involving tx thread. Thus we also need to destroy
> +		 * obuf, which still belongs to iproto thread.
> +		 */
> +		obuf_destroy(&con->obuf[0]);
> +		obuf_destroy(&con->obuf[1]);
> +		iproto_connection_delete(con);
> +	}
>  }
>
>  /**
> @@ -677,9 +700,18 @@ iproto_connection_close(struct iproto_connection *con)
>  		 * is done only once.
>  		 */
>  		con->p_ibuf->wpos -= con->parse_size;
> -		cpipe_push(&tx_pipe, &con->disconnect_msg);
>  		assert(con->state == IPROTO_CONNECTION_ALIVE);
>  		con->state = IPROTO_CONNECTION_CLOSED;
> +		rlist_del(&con->in_stop_list);
> +		if (con->session != NULL)
> +			cpipe_push(&tx_pipe, &con->disconnect_msg);
> +		else
> +			/*
> +			 * In case session was not created we can safely
> +			 * try to start destroy not involving tx thread.
> +			 */
> +			iproto_connection_try_to_start_destroy(con);
> +		return;
>  	} else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
>  		iproto_connection_try_to_start_destroy(con);
>  	} else {
> @@ -809,6 +841,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
>  	assert(rlist_empty(&con->in_stop_list));
>  	int n_requests = 0;
>  	bool stop_input = false;
> +	bool obuf_in_iproto = (con->session == NULL);
>  	const char *errmsg;
>  	while (con->parse_size != 0 && !stop_input) {
>  		if (iproto_check_msg_max()) {
> @@ -853,12 +886,20 @@ err_msgpack:
>
>  		msg->len = reqend - reqstart; /* total request length */
>
> -		iproto_msg_decode(msg, &pos, reqend, &stop_input);
> -		/*
> -		 * This can't throw, but should not be
> -		 * done in case of exception.
> -		 */
> -		cpipe_push_input(&tx_pipe, &msg->base);
> +		if (iproto_msg_decode(msg, &pos, reqend, &stop_input) != -1) {
> +			if (obuf_in_iproto) {
> +				/*
> +				 * If session was not created yet and obuf is
> +				 * still in iproto we need to destroy it. New
> +				 * one will be created in tx thread if needed.
> +				 */
> +				obuf_destroy(&con->obuf[0]);
> +				obuf_destroy(&con->obuf[1]);
> +				obuf_in_iproto = false;
> +			}
> +			cpipe_push_input(&tx_pipe, &msg->base);
> +		}
> +
>  		n_requests++;
>  		/* Request is parsed */
>  		assert(reqend > reqstart);
> @@ -1105,8 +1146,8 @@ iproto_connection_new(int fd)
>  	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
>  	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
>  	ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> -	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> -	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
>  	con->p_ibuf = &con->ibuf[0];
>  	con->tx.p_obuf = &con->obuf[0];
>  	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> @@ -1114,6 +1155,7 @@ iproto_connection_new(int fd)
>  	con->parse_size = 0;
>  	con->long_poll_count = 0;
>  	con->session = NULL;
> +	con->init_failed = false;
>  	rlist_create(&con->in_stop_list);
>  	/* It may be very awkward to allocate at close. */
>  	cmsg_init(&con->destroy_msg, destroy_route);
> @@ -1134,10 +1176,6 @@ iproto_connection_delete(struct iproto_connection *con)
>  	assert(!evio_has_fd(&con->input));
>  	assert(con->session == NULL);
>  	assert(con->state == IPROTO_CONNECTION_DESTROYED);
> -	/*
> -	 * The output buffers must have been deleted
> -	 * in tx thread.
> -	 */
>  	ibuf_destroy(&con->ibuf[0]);
>  	ibuf_destroy(&con->ibuf[1]);
>  	assert(con->obuf[0].pos == 0 &&
> @@ -1172,6 +1210,9 @@ tx_reply_error(struct iproto_msg *msg);
>  static void
>  tx_reply_iproto_error(struct cmsg *m);
>
> +static void
> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
> +
>  static void
>  net_send_msg(struct cmsg *msg);
>
> @@ -1244,7 +1285,7 @@ static const struct cmsg_hop error_route[] = {
>  	{ net_send_error, NULL },
>  };
>
> -static void
> +static int
>  iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  		  bool *stop_input)
>  {
> @@ -1314,13 +1355,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  			  (uint32_t) type);

- iproto_msg_decode() now has three possible return values (0, 1 and -1).
  Maybe add a function description?

>  		goto error;
>  	}
> -	return;
> +	return 0;
>  error:
>  	/** Log and send the error. */
>  	diag_log();
>  	diag_create(&msg->diag);
>  	diag_move(&fiber()->diag, &msg->diag);
> -	cmsg_init(&msg->base, error_route);
> +	if (msg->connection->session != NULL) {
> +		cmsg_init(&msg->base, error_route);
> +		return 1;
> +	}
> +	/*
> +	 * In case session was not created we can process error path
> +	 * without tx thread.
> +	 */
> +	tx_accept_wpos(msg->connection, &msg->wpos);
> +	struct obuf *out = msg->connection->tx.p_obuf;
> +	iproto_reply_error(out, diag_last_error(&msg->diag),
> +			   msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	net_send_msg(&(msg->base));
> +	return -1;
>  }

- I meant something like:

	/** Log and send the error. */
	diag_log();
	if (msg->connection->session != NULL) {
		diag_create(&msg->diag);
		diag_move(&fiber()->diag, &msg->diag);
		cmsg_init(&msg->base, error_route);
		return 1;
	}
	/*
	 * In case session was not created we can process error path
	 * without tx thread.
	 */
	tx_accept_wpos(msg->connection, &msg->wpos);
	struct obuf *out = msg->connection->tx.p_obuf;
	iproto_reply_error(out, diag_last_error(&fiber()->diag),
			   msg->header.sync, ::schema_version);
	iproto_wpos_create(&msg->wpos, out);
	net_send_msg(&(msg->base));
	return -1;

>
>  static void
> @@ -1382,10 +1437,6 @@ tx_process_destroy(struct cmsg *m)
>  		session_destroy(con->session);
>  		con->session = NULL; /* safety */
>  	}
> -	/*
> -	 * Got to be done in iproto thread since
> -	 * that's where the memory is allocated.
> -	 */
>  	obuf_destroy(&con->obuf[0]);
>  	obuf_destroy(&con->obuf[1]);
>  }
> @@ -1478,13 +1529,27 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
>  {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	tx_accept_wpos(msg->connection, &msg->wpos);
> -	tx_fiber_init(msg->connection->session, msg->header.sync);
> -	return msg;
> +	*msg = (struct iproto_msg *) m;
> +	/*
> +	 * In case connection init failed we don't need to try anymore.
> +	 */
> +	if ((*msg)->connection->init_failed)
> +		return -1;
> +	/*
> +	 * In case session was not created we need to init connection in tx and
> +	 * create it here.
> +	 */
> +	if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> +		(*msg)->connection->init_failed = true;
> +		(*msg)->close_connection = true;
> +		return -1;
> +	}
> +	tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> +	tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> +	return 0;
>  }
>
>  /**
> @@ -1507,7 +1572,14 @@ tx_reply_error(struct iproto_msg *msg)
>  static void
>  tx_reply_iproto_error(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	/*
> +	 * We don't need to check tx_accept_msg() return value here
> +	 * as far as if we might only process iproto error in tx
> +	 * in case connection session is already created and
> +	 * thus tx_accept_msg() can't fail.
> +	 */
> +	tx_accept_msg(m, &msg);
>  	struct obuf *out = msg->connection->tx.p_obuf;
>  	iproto_reply_error(out, diag_last_error(&msg->diag),
>  			   msg->header.sync, ::schema_version);
> @@ -1527,7 +1599,9 @@ tx_inject_delay(void)
>  static void
>  tx_process1(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>
> @@ -1553,17 +1627,20 @@ error:
>  static void
>  tx_process_select(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	struct request *req;
>  	struct obuf *out;
>  	struct obuf_svp svp;
>  	struct port port;
>  	int count;
>  	int rc;
> -	struct request *req = &msg->dml;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>
>  	tx_inject_delay();
> +	req = &msg->dml;
>  	rc = box_select(req->space_id, req->index_id,
>  			req->iterator, req->offset, req->limit,
>  			req->key, req->key_end, &port);
> @@ -1607,7 +1684,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
>  static void
>  tx_process_call(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>
> @@ -1686,13 +1765,15 @@ error:
>  static void
>  tx_process_misc(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> -	struct iproto_connection *con = msg->connection;
> -	struct obuf *out = con->tx.p_obuf;
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>
>  	try {
> +		struct iproto_connection *con = msg->connection;
> +		struct obuf *out = con->tx.p_obuf;
>  		struct ballot ballot;
>  		switch (msg->header.type) {
>  		case IPROTO_AUTH:
> @@ -1729,7 +1810,7 @@ error:
>  static void
>  tx_process_sql(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
>  	struct obuf *out;
>  	struct port port;
>  	struct sql_bind *bind = NULL;
> @@ -1738,6 +1819,8 @@ tx_process_sql(struct cmsg *m)
>  	uint32_t len;
>  	bool is_unprepare = false;
>
> +	if (tx_accept_msg(m, &msg) != 0)
> +		goto error;
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>  	assert(msg->header.type == IPROTO_EXECUTE ||
> @@ -1825,7 +1908,11 @@ error:
>  static void
>  tx_process_replication(struct cmsg *m)
>  {
> -	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct iproto_msg *msg;
> +	if (tx_accept_msg(m, &msg) != 0) {
> +		tx_reply_error(msg);
> +		return;
> +	}
>  	struct iproto_connection *con = msg->connection;
>  	struct ev_io io;
>  	coio_create(&io, con->input.fd);
> @@ -1865,6 +1952,29 @@ tx_process_replication(struct cmsg *m)
>  	}
>  }
>
> +/**
> + * Check connection health and try to send an error to the client
> + * in case of internal connection init or on_connect trigger failure.
> + */
> +static bool
> +iproto_connection_fail(struct iproto_msg *msg)
> +{
> +	if (!msg->close_connection)
> +		return false;
> +	struct iproto_connection *con = msg->connection;
> +	int64_t nwr = sio_writev(con->output.fd, msg->wpos.obuf->iov,
> +				 obuf_iovcnt(msg->wpos.obuf));
> +	if (nwr > 0) {
> +		/* Count statistics. */
> +		rmean_collect(rmean_net, IPROTO_SENT, nwr);
> +	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> +		diag_log();
> +	}
> +	iproto_connection_close(con);
> +	iproto_msg_delete(msg);
> +	return true;
> +}
> +
>  static void
>  net_send_msg(struct cmsg *m)
>  {
> @@ -1881,6 +1991,9 @@ net_send_msg(struct cmsg *m)
>  	}
>  	con->wend = msg->wpos;
>
> +	if (iproto_connection_fail(msg))
> +		return;
> +
>  	if (evio_has_fd(&con->output)) {
>  		if (! ev_is_active(&con->output))
>  			ev_feed_event(con->loop, &con->output, EV_WRITE);
> @@ -1910,6 +2023,10 @@ net_end_join(struct cmsg *m)
>  	struct iproto_connection *con = msg->connection;
>
>  	msg->p_ibuf->rpos += msg->len;
> +
> +	if (iproto_connection_fail(msg))
> +		return;
> +
>  	iproto_msg_delete(msg);
>
>  	assert(! ev_is_active(&con->input));
> @@ -1928,6 +2045,10 @@ net_end_subscribe(struct cmsg *m)
>  	struct iproto_connection *con = msg->connection;
>
>  	msg->p_ibuf->rpos += msg->len;
> +
> +	if (iproto_connection_fail(msg))
> +		return;
> +
>  	iproto_msg_delete(msg);
>
>  	assert(! ev_is_active(&con->input));
> @@ -1936,81 +2057,60 @@ net_end_subscribe(struct cmsg *m)
>  }
>
>  /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
>   */
>  static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
>  {
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	struct iproto_connection *con = msg->connection;
>  	struct obuf *out = msg->connection->tx.p_obuf;
> -	try { /* connect. */
> -		con->session = session_create(SESSION_TYPE_BINARY);
> -		if (con->session == NULL)
> -			diag_raise();
> -		con->session->meta.connection = con;
> -		tx_fiber_init(con->session, 0);
> -		char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> -		/* TODO: dirty read from tx thread */
> -		struct tt_uuid uuid = INSTANCE_UUID;
> -		random_bytes(con->salt, IPROTO_SALT_SIZE);
> -		greeting_encode(greeting, tarantool_version_id(), &uuid,
> -				con->salt, IPROTO_SALT_SIZE);
> -		obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> -		if (! rlist_empty(&session_on_connect)) {
> -			if (session_run_on_connect_triggers(con->session) != 0)
> -				diag_raise();
> -		}
> +	char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> +	/* TODO: dirty read from tx thread */
> +	struct tt_uuid uuid = INSTANCE_UUID;
> +	random_bytes(con->salt, IPROTO_SALT_SIZE);
> +	greeting_encode(greeting, tarantool_version_id(), &uuid,
> +			con->salt, IPROTO_SALT_SIZE);
> +	if (obuf_dup(out, greeting, IPROTO_GREETING_SIZE)
> +	    != IPROTO_GREETING_SIZE) {
> +		diag_set(OutOfMemory, IPROTO_GREETING_SIZE,
> +			 "greeting obuf", "dup");
> +		iproto_reply_error(out, diag_last_error(&fiber()->diag),
> +				   msg->header.sync, ::schema_version);
>  		iproto_wpos_create(&msg->wpos, out);
> -	} catch (Exception *e) {
> -		tx_reply_error(msg);
>  		msg->close_connection = true;
> -	}
> -}
> -
> -/**
> - * Send a response to connect to the client or close the
> - * connection in case on_connect trigger failed.
> - */
> -static void
> -net_send_greeting(struct cmsg *m)
> -{
> -	struct iproto_msg *msg = (struct iproto_msg *) m;
> -	struct iproto_connection *con = msg->connection;
> -	if (msg->close_connection) {
> -		struct obuf *out = msg->wpos.obuf;
> -		int64_t nwr = sio_writev(con->output.fd, out->iov,
> -					 obuf_iovcnt(out));
> -
> -		if (nwr > 0) {
> -			/* Count statistics. */
> -			rmean_collect(rmean_net, IPROTO_SENT, nwr);
> -		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
> -			diag_log();
> -		}
> -		assert(iproto_connection_is_idle(con));
> -		iproto_connection_close(con);
> -		iproto_msg_delete(msg);
> +		iproto_connection_fail(msg);
>  		return;
>  	}
> +	iproto_wpos_create(&msg->wpos, out);
>  	con->wend = msg->wpos;
> -	/*
> -	 * Connect is synchronous, so no one could have been
> -	 * messing up with the connection while it was in
> -	 * progress.
> -	 */
>  	assert(evio_has_fd(&con->output));
> -	/* Handshake OK, start reading input. */
>  	ev_feed_event(con->loop, &con->output, EV_WRITE);
>  	iproto_msg_delete(msg);
>  }
>
> -static const struct cmsg_hop connect_route[] = {
> -	{ tx_process_connect, &net_pipe },
> -	{ net_send_greeting, NULL },
> -};
> +static int
> +tx_init_connect(struct iproto_msg *msg)
> +{
> +	struct iproto_connection *con = msg->connection;
> +	obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> +	obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> +	con->tx.p_obuf = &con->obuf[0];
> +	iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> +	iproto_wpos_create(&con->wend, con->tx.p_obuf);
> +
> +	con->session = session_create(SESSION_TYPE_BINARY);
> +	if (con->session == NULL)
> +		return -1;
> +	con->session->meta.connection = con;
> +
> +	tx_fiber_init(con->session, 0);
> +	if (! rlist_empty(&session_on_connect)) {
> +		if (session_run_on_connect_triggers(con->session) != 0)
> +			return -1;
> +	}
> +
> +	return 0;
> +}
>
>  /** }}} */
>
> @@ -2037,11 +2137,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
>  		mempool_free(&iproto_connection_pool, con);
>  		return -1;
>  	}
> -	cmsg_init(&msg->base, connect_route);
>  	msg->p_ibuf = con->p_ibuf;
>  	msg->wpos = con->wpos;
>  	msg->close_connection = false;
> -	cpipe_push(&tx_pipe, &msg->base);
> +	iproto_process_connect(msg);
>  	return 0;
>  }
>
> @@ -2054,6 +2153,8 @@ static struct evio_service binary; /* iproto binary listener */
>  static int
>  net_cord_f(va_list /* ap */)
>  {
> +	slab_cache_create(&iproto_slabc, &runtime);
> +
>  	mempool_create(&iproto_msg_pool, &cord()->slabc,
>  		       sizeof(struct iproto_msg));
>  	mempool_create(&iproto_connection_pool, &cord()->slabc,
> @@ -2297,7 +2398,8 @@ iproto_listen(const char *uri)
>  size_t
>  iproto_mem_used(void)
>  {
> -	return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
> +	return slab_cache_used(&net_cord.slabc)
> +	     + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
>  }
>
>  size_t
> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
> index 5d064b7648..bfa9c2b759 100644
> --- a/test/box-py/bad_trigger.result
> +++ b/test/box-py/bad_trigger.result
> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
>  - function
>  ...
>  greeting: True
> +Nothing to read yet: Resource temporarily unavailable
>  fixheader: True
>  error code 32
>  error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
> index 7d200b9218..4739dfe136 100644
> --- a/test/box-py/bad_trigger.test.py
> +++ b/test/box-py/bad_trigger.test.py
> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
>  from lib.tarantool_connection import TarantoolConnection
>  from tarantool import NetworkError
>  from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
> -    REQUEST_TYPE_ERROR
> +    REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
>  import socket
>  import msgpack
>
> @@ -26,9 +26,25 @@ s = conn.socket
>  # Read greeting
>  print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>
> -# Read error packet
> +# Check socket
>  IPROTO_FIXHEADER_SIZE = 5
> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +s.setblocking(False)
> +fixheader = None
> +try:
> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +except socket.error as err:
> +    print 'Nothing to read yet:', str(err).split(']')[1]
> +else:
> +    print 'Received fixheader'
> +s.setblocking(True)
> +
> +# Send ping
> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
> +s.send(msgpack.dumps(len(query)) + query)
> +
> +# Read error packet
> +if not fixheader:
> +    fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
>  print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
>  unpacker.feed(fixheader)
>  packet_len = unpacker.unpack()
>
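
For reference, the probe-then-request sequence that the updated
bad_trigger.test.py relies on can be sketched in isolation. This is only a
minimal Python 3 sketch, not the test itself: socket.socketpair() stands in
for a real Tarantool connection, the payloads are placeholders rather than
real iproto packets, GREETING_SIZE = 128 is an assumed value for
IPROTO_GREETING_SIZE, and recv_exact(), try_recv() and demo() are
hypothetical helper names.

```python
import socket

# Assumed value of IPROTO_GREETING_SIZE; payloads below are placeholders.
GREETING_SIZE = 128
FIXHEADER_SIZE = 5


def recv_exact(sock, size):
    """Blocking read of exactly `size` bytes."""
    chunks = []
    while size > 0:
        chunk = sock.recv(size)
        if not chunk:
            raise ConnectionError("peer closed the socket")
        chunks.append(chunk)
        size -= len(chunk)
    return b"".join(chunks)


def try_recv(sock, size):
    """Return bytes that are already buffered, or None if there are none."""
    sock.setblocking(False)
    try:
        return sock.recv(size)
    except BlockingIOError:
        # Same condition the test reports as "Nothing to read yet".
        return None
    finally:
        sock.setblocking(True)


def demo():
    client, server = socket.socketpair()
    try:
        # The peer greets right away, without waiting for any request.
        server.sendall(b"G" * GREETING_SIZE)
        greeting = recv_exact(client, GREETING_SIZE)
        # No request was sent yet, so no reply should be pending.
        probe = try_recv(client, FIXHEADER_SIZE)
        # Only the first request makes the peer produce a reply.
        client.sendall(b"PING")
        request = recv_exact(server, 4)
        server.sendall(b"ERROR")
        reply = recv_exact(client, FIXHEADER_SIZE)
        return len(greeting), probe, request, reply
    finally:
        client.close()
        server.close()
```

The point of the probe is that an empty read after the greeting is the
expected outcome with this patch: the on_connect trigger error can only
arrive after the first request has been sent.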