From: Vladimir Davydov via Tarantool-patches
Reply-To: Vladimir Davydov
Date: Fri, 6 Aug 2021 15:59:30 +0300
To: mechanik20051988
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org, mechanik20051988
Subject: Re: [Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams
Message-ID: <20210806125930.h4u3zpbrxbapwlfz@esperanza>
In-Reply-To: <06bcadd06bca14d32391e5612427b7b3f9084d26.1628184138.git.mechanik20.05.1988@gmail.com>
References: <06bcadd06bca14d32391e5612427b7b3f9084d26.1628184138.git.mechanik20.05.1988@gmail.com>
List-Id: Tarantool development patches

On Thu, Aug 05, 2021 at 09:17:44PM +0300, mechanik20051988 wrote:
> From: mechanik20051988
>
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. If any request fails during the
> transaction, it will not affect the other requests in the transaction.
> If disconnect occurs when there is some active transaction in stream,
> this transaction will be rollbacked, if it does not have time to commit
> before this moment.
>
> Part of #5860
>
> @TarantoolBot document
> Title: interactive transactions was implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
> with corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START',
> 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
> IPROTO_TRANSACTION_ROLLBACK accordingly. If disconnect occurs when there is
> some active transaction in stream, this transaction will be rollbacked, if it
> does not have time to commit before this moment.
> Add new command codes for begin, commit and rollback transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` accordingly.
> ---
>  src/box/call.c                        |  10 +-
>  src/box/errcode.h                     |   1 +
>  src/box/iproto.cc                     | 249 +++++++++++++++++++++++++-
>  src/box/iproto_constants.c            |   6 +
>  src/box/iproto_constants.h            |  10 +-
>  test/box-tap/feedback_daemon.test.lua |   2 +-
>  test/box/error.result                 |   1 +
>  test/box/misc.result                  |   5 +-
>  8 files changed, 274 insertions(+), 10 deletions(-)
>
> diff --git a/src/box/call.c b/src/box/call.c
> index a6384efe2..9ba0fa9ac 100644
> --- a/src/box/call.c
> +++ b/src/box/call.c
> @@ -141,8 +141,9 @@ box_process_call(struct call_request *request, struct port *port)
>  	const char *name = request->name;
>  	assert(name != NULL);
>  	uint32_t name_len = mp_decode_strl(&name);
> -	/* Transaction is not started. */
> -	assert(!in_txn());
> +	uint64_t stream_id = request->header->stream_id;
> +	/* Transaction is not started for not stream requests. */
> +	assert(stream_id != 0 || !in_txn());
>
>  	int rc;
>  	struct port args;
> @@ -157,7 +158,7 @@ box_process_call(struct call_request *request, struct port *port)
>  	}
>  	if (rc != 0)
>  		return -1;
> -	if (in_txn() != NULL) {
> +	if (in_txn() != NULL && stream_id == 0) {
>  		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
>  		port_destroy(port);
>  		return -1;
> @@ -177,9 +178,10 @@ box_process_eval(struct call_request *request, struct port *port)
>  		request->args_end - request->args);
>  	const char *expr = request->expr;
>  	uint32_t expr_len = mp_decode_strl(&expr);
> +	uint64_t stream_id = request->header->stream_id;
>  	if (box_lua_eval(expr, expr_len, &args, port) != 0)
>  		return -1;
> -	if (in_txn() != 0) {
> +	if (in_txn() != 0 && stream_id == 0) {
>  		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
>  		port_destroy(port);
>  		return -1;
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index 6c8c00256..e76fd6442 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -279,6 +279,7 @@ struct errcode_record {
>  	/*224 */_(ER_ELECTION_DISABLED,	"Elections were turned off")\
>  	/*225 */_(ER_TXN_ROLLBACK,	"Transaction was rolled back") \
>  	/*226 
*/_(ER_UNABLE_TO_PROCESS_IN_STREAM,	"Unable to process this type (%u) of requests in stream") \
> +	/*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,	"Unable to process this type (%u) of requests out of stream") \

The same comment as for ER_UNABLE_TO_PROCESS_IN_STREAM: I think we
should report a human-readable request type name, not just an id.

>
>  /*
>   * !IMPORTANT! Please follow instructions at start of the file
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3642cbd02..37715ab7f 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -65,6 +65,7 @@
>  #include "tt_static.h"
>  #include "salad/stailq.h"
>  #include "assoc.h"
> +#include "txn.h"
>
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -78,6 +79,8 @@ enum {
>  struct iproto_connection;
>
>  struct iproto_stream {
> +	/** Currently active stream transaction or NULL */
> +	struct txn *txn;
>  	/**
>  	 * Queue of pending requests (iproto messages) for this stream,
>  	 * processed sequentially. This field is accesable only from
> @@ -88,6 +91,8 @@ struct iproto_stream {
>  	uint64_t id;
>  	/** This stream connection */
>  	struct iproto_connection *connection;
> +	/** Pre-allocated disconnect msg to gracefully destroy stream */
> +	struct cmsg on_disconnect;
>  };
>
>  /**
> @@ -134,6 +139,10 @@ struct iproto_thread {
>  	/**
>  	 * Static routes for this iproto thread
>  	 */
> +	struct cmsg_hop begin_route[2];
> +	struct cmsg_hop commit_route[2];
> +	struct cmsg_hop rollback_route[2];
> +	struct cmsg_hop destroy_stream_on_disconnect_route[2];
>  	struct cmsg_hop destroy_route[2];
>  	struct cmsg_hop disconnect_route[2];
>  	struct cmsg_hop misc_route[2];
> @@ -622,12 +631,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
>  		return NULL;
>  	}
>  	errinj_stream_count_add(1);
> +	stream->txn = NULL;
>  	stailq_create(&stream->pending_requests);
>  	stream->id = stream_id;
>  	stream->connection = connection;
>  	return stream;
>  }
>
> +static inline void
> +iproto_stream_push_on_disconnect(struct iproto_stream 
*stream)
> +{
> +	struct iproto_connection *conn = stream->connection;
> +	struct iproto_thread *iproto_thread = conn->iproto_thread;
> +	struct cmsg_hop *route =
> +		iproto_thread->destroy_stream_on_disconnect_route;
> +	cmsg_init(&stream->on_disconnect, route);
> +	cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -651,6 +672,7 @@ static void
>  iproto_stream_delete(struct iproto_stream *stream)
>  {
>  	assert(stailq_empty(&stream->pending_requests));
> +	assert(stream->txn == NULL);
>  	errinj_stream_count_add(-1);
>  	mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
>  }
> @@ -697,6 +719,7 @@ static inline bool
>  iproto_connection_is_idle(struct iproto_connection *con)
>  {
>  	return con->long_poll_count == 0 &&
> +	       mh_size(con->streams) == 0 &&

Why? A message in a stream should pin the input buffer, shouldn't it?

>  	       ibuf_used(&con->ibuf[0]) == 0 &&
>  	       ibuf_used(&con->ibuf[1]) == 0;
>  }
> @@ -786,6 +809,23 @@ iproto_connection_close(struct iproto_connection *con)
>  		 * is done only once.
>  		 */
>  		con->p_ibuf->wpos -= con->parse_size;
> +		mh_int_t node;
> +		mh_foreach(con->streams, node) {
> +			struct iproto_stream *stream = (struct iproto_stream *)
> +				mh_i64ptr_node(con->streams, node)->val;
> +			/**
> +			 * If stream requests queue is empty, it means that
> +			 * that there is some active transaction which was
> +			 * not commited yet. We need to rollback it, since
> +			 * we push on_disconnect message to tx thread here.
> +			 * If stream requests queue is not empty, it means
> +			 * that stream processing some request in tx thread
> +			 * now. We destroy stream in `net_send_msg` after
> +			 * processing all requests.
> +			 */
> +			if (stailq_empty(&stream->pending_requests))
> +				iproto_stream_push_on_disconnect(stream);
> +		}

This should be done by disconnect_msg. There shouldn't be a separate
route for destroying unfinished transactions.
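
To illustrate the idea of letting the single disconnect message clean up
every stream, here is a rough standalone model. It is not iproto code:
all toy_* names are made up for the example, a fixed array stands in for
the con->streams hash, and it ignores the question of which thread owns
the stream table.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins for the real structures; every name is hypothetical. */
struct toy_txn {
	bool rolled_back;
};

struct toy_stream {
	/* Detached transaction of this stream or NULL. */
	struct toy_txn *txn;
};

enum { TOY_MAX_STREAMS = 8 };

struct toy_connection {
	struct toy_stream streams[TOY_MAX_STREAMS];
	size_t stream_count;
};

/*
 * Model of what the tx side of the one and only disconnect message
 * could do: walk all streams of the connection and roll back every
 * transaction that is still attached. Returns the number of
 * transactions rolled back.
 */
static size_t
toy_tx_process_disconnect(struct toy_connection *con)
{
	size_t rolled_back = 0;
	for (size_t i = 0; i < con->stream_count; i++) {
		struct toy_stream *stream = &con->streams[i];
		if (stream->txn == NULL)
			continue;
		stream->txn->rolled_back = true;
		stream->txn = NULL;
		rolled_back++;
	}
	return rolled_back;
}
```

With something like this there is one disconnect path for the whole
connection, and no per-stream on_disconnect message or route is needed.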
>  		cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
>  		assert(con->state == IPROTO_CONNECTION_ALIVE);
>  		con->state = IPROTO_CONNECTION_CLOSED;
> @@ -946,6 +986,7 @@ iproto_msg_set_stream(struct iproto_msg *msg)
>  	 */
>  	errinj_stream_msg_count_add(1);
>  	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> +	assert(stream != NULL);
>  	msg->stream = stream;
>  	/*
>  	 * If the request queue in the stream is not empty, it means
> @@ -1388,6 +1429,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	uint64_t stream_id;
>  	uint8_t type;
>  	bool request_is_not_for_stream;
> +	bool request_is_only_for_stream;
>  	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>
>  	if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1399,11 +1441,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	request_is_not_for_stream =
>  		((type > IPROTO_TYPE_STAT_MAX &&
>  		 type != IPROTO_PING) || type == IPROTO_AUTH);
> +	request_is_only_for_stream =
> +		(type == IPROTO_TRANSACTION_BEGIN ||
> +		 type == IPROTO_TRANSACTION_COMMIT ||
> +		 type == IPROTO_TRANSACTION_ROLLBACK);
>
>  	if (stream_id != 0 && request_is_not_for_stream) {
>  		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
>  			 (uint32_t) type);
>  		goto error;
> +	} else if (stream_id == 0 && request_is_only_for_stream) {
> +		diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
> +			 (uint32_t) type);
> +		goto error;
>  	}
>
>  	/*
> @@ -1418,6 +1468,9 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	case IPROTO_UPDATE:
>  	case IPROTO_DELETE:
>  	case IPROTO_UPSERT:
> +	case IPROTO_TRANSACTION_BEGIN:
> +	case IPROTO_TRANSACTION_COMMIT:
> +	case IPROTO_TRANSACTION_ROLLBACK:
>  		if (xrow_decode_dml(&msg->header, &msg->dml,
>  				    dml_request_key_map(type)))

Why do you need to decode a body for these messages? AFAIU they don't
have a body.
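
Something along these lines is what I mean: give the three transaction
requests their own case that picks a route without touching
xrow_decode_dml(). The snippet is a self-contained toy, not a patch. The
type codes are local copies of the values from iproto_constants.h, and
toy_decode_path / toy_choose_decode_path are names invented for the
sketch.

```c
#include <assert.h>
#include <stdint.h>

/* Local copies of the request codes, for the sketch only. */
enum toy_iproto_type {
	TOY_IPROTO_SELECT = 1,
	TOY_IPROTO_INSERT = 2,
	TOY_IPROTO_REPLACE = 3,
	TOY_IPROTO_UPDATE = 4,
	TOY_IPROTO_DELETE = 5,
	TOY_IPROTO_UPSERT = 9,
	TOY_IPROTO_TRANSACTION_BEGIN = 14,
	TOY_IPROTO_TRANSACTION_COMMIT = 15,
	TOY_IPROTO_TRANSACTION_ROLLBACK = 16,
};

/* Hypothetical outcome of the switch in the decode function. */
enum toy_decode_path {
	/* Body is decoded as a DML request. */
	TOY_DECODE_DML_BODY,
	/* Only the header matters, the body is not decoded. */
	TOY_DECODE_HEADER_ONLY,
	/* Everything else, not modelled here. */
	TOY_DECODE_OTHER,
};

static enum toy_decode_path
toy_choose_decode_path(uint8_t type)
{
	switch (type) {
	case TOY_IPROTO_SELECT:
	case TOY_IPROTO_INSERT:
	case TOY_IPROTO_REPLACE:
	case TOY_IPROTO_UPDATE:
	case TOY_IPROTO_DELETE:
	case TOY_IPROTO_UPSERT:
		return TOY_DECODE_DML_BODY;
	case TOY_IPROTO_TRANSACTION_BEGIN:
	case TOY_IPROTO_TRANSACTION_COMMIT:
	case TOY_IPROTO_TRANSACTION_ROLLBACK:
		/* No body is expected, so there is nothing to decode. */
		return TOY_DECODE_HEADER_ONLY;
	default:
		return TOY_DECODE_OTHER;
	}
}
```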
>  			goto error;
> @@ -1498,6 +1551,37 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  	fiber_set_user(f, &session->credentials);
>  }
>
> +static void
> +tx_process_destroy_stream_on_disconnect(struct cmsg *m)
> +{
> +	struct iproto_stream *stream =
> +		container_of(m, struct iproto_stream,
> +			     on_disconnect);
> +
> +	if (stream->txn != NULL) {
> +		tx_fiber_init(stream->connection->session, 0);
> +		fiber_set_txn(fiber(), stream->txn);
> +		if (box_txn_rollback() != 0)
> +			panic("failed to rollback transaction on disconnect");
> +		stream->txn = NULL;
> +	}
> +}
> +
> +static void
> +net_finish_destroy_stream_on_disconnect(struct cmsg *m)
> +{
> +	struct iproto_stream *stream =
> +		container_of(m, struct iproto_stream,
> +			     on_disconnect);
> +	struct iproto_connection *con = stream->connection;
> +
> +	struct mh_i64ptr_node_t node = { stream->id, NULL };
> +	mh_i64ptr_remove(con->streams, &node, 0);
> +	iproto_stream_delete(stream);
> +	assert(!evio_has_fd(&con->input));
> +	if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
> +		iproto_connection_try_to_start_destroy(con);
> +}
>
>  static void
>  tx_process_disconnect(struct cmsg *m)
> @@ -1632,15 +1716,53 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to transaction in the stream structure.
> + * Check if message belongs to stream and there is active
> + * transaction for this stream. In case it is so, sets this
> + * transaction for current fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)

The function is used in just one place. Should be fine to inline it.
> +{
> +	if (msg->stream != NULL) {
> +		struct txn *txn = msg->stream->txn;
> +		/*
> +		 * When we do any operations (which are written to `wal`)
> +		 * outside of transaction, we consider each such operation
> +		 * as a small transaction and write it to `wal` immediately.
> +		 * When operation is performed as part of transaction, we
> +		 * write all transaction to `wal` at the commit. In this case,
> +		 * `is_commit` flag will be set when writing to `wal` for the
> +		 * last operation in transaction, the rest operations must have
> +		 * this flag set to false, to mark that they all belongs to the
> +		 * same transaction.
> +		 */
> +		if (txn != NULL)
> +			msg->header.is_commit = false;

AFAIU this flag is used only when a statement is written to WAL.
Please remove.

> +		fiber_set_txn(fiber(), txn);

I would clear stream->txn here: a call() may start a new transaction, in
which case stream->txn will refer to freed memory, which is error-prone.
You wouldn't need to clear stream->txn after commit and rollback if you
cleared it here.

> +	}
> +}
> +
>  static inline struct iproto_msg *
>  tx_accept_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	tx_accept_wpos(msg->connection, &msg->wpos);
>  	tx_fiber_init(msg->connection->session, msg->header.sync);
> +	tx_prepare_transaction_for_request(msg);
>  	return msg;
>  }
>
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> +	if (msg->stream != NULL)
> +		msg->stream->txn = txn_detach();
> +}
> +
>  /**
>   * Write error message to the output buffer and advance
>   * write position. Doesn't throw.
> @@ -1666,6 +1788,7 @@ tx_reply_iproto_error(struct cmsg *m)
>  	iproto_reply_error(out, diag_last_error(&msg->diag),
>  			   msg->header.sync, ::schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_end_msg(msg);
>  }
>
>  /** Inject a short delay on tx request processing for testing. 
*/
> @@ -1678,6 +1801,79 @@ tx_inject_delay(void)
>  	});
>  }
>
> +static void
> +tx_process_begin(struct cmsg *m)
> +{
> +	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct obuf *out;
> +	struct iproto_stream *stream = msg->stream;
> +
> +	if (tx_check_schema(msg->header.schema_version))
> +		goto error;
> +
> +	if (box_txn_begin() != 0)
> +		goto error;
> +
> +	stream->txn = txn_detach();

Can you always do it in just one place - tx_end_msg?

> +	assert(stream->txn != NULL);
> +
> +	out = msg->connection->tx.p_obuf;
> +	iproto_reply_ok(out, msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	return;
> +error:
> +	tx_reply_error(msg);
> +	tx_end_msg(msg);

It looks inconsistent that sometimes we call tx_end_msg after processing
a request, sometimes we don't. Let's call tx_end_msg after processing
every message.

Also, I think it would be better if we raised an error in tx_end_msg if
tx is active, but stream is unset. Then we wouldn't need to check
stream_id in box_process_call/eval.
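
To make the accept/end pairing concrete, here is a tiny standalone model
of what I have in mind. It is only a sketch: the toy_* types and the
toy_fiber_txn global are invented stand-ins for the real stream, message
and fiber-local transaction, and "rollback" is modelled by simply
dropping the pointer.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for txn, iproto_stream and iproto_msg. */
struct toy_txn {
	int id;
};

struct toy_stream {
	struct toy_txn *txn;
};

struct toy_msg {
	/* NULL for a request sent outside of any stream. */
	struct toy_stream *stream;
};

/* Stand-in for the transaction attached to the current fiber. */
static struct toy_txn *toy_fiber_txn;

/*
 * Entry point of every request: move the stream transaction to the
 * "fiber" and clear the stream slot, so the stream never keeps a
 * pointer that the request may invalidate.
 */
static void
toy_accept_msg(struct toy_msg *msg)
{
	toy_fiber_txn = NULL;
	if (msg->stream != NULL) {
		toy_fiber_txn = msg->stream->txn;
		msg->stream->txn = NULL;
	}
}

/*
 * Exit point of every request: detach whatever is active and store it
 * in the stream. Returns -1 if a transaction is left active by a
 * request that has no stream; the transaction is dropped in that case.
 */
static int
toy_end_msg(struct toy_msg *msg)
{
	struct toy_txn *txn = toy_fiber_txn;
	toy_fiber_txn = NULL;
	if (txn == NULL)
		return 0;
	if (msg->stream == NULL)
		return -1;
	msg->stream->txn = txn;
	return 0;
}
```

In this shape begin, commit, rollback, call and eval never write to
stream->txn themselves, and the "transaction is active after a
non-stream call" error falls out of the single exit point.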
> +}
> +
> +static void
> +tx_process_commit(struct cmsg *m)
> +{
> +	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct obuf *out;
> +	struct iproto_stream *stream = msg->stream;
> +
> +	if (tx_check_schema(msg->header.schema_version))
> +		goto error;
> +
> +	if (box_txn_commit() != 0) {
> +		stream->txn = in_txn();
> +		goto error;
> +	}
> +
> +	stream->txn = NULL;
> +	out = msg->connection->tx.p_obuf;
> +	iproto_reply_ok(out, msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	return;
> +error:
> +	tx_reply_error(msg);
> +	tx_end_msg(msg);
> +}
> +
> +static void
> +tx_process_rollback(struct cmsg *m)
> +{
> +	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct obuf *out;
> +	struct iproto_stream *stream = msg->stream;
> +
> +	if (tx_check_schema(msg->header.schema_version))
> +		goto error;
> +
> +	if (box_txn_rollback() != 0)
> +		goto error;
> +
> +	stream->txn = NULL;
> +	out = msg->connection->tx.p_obuf;
> +	iproto_reply_ok(out, msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	return;
> +error:
> +	tx_reply_error(msg);
> +	tx_end_msg(msg);
> +}
> +
>  static void
>  tx_process1(struct cmsg *m)
>  {
> @@ -1699,9 +1895,11 @@ tx_process1(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
>  			    tuple != 0);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_end_msg(msg);
>  	return;
>  error:
>  	tx_reply_error(msg);
> +	tx_end_msg(msg);
>  }
>
>  static void
> @@ -1742,9 +1940,11 @@ tx_process_select(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_end_msg(msg);
>  	return;
>  error:
>  	tx_reply_error(msg);
> +	tx_end_msg(msg);
>  }
>
>  static int
> @@ -1829,11 +2029,13 @@ tx_process_call(struct cmsg *m)
>  		goto error;
>  	}
>
> +	tx_end_msg(msg);
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
>  	return;
>  error:
> +	
tx_end_msg(msg);
>  	tx_reply_error(msg);
>  }
>
> @@ -1843,6 +2045,7 @@ tx_process_misc(struct cmsg *m)
>  	struct iproto_msg *msg = tx_accept_msg(m);
>  	struct iproto_connection *con = msg->connection;
>  	struct obuf *out = con->tx.p_obuf;
> +	assert(!(msg->header.type != IPROTO_PING && in_txn()));
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>
> @@ -1875,9 +2078,11 @@ tx_process_misc(struct cmsg *m)
>  	} catch (Exception *e) {
>  		tx_reply_error(msg);
>  	}
> +	tx_end_msg(msg);
>  	return;
>  error:
>  	tx_reply_error(msg);
> +	tx_end_msg(msg);
>  }
>
>  static void
> @@ -1969,10 +2174,12 @@ tx_process_sql(struct cmsg *m)
>  		goto error;
>  	}
>  	port_destroy(&port);
> +	tx_end_msg(msg);
>  	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
>  	return;

Sometimes you call tx_end_msg after writing to obuf, sometimes before.
Please be consistent.

>  error:
> +	tx_end_msg(msg);
>  	tx_reply_error(msg);
>  }
>
> @@ -1983,6 +2190,7 @@ tx_process_replication(struct cmsg *m)
>  	struct iproto_connection *con = msg->connection;
>  	struct ev_io io;
>  	coio_create(&io, con->input.fd);
> +	assert(!in_txn());
>  	try {
>  		switch (msg->header.type) {
>  		case IPROTO_JOIN:
> @@ -2043,9 +2251,24 @@ net_send_msg(struct cmsg *m)
>  		errinj_stream_msg_count_add(-1);
>
>  		if (stailq_empty(&stream->pending_requests)) {
> -			struct mh_i64ptr_node_t node = { stream->id, NULL };
> -			mh_i64ptr_remove(con->streams, &node, 0);
> -			iproto_stream_delete(stream);
> +			/*
> +			 * If no more messages for the current stream
> +			 * and no transaction started, then delete it.
> +			 */
> +			if (stream->txn == NULL) {
> +				struct mh_i64ptr_node_t node = { stream->id, NULL };
> +				mh_i64ptr_remove(con->streams, &node, 0);
> +				iproto_stream_delete(stream);
> +			} else if (!evio_has_fd(&con->input)) {
> +				/*
> +				 * Here we are in case when connection was closed,
> +				 * there is no messages in stream queue, but there
> +				 * is some active transaction in stream.
> +				 * Send disconnect message to rollback this
> +				 * transaction.
> +				 */
> +				iproto_stream_push_on_disconnect(stream);
> +			}

Should be done by disconnect_msg.

>  		} else {
>  			/*
>  			 * If there are new messages for this stream
> @@ -2374,6 +2597,23 @@ iproto_session_push(struct session *session, struct port *port)
>  static inline void
>  iproto_thread_init_routes(struct iproto_thread *iproto_thread)
>  {
> +	iproto_thread->begin_route[0] =
> +		{ tx_process_begin, &iproto_thread->net_pipe };
> +	iproto_thread->begin_route[1] =
> +		{ net_send_msg, NULL };
> +	iproto_thread->commit_route[0] =
> +		{ tx_process_commit, &iproto_thread->net_pipe };
> +	iproto_thread->commit_route[1] =
> +		{ net_send_msg, NULL };
> +	iproto_thread->rollback_route[0] =
> +		{ tx_process_rollback, &iproto_thread->net_pipe };
> +	iproto_thread->rollback_route[1] =
> +		{ net_send_msg, NULL };
> +	iproto_thread->destroy_stream_on_disconnect_route[0] =
> +		{ tx_process_destroy_stream_on_disconnect,
> +		  &iproto_thread->net_pipe };
> +	iproto_thread->destroy_stream_on_disconnect_route[1] =
> +		{ net_finish_destroy_stream_on_disconnect, NULL };
>  	iproto_thread->destroy_route[0] =
>  		{ tx_process_destroy, &iproto_thread->net_pipe };
>  	iproto_thread->destroy_route[1] =
> @@ -2437,6 +2677,9 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
>  	iproto_thread->dml_route[12] = NULL;
>  	/* IPROTO_PREPARE */
>  	iproto_thread->dml_route[13] = iproto_thread->sql_route;
> +	iproto_thread->dml_route[14] = iproto_thread->begin_route;
> +	iproto_thread->dml_route[15] = iproto_thread->commit_route;
> +	iproto_thread->dml_route[16] = iproto_thread->rollback_route;
>  	iproto_thread->connect_route[0] =
>  		{ tx_process_connect, &iproto_thread->net_pipe };
>  	iproto_thread->connect_route[1] = { net_send_greeting, NULL };
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index f2902946a..913a64de5 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -166,6 +166,9 @@ 
const char *iproto_type_strs[] =
>  	"EXECUTE",
>  	NULL, /* NOP */
>  	"PREPARE",
> +	"BEGIN",
> +	"COMMIT",
> +	"ROLLBACK",
>  };
>
>  #define bit(c) (1ULL<
> @@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
>  	0, /* EXECUTE */
>  	0, /* NOP */
>  	0, /* PREPARE */
> +	0, /* BEGIN */
> +	0, /* COMMIT */
> +	0, /* ROLLBACK */
>  };
>  #undef bit
>
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 59e8574f3..d8b242f1a 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -237,6 +237,12 @@ enum iproto_type {
>  	IPROTO_NOP = 12,
>  	/** Prepare SQL statement. */
>  	IPROTO_PREPARE = 13,
> +	/* Begin transaction */
> +	IPROTO_TRANSACTION_BEGIN = 14,
> +	/* Commit transaction */
> +	IPROTO_TRANSACTION_COMMIT = 15,
> +	/* Rollback transaction */
> +	IPROTO_TRANSACTION_ROLLBACK = 16,

Why not simply IPROTO_BEGIN/COMMIT/ROLLBACK?

>  	/** The maximum typecode used for box.stat() */
>  	IPROTO_TYPE_STAT_MAX,
>
> @@ -345,7 +351,9 @@ static inline bool
>  iproto_type_is_dml(uint16_t type)
>  {
>  	return (type >= IPROTO_SELECT && type <= IPROTO_DELETE) ||
> -		type == IPROTO_UPSERT || type == IPROTO_NOP;
> +		type == IPROTO_UPSERT || type == IPROTO_NOP ||
> +		(type >= IPROTO_TRANSACTION_BEGIN &&
> +		 type <= IPROTO_TRANSACTION_ROLLBACK);

BEGIN, COMMIT, ROLLBACK are not DML statements. Please remove.
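
If some caller really needs to recognise the new requests, a separate
predicate would do. Below is a standalone sketch of how the two checks
could sit side by side. The enum is a local copy that uses the shorter
names suggested above, and toy_type_is_txn_control is a made-up helper,
not an existing function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Local copy of the relevant request codes, with the shorter names. */
enum toy_iproto_type {
	TOY_IPROTO_SELECT = 1,
	TOY_IPROTO_DELETE = 5,
	TOY_IPROTO_UPSERT = 9,
	TOY_IPROTO_NOP = 12,
	TOY_IPROTO_BEGIN = 14,
	TOY_IPROTO_COMMIT = 15,
	TOY_IPROTO_ROLLBACK = 16,
};

/* DML check, unchanged from what it was before the patch. */
static inline bool
toy_type_is_dml(uint16_t type)
{
	return (type >= TOY_IPROTO_SELECT && type <= TOY_IPROTO_DELETE) ||
	       type == TOY_IPROTO_UPSERT || type == TOY_IPROTO_NOP;
}

/* Hypothetical separate check for transaction control requests. */
static inline bool
toy_type_is_txn_control(uint16_t type)
{
	return type >= TOY_IPROTO_BEGIN && type <= TOY_IPROTO_ROLLBACK;
}
```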