Понедельник, 14 сентября 2020, 19:17 +03:00 от Leonid Vasiliev <lvasiliev@tarantool.org>:
Hi! Thank you for the patch.
Look some comments/questions below:
- Please add @ChangeLog
- Look like "@TarantoolBot document" needs to be added to update
https://www.tarantool.io/en/doc/2.5/reference/reference_lua/net_box/#net-box-on-connect
- How do you check the problem is gone? Any stress test?
On 14.08.2020 13:47, Ilya Kosarev wrote:
> On connection, an evio service callback is invoked to accept it. The
> next step after acception was to process connection to tx thread
> through cbus. This meant that any connection interaction involves
> tx thread even before we get to decode what does the client want
> from us. Consequently, a number of problems appears. The main one
> is that we might get descriptor leak in case of unresponsive tx thread
> (for example, when building secondary index). There are some other
> cases where we might not want to spend precious tx time to process
> the connection in case iproto can do it all alone.
> This patch allows iproto to accept connection and send greeting by
> itself. The connection is initialized in tx thread when the real
> request comes through iproto_msg_decode(). In case request type was not
> recognized we can also send reply with an error without using tx. It is
> planned to add more iproto logic to prevent extra interaction with
> tx thread. This patch already to some extent solves descriptors leakage
> problem as far as connection establishes and stays in fetch_schema
> state while tx thread is unresponsive.
> The other user visible change is that on_connect triggers won't run on
> connections that don't provide any input, as reflected in
> bad_trigger.test.py.
- Is it ok from exploitation point of view? Did you have a conversation
with Mons?
>
> Part of #3776
> ---
> Branch: https://github.com/tarantool/tarantool/tree/i.kosarev/gh-3776-handling-connections-in-iproto
> Issue: https://github.com/tarantool/tarantool/issues/3776
>
> src/box/iproto.cc | 287 ++++++++++++++++++++------------
> test/box-py/bad_trigger.result | 1 +
> test/box-py/bad_trigger.test.py | 22 ++-
> 3 files changed, 202 insertions(+), 108 deletions(-)
>
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index b8f65e5eca..a027d15c1d 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
- "on_connect trigger must be processed before any other request on this
connection." (from comment) - It looks like it's worth clarifying this
guarantee.
> @@ -245,7 +245,7 @@ iproto_msg_new(struct iproto_connection *con);
> static void
> iproto_resume(void);
>
> -static void
> +static int
> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> bool *stop_input);
>
> @@ -256,6 +256,9 @@ iproto_msg_delete(struct iproto_msg *msg)
> iproto_resume();
> }
>
> +static inline void
> +iproto_connection_delete(struct iproto_connection *con);
> +
> /**
> * A single global queue for all requests in all connections. All
> * requests from all connections are processed concurrently.
> @@ -280,6 +283,11 @@ static struct cord net_cord;
> * in the tx thread.
> */
> static struct slab_cache net_slabc;
> +/**
> + * Slab cache used for allocating memory for output network buffers
> + * in the iproto thread.
> + */
> +static struct slab_cache iproto_slabc;
>
> struct rmean *rmean_net;
>
> @@ -298,6 +306,9 @@ const char *rmean_net_strings[IPROTO_LAST] = {
> "REQUESTS",
> };
>
> +static int
> +tx_init_connect(struct iproto_msg *msg);
> +
> static void
> tx_process_destroy(struct cmsg *m);
>
> @@ -650,7 +661,18 @@ iproto_connection_try_to_start_destroy(struct iproto_connection *con)
> * other parts of the connection.
> */
> con->state = IPROTO_CONNECTION_DESTROYED;
> - cpipe_push(&tx_pipe, &con->destroy_msg);
> + if (con->session != NULL)
> + cpipe_push(&tx_pipe, &con->destroy_msg);
> + else {
> + /*
> + * In case session was not created we can safely destroy
> + * not involving tx thread. Thus we also need to destroy
> + * obuf, which still belongs to iproto thread.
> + */
> + obuf_destroy(&con->obuf[0]);
> + obuf_destroy(&con->obuf[1]);
> + iproto_connection_delete(con);
> + }
> }
>
> /**
> @@ -677,9 +699,18 @@ iproto_connection_close(struct iproto_connection *con)
> * is done only once.
> */
> con->p_ibuf->wpos -= con->parse_size;
> - cpipe_push(&tx_pipe, &con->disconnect_msg);
> assert(con->state == IPROTO_CONNECTION_ALIVE);
> con->state = IPROTO_CONNECTION_CLOSED;
> + rlist_del(&con->in_stop_list);
- Is done below(rlist_del()). Or is there some kind of trick?
> + if (con->session != NULL)
> + cpipe_push(&tx_pipe, &con->disconnect_msg);
> + else
> + /*
> + * In case session was not created we can safely
> + * try to start destroy not involving tx thread.
> + */
> + iproto_connection_try_to_start_destroy(con);
> + return;
> } else if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) {
> iproto_connection_try_to_start_destroy(con);
> } else {
> @@ -809,6 +840,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
> assert(rlist_empty(&con->in_stop_list));
> int n_requests = 0;
> bool stop_input = false;
> + bool obuf_in_iproto = (con->session == NULL);
> const char *errmsg;
> while (con->parse_size != 0 && !stop_input) {
> if (iproto_check_msg_max()) {
> @@ -853,12 +885,20 @@ err_msgpack:
>
> msg->len = reqend - reqstart; /* total request length */
>
> - iproto_msg_decode(msg, &pos, reqend, &stop_input);
> - /*
> - * This can't throw, but should not be
> - * done in case of exception.
> - */
> - cpipe_push_input(&tx_pipe, &msg->base);
> + if (iproto_msg_decode(msg, &pos, reqend, &stop_input) == 0) {
> + if (obuf_in_iproto) {
> + /*
> + * If session was not created yet and obuf is
> + * still in iproto we need to destroy it. New
> + * one will be created in tx thread if needed.
> + */
> + obuf_destroy(&con->obuf[0]);
> + obuf_destroy(&con->obuf[1]);
> + obuf_in_iproto = false; > + }
> + cpipe_push_input(&tx_pipe, &msg->base);
> + }
> +
> n_requests++;
> /* Request is parsed */
> assert(reqend > reqstart);
> @@ -1105,8 +1145,8 @@ iproto_connection_new(int fd)
> ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
> ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
> ibuf_create(&con->ibuf[1], cord_slab_cache(), iproto_readahead);
> - obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> - obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> + obuf_create(&con->obuf[0], &iproto_slabc, iproto_readahead);
> + obuf_create(&con->obuf[1], &iproto_slabc, iproto_readahead);
> con->p_ibuf = &con->ibuf[0];
> con->tx.p_obuf = &con->obuf[0];
> iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> @@ -1134,10 +1174,6 @@ iproto_connection_delete(struct iproto_connection *con)
> assert(!evio_has_fd(&con->input));
> assert(con->session == NULL);
> assert(con->state == IPROTO_CONNECTION_DESTROYED);
> - /*
> - * The output buffers must have been deleted
> - * in tx thread.
> - */
> ibuf_destroy(&con->ibuf[0]);
> ibuf_destroy(&con->ibuf[1]);
> assert(con->obuf[0].pos == 0 &&
> @@ -1172,6 +1208,9 @@ tx_reply_error(struct iproto_msg *msg);
> static void
> tx_reply_iproto_error(struct cmsg *m);
>
> +static void
> +tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos);
> +
> static void
> net_send_msg(struct cmsg *msg);
>
> @@ -1244,7 +1283,7 @@ static const struct cmsg_hop error_route[] = {
> { net_send_error, NULL },
> };
>
> -static void
> +static int
> iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> bool *stop_input)
> {
> @@ -1314,13 +1353,27 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> (uint32_t) type);
> goto error;
> }
> - return;
> + return 0;
> error:
> /** Log and send the error. */
> diag_log();
> diag_create(&msg->diag);
> diag_move(&fiber()->diag, &msg->diag);
- Unneeded in case session == NULL. Use net_send_msg() instead of
net_send_error().
> - cmsg_init(&msg->base, error_route);
> + if (msg->connection->session != NULL) {
> + cmsg_init(&msg->base, error_route);
> + return 0;
- May be return -1 (decode fail). What does the return value mean
otherwise?
> + }
> + /*
> + * In case session was not created we can process error path
> + * without tx thread.
> + */
> + tx_accept_wpos(msg->connection, &msg->wpos);
> + struct obuf *out = msg->connection->tx.p_obuf;
> + iproto_reply_error(out, diag_last_error(&msg->diag),
> + msg->header.sync, ::schema_version);
> + iproto_wpos_create(&msg->wpos, out);
> + net_send_error(&(msg->base));
> + return -1;
> }
>
> static void
> @@ -1382,10 +1435,6 @@ tx_process_destroy(struct cmsg *m)
> session_destroy(con->session);
> con->session = NULL; /* safety */
> }
> - /*
> - * Got to be done in iproto thread since
> - * that's where the memory is allocated.
> - */
> obuf_destroy(&con->obuf[0]);
> obuf_destroy(&con->obuf[1]);
> }
> @@ -1478,13 +1527,21 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> -static inline struct iproto_msg *
> -tx_accept_msg(struct cmsg *m)
> +static inline int
> +tx_accept_msg(struct cmsg *m, struct iproto_msg **msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - tx_accept_wpos(msg->connection, &msg->wpos);
> - tx_fiber_init(msg->connection->session, msg->header.sync);
> - return msg;
> + *msg = (struct iproto_msg *) m;
> + /*
> + * In case session was not created we need to init connection in tx and
> + * create it here.
> + */
> + if ((*msg)->connection->session == NULL && tx_init_connect(*msg) != 0) {
> + (*msg)->close_connection = true;
> + return -1;
> + }
> + tx_accept_wpos((*msg)->connection, &(*msg)->wpos);
> + tx_fiber_init((*msg)->connection->session, (*msg)->header.sync);
> + return 0;
> }
>
> /**
> @@ -1507,7 +1564,8 @@ tx_reply_error(struct iproto_msg *msg)
> static void
> tx_reply_iproto_error(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + tx_accept_msg(m, &msg);
- Please add a comment why the return value check is unnecessary.
> struct obuf *out = msg->connection->tx.p_obuf;
> iproto_reply_error(out, diag_last_error(&msg->diag),
> msg->header.sync, ::schema_version);
> @@ -1527,7 +1585,9 @@ tx_inject_delay(void)
> static void
> tx_process1(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1553,17 +1613,20 @@ error:
> static void
> tx_process_select(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + struct request *req;
- Unneeded change.
> struct obuf *out;
> struct obuf_svp svp;
> struct port port;
> int count;
> int rc;
> - struct request *req = &msg->dml;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> tx_inject_delay();
> + req = &msg->dml;
> rc = box_select(req->space_id, req->index_id,
> req->iterator, req->offset, req->limit,
> req->key, req->key_end, &port);
> @@ -1607,7 +1670,9 @@ tx_process_call_on_yield(struct trigger *trigger, void *event)
> static void
> tx_process_call(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1686,13 +1751,15 @@ error:
> static void
> tx_process_misc(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> - struct iproto_connection *con = msg->connection;
> - struct obuf *out = con->tx.p_obuf;
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> try {
> + struct iproto_connection *con = msg->connection;
> + struct obuf *out = con->tx.p_obuf;
> struct ballot ballot;
> switch (msg->header.type) {
> case IPROTO_AUTH:
> @@ -1729,7 +1796,7 @@ error:
> static void
> tx_process_sql(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> struct obuf *out;
> struct port port;
> struct sql_bind *bind = NULL;
> @@ -1738,6 +1805,8 @@ tx_process_sql(struct cmsg *m)
> uint32_t len;
> bool is_unprepare = false;
>
> + if (tx_accept_msg(m, &msg) != 0)
> + goto error;
> if (tx_check_schema(msg->header.schema_version))
> goto error;
> assert(msg->header.type == IPROTO_EXECUTE ||
> @@ -1825,7 +1894,11 @@ error:
> static void
> tx_process_replication(struct cmsg *m)
> {
> - struct iproto_msg *msg = tx_accept_msg(m);
> + struct iproto_msg *msg;
> + if (tx_accept_msg(m, &msg) != 0) {
> + tx_reply_error(msg);
> + return;
> + }
> struct iproto_connection *con = msg->connection;
> struct ev_io io;
> coio_create(&io, con->input.fd);
> @@ -1865,6 +1938,29 @@ tx_process_replication(struct cmsg *m)
> }
> }
>
> +static int
> +net_check_connection(struct iproto_msg *msg)
- Seems like the "bool" type should be used for net_check_connection
(https://www.tarantool.io/en/doc/1.10/dev_guide/c_style_guide/#chapter-16-function-return-values-and-names).
In the function, you not only check a connection status, but also close
it if necessary. I think it would be nice to add a description of the
function.
> +{
> + if (!msg->close_connection)
> + return 0;
> +
> + struct iproto_connection *con = msg->connection;
> + struct obuf *out = msg->wpos.obuf;
> + int64_t nwr = sio_writev(con->output.fd, out->iov,
> + obuf_iovcnt(out));
> +
> + if (nwr > 0) {
> + /* Count statistics. */
> + rmean_collect(rmean_net, IPROTO_SENT, nwr);
> + } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> + diag_log();
> + }
> + assert(iproto_connection_is_idle(con));
> + iproto_connection_close(con);
> + iproto_msg_delete(msg);
> + return -1;
> +}
> +
> static void
> net_send_msg(struct cmsg *m)
> {
> @@ -1881,6 +1977,9 @@ net_send_msg(struct cmsg *m)
> }
> con->wend = msg->wpos;
>
> + if (net_check_connection(msg) != 0)
> + return;
> +
> if (evio_has_fd(&con->output)) {
> if (! ev_is_active(&con->output))
> ev_feed_event(con->loop, &con->output, EV_WRITE);
> @@ -1910,6 +2009,10 @@ net_end_join(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
>
> msg->p_ibuf->rpos += msg->len;
> +
> + if (net_check_connection(msg) != 0)
> + return;
> +
> iproto_msg_delete(msg);
>
> assert(! ev_is_active(&con->input));
> @@ -1928,6 +2031,10 @@ net_end_subscribe(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
>
> msg->p_ibuf->rpos += msg->len;
> +
> + if (net_check_connection(msg) != 0)
> + return;
> +
> iproto_msg_delete(msg);
>
> assert(! ev_is_active(&con->input));
> @@ -1936,81 +2043,49 @@ net_end_subscribe(struct cmsg *m)
> }
>
> /**
> - * Handshake a connection: invoke the on-connect trigger
> - * and possibly authenticate. Try to send the client an error
> - * upon a failure.
> + * Handshake a connection: prepare greeting for it.
> */
> static void
> -tx_process_connect(struct cmsg *m)
> +iproto_process_connect(struct iproto_msg *msg)
> {
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> struct iproto_connection *con = msg->connection;
> struct obuf *out = msg->connection->tx.p_obuf;
> - try { /* connect. */
- Why "try - catch" has been deleted?
> - con->session = session_create(SESSION_TYPE_BINARY);
> - if (con->session == NULL)
> - diag_raise();
> - con->session->meta.connection = con;
> - tx_fiber_init(con->session, 0);
> - char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> - /* TODO: dirty read from tx thread */
> - struct tt_uuid uuid = INSTANCE_UUID;
> - random_bytes(con->salt, IPROTO_SALT_SIZE);
> - greeting_encode(greeting, tarantool_version_id(), &uuid,
> - con->salt, IPROTO_SALT_SIZE);
> - obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> - if (! rlist_empty(&session_on_connect)) {
> - if (session_run_on_connect_triggers(con->session) != 0)
> - diag_raise();
> - }
> - iproto_wpos_create(&msg->wpos, out);
> - } catch (Exception *e) {
> - tx_reply_error(msg);
> - msg->close_connection = true;
> - }
> -}
> -
> -/**
> - * Send a response to connect to the client or close the
> - * connection in case on_connect trigger failed.
> - */
> -static void
> -net_send_greeting(struct cmsg *m)
> -{
> - struct iproto_msg *msg = (struct iproto_msg *) m;
> - struct iproto_connection *con = msg->connection;
> - if (msg->close_connection) {
> - struct obuf *out = msg->wpos.obuf;
> - int64_t nwr = sio_writev(con->output.fd, out->iov,
> - obuf_iovcnt(out));
> -
> - if (nwr > 0) {
> - /* Count statistics. */
> - rmean_collect(rmean_net, IPROTO_SENT, nwr);
> - } else if (nwr < 0 && ! sio_wouldblock(errno)) {
> - diag_log();
> - }
> - assert(iproto_connection_is_idle(con));
> - iproto_connection_close(con);
> - iproto_msg_delete(msg);
> - return;
> - }
> + char *greeting = (char *) static_alloc(IPROTO_GREETING_SIZE);
> + struct tt_uuid uuid = INSTANCE_UUID;
> + random_bytes(con->salt, IPROTO_SALT_SIZE);
> + greeting_encode(greeting, tarantool_version_id(), &uuid,
> + con->salt, IPROTO_SALT_SIZE);
> + obuf_dup_xc(out, greeting, IPROTO_GREETING_SIZE);
> + iproto_wpos_create(&msg->wpos, out);
> con->wend = msg->wpos;
> - /*
> - * Connect is synchronous, so no one could have been
> - * messing up with the connection while it was in
> - * progress.
> - */
> assert(evio_has_fd(&con->output));
> - /* Handshake OK, start reading input. */
> ev_feed_event(con->loop, &con->output, EV_WRITE);
> iproto_msg_delete(msg);
> }
>
> -static const struct cmsg_hop connect_route[] = {
> - { tx_process_connect, &net_pipe },
> - { net_send_greeting, NULL },
> -};
> +static int
> +tx_init_connect(struct iproto_msg *msg)
> +{
> + struct iproto_connection *con = msg->connection;
> + obuf_create(&con->obuf[0], &net_slabc, iproto_readahead);
> + obuf_create(&con->obuf[1], &net_slabc, iproto_readahead);
> + con->tx.p_obuf = &con->obuf[0];
> + iproto_wpos_create(&con->wpos, con->tx.p_obuf);
> + iproto_wpos_create(&con->wend, con->tx.p_obuf);
> +
> + con->session = session_create(SESSION_TYPE_BINARY);
> + if (con->session == NULL)
> + return -1;
> + con->session->meta.connection = con;
> +
> + tx_fiber_init(con->session, 0);
> + if (! rlist_empty(&session_on_connect)) {
> + if (session_run_on_connect_triggers(con->session) != 0)
> + return -1;
> + }
> +
> + return 0;
> +}
>
> /** }}} */
>
> @@ -2037,11 +2112,10 @@ iproto_on_accept(struct evio_service * /* service */, int fd,
> mempool_free(&iproto_connection_pool, con);
> return -1;
> }
> - cmsg_init(&msg->base, connect_route);
> msg->p_ibuf = con->p_ibuf;
> msg->wpos = con->wpos;
> msg->close_connection = false;
> - cpipe_push(&tx_pipe, &msg->base);
> + iproto_process_connect(msg);
> return 0;
> }
>
> @@ -2054,6 +2128,8 @@ static struct evio_service binary; /* iproto binary listener */
> static int
> net_cord_f(va_list /* ap */)
> {
> + slab_cache_create(&iproto_slabc, &runtime);
> +
> mempool_create(&iproto_msg_pool, &cord()->slabc,
> sizeof(struct iproto_msg));
> mempool_create(&iproto_connection_pool, &cord()->slabc,
> @@ -2297,7 +2373,8 @@ iproto_listen(const char *uri)
> size_t
> iproto_mem_used(void)
> {
> - return slab_cache_used(&net_cord.slabc) + slab_cache_used(&net_slabc);
> + return slab_cache_used(&net_cord.slabc)
> + + slab_cache_used(&net_slabc) + slab_cache_used(&iproto_slabc);
> }
>
> size_t
> diff --git a/test/box-py/bad_trigger.result b/test/box-py/bad_trigger.result
> index 5d064b7648..bfa9c2b759 100644
> --- a/test/box-py/bad_trigger.result
> +++ b/test/box-py/bad_trigger.result
> @@ -14,6 +14,7 @@ type(box.session.on_connect(f1))
> - function
> ...
> greeting: True
> +Nothing to read yet: Resource temporarily unavailable
> fixheader: True
> error code 32
> error message: [string "function f1() nosuchfunction() end"]:1: attempt to call global 'nosuchfunction' (a nil value)
> diff --git a/test/box-py/bad_trigger.test.py b/test/box-py/bad_trigger.test.py
> index 7d200b9218..7f45f5e713 100644
> --- a/test/box-py/bad_trigger.test.py
> +++ b/test/box-py/bad_trigger.test.py
> @@ -2,7 +2,7 @@ from lib.box_connection import BoxConnection
> from lib.tarantool_connection import TarantoolConnection
> from tarantool import NetworkError
> from tarantool.const import IPROTO_GREETING_SIZE, IPROTO_CODE, IPROTO_ERROR, \
> - REQUEST_TYPE_ERROR
> + REQUEST_TYPE_ERROR, REQUEST_TYPE_PING
> import socket
> import msgpack
>
> @@ -26,9 +26,25 @@ s = conn.socket
> # Read greeting
> print 'greeting: ', len(s.recv(IPROTO_GREETING_SIZE)) == IPROTO_GREETING_SIZE
>
> -# Read error packet
> +# Check soscket
- Typo "socket"
> IPROTO_FIXHEADER_SIZE = 5
> -fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +s.setblocking(False)
> +fixheader = None
> +try:
> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> +except socket.error as err:
> + print 'Nothing to read yet:', str(err).split(']')[1]
> +else:
> + print 'Received fixheader'
> +s.setblocking(True)
> +
> +# Send ping
> +query = msgpack.dumps({ IPROTO_CODE : REQUEST_TYPE_PING })
> +s.send(msgpack.dumps(len(query)) + query)
> +
> +# Read error packet
> +if not fixheader:
> + fixheader = s.recv(IPROTO_FIXHEADER_SIZE)
> print 'fixheader: ', len(fixheader) == IPROTO_FIXHEADER_SIZE
> unpacker.feed(fixheader)
> packet_len = unpacker.unpack()
>