From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from [87.239.111.99] (localhost [127.0.0.1]) by dev.tarantool.org (Postfix) with ESMTP id 170666EC40; Wed, 11 Aug 2021 15:39:50 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 170666EC40 DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=tarantool.org; s=dev; t=1628685590; bh=aj2/x6h5FSNu4kipU7FaY0acySWy277dNcZSXrrFPUo=; h=Date:To:Cc:References:In-Reply-To:Subject:List-Id: List-Unsubscribe:List-Archive:List-Post:List-Help:List-Subscribe: From:Reply-To:From; b=OzAMO18brv+SimDwjmaDBAwq4Hm4Q6L8fxMMroNwGcOxQmvPVfEap/pTT1gy9wRab 19iuTicaqeekd3T0cXwVeNhf89bcCo+cs/Okb3BFzdO3b0r0XNTfZdI7/EwqgCBHYa /fOJznWzhK3JakUOHAle/aJpPN5NqxE5S1sWd/7Y= Received: from smtpng1.i.mail.ru (smtpng1.i.mail.ru [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id 2649F6EC40 for ; Wed, 11 Aug 2021 15:39:49 +0300 (MSK) DKIM-Filter: OpenDKIM Filter v2.11.0 dev.tarantool.org 2649F6EC40 Received: by smtpng1.m.smailru.net with esmtpa (envelope-from ) id 1mDnWM-0001Os-0V; Wed, 11 Aug 2021 15:39:46 +0300 Date: Wed, 11 Aug 2021 15:39:44 +0300 To: mechanik20051988 Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org, mechanik20051988 Message-ID: <20210811123944.grespqlur2nztyv3@esperanza> References: <06356e3ba5ad12dc98f0aeaa122af0fe022e8822.1628671235.git.mechanik20051988@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <06356e3ba5ad12dc98f0aeaa122af0fe022e8822.1628671235.git.mechanik20051988@tarantool.org> X-4EC0790: 10 X-7564579A: 646B95376F6C166E X-77F55803: 4F1203BC0FB41BD92087353F0EC44DD906AB4890CDABF0C5CB76CEE71D3E4007182A05F53808504061E2C7F4EE7EA19228A0C91EDFEDCB0C057B2E85AACB6F8C430C4C3401C63215 X-7FA49CB5: FF5795518A3D127A4AD6D5ED66289B5278DA827A17800CE7A0175C48BD57B26BC2099A533E45F2D0395957E7521B51C2CFCAF695D4D8E9FCEA1F7E6F0F101C6778DA827A17800CE763424119D34F5CBFEA1F7E6F0F101C6723150C8DA25C47586E58E00D9D99D84E1BDDB23E98D2D38BBCA57AF85F7723F20FD87D5D497B3CE73FA7CCCF0F60D924CC7F00164DA146DAFE8445B8C89999728AA50765F7900637CAEE156C82D3D7D9389733CBF5DBD5E9C8A9BA7A39EFB766F5D81C698A659EA7CC7F00164DA146DA9985D098DBDEAEC8BC0ADEB1C81BB362F6B57BC7E6449061A352F6E88A58FB86F5D81C698A659EA7E827F84554CEF5019E625A9149C048EE9ECD01F8117BC8BEE2021AF6380DFAD18AA50765F790063735872C767BF85DA227C277FBC8AE2E8BDCE939D40DBB93CA75ECD9A6C639B01B4E70A05D1297E1BBCB5012B2E24CD356 X-C1DE0DAB: 0D63561A33F958A53827066AE2EE97F86D711B34ACA0FB3929DCB8587151DC02D59269BC5F550898D99A6476B3ADF6B47008B74DF8BB9EF7333BD3B22AA88B938A852937E12ACA753177526CD55AFC11410CA545F18667F91A7EA1CDA0B5A7A0 X-C8649E89: 4E36BF7865823D7055A7F0CF078B5EC49A30900B95165D3480665FBD8F49180C321EF5E9B7BE3ED2E7CA4014739E2123C633C3F45C21F4B234D620E10FA9EC8F1D7E09C32AA3244CEE9184905309949DE96DA43ED9224D25F2F5F14F68F1805B83B48618A63566E0 X-D57D3AED: 3ZO7eAau8CL7WIMRKs4sN3D3tLDjz0dLbV79QFUyzQ2Ujvy7cMT6pYYqY16iZVKkSc3dCLJ7zSJH7+u4VD18S7Vl4ZUrpaVfd2+vE6kuoey4m4VkSEu530nj6fImhcD4MUrOEAnl0W826KZ9Q+tr5ycPtXkTV4k65bRjmOUUP8cvGozZ33TWg5HZplvhhXbhDGzqmQDTd6OAevLeAnq3Ra9uf7zvY2zzsIhlcp/Y7m53TZgf2aB4JOg4gkr2bioj6qlzQV0oSZMllCNve2EVPQ== X-Mailru-Sender: 689FA8AB762F7393C37E3C1AEC41BA5D424B88E16D8295E751F49D4D5AC58891274CEFED1673C562683ABF942079399BFB559BB5D741EB966A65DFF43FF7BE03240331F90058701C67EA787935ED9F1B X-Mras: Ok Subject: Re: [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams X-BeenThere: tarantool-patches@dev.tarantool.org X-Mailman-Version: 2.1.34 Precedence: list List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , From: Vladimir Davydov via Tarantool-patches Reply-To: Vladimir Davydov Errors-To: tarantool-patches-bounces@dev.tarantool.org Sender: "Tarantool-patches" On Wed, Aug 11, 2021 at 11:56:57AM +0300, mechanik20051988 wrote: > From: mechanik20051988 > > Implement interactive transactions over iproto streams. Each stream > can start its own transaction, so they allows multiplexing several > transactions over one connection. If any request fails during the > transaction, it will not affect the other requests in the transaction. > If disconnect occurs when there is some active transaction in stream, > this transaction will be rollbacked, if it does not have time to commit > before this moment. > > Part of #5860 > > @TarantoolBot document > Title: interactive transactions was implemented over iproto streams. > The main purpose of streams is transactions via iproto. Each stream > can start its own transaction, so they allows multiplexing several > transactions over one connection. There are multiple ways to begin, > commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL > with corresponding function (box.begin, box.commit and box.rollback), > IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START', > 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT, > IPROTO_TRANSACTION_ROLLBACK accordingly. If disconnect occurs when there is > some active transaction in stream, this transaction will be rollbacked, if it > does not have time to commit before this moment. > Add new command codes for begin, commit and rollback transactions: > `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and > `IPROTO_TRANSACTION_ROLLBACK 16` accordingly. The suggestion was to rename IPROTO_ROLLBACK along with IPROTO_CONFIRM along with IPROTO_CONFIRM to something else (IPROTO_RAFT_ROLLBACK? IPROTO_SYNC_ROLLBACK? Please discuss the name with TeamS) and use IPROTO_ROLLBACK for interactive transactions. > diff --git a/src/box/iproto.cc b/src/box/iproto.cc > index 3b792130b..376abbff0 100644 > --- a/src/box/iproto.cc > +++ b/src/box/iproto.cc > @@ -66,6 +66,7 @@ > #include "tt_static.h" > #include "salad/stailq.h" > #include "assoc.h" > +#include "txn.h" > > enum { > IPROTO_SALT_SIZE = 32, > @@ -79,6 +80,8 @@ enum { > struct iproto_connection; > > struct iproto_stream { > + /** Currently active stream transaction or NULL */ > + struct txn *txn; > /** > * Queue of pending requests (iproto messages) for this stream, > * processed sequentially. This field is accesable only from > @@ -89,6 +92,11 @@ struct iproto_stream { > uint64_t id; > /** This stream connection */ > struct iproto_connection *connection; > + /** > + * Pre-allocated disconnect msg to gracefully rollback stream > + * transaction and destroy stream object. > + */ > + struct cmsg on_disconnect; > }; > > /** > @@ -135,6 +143,10 @@ struct iproto_thread { > /** > * Static routes for this iproto thread > */ > + struct cmsg_hop begin_route[2]; > + struct cmsg_hop commit_route[2]; > + struct cmsg_hop rollback_route[2]; > + struct cmsg_hop rollback_on_disconnect_stream_route[2]; I think _stream_ is rdundant here. Rollback only makes sense for streams. > struct cmsg_hop destroy_route[2]; > struct cmsg_hop disconnect_route[2]; > struct cmsg_hop misc_route[2]; > @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id) > return NULL; > } > errinj_stream_count_add(1); > + stream->txn = NULL; > stailq_create(&stream->pending_requests); > stream->id = stream_id; > stream->connection = connection; > return stream; > } > > +static inline void > +iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream) iproto_rollback_on_disconnect? > +{ > + struct iproto_connection *conn = stream->connection; > + struct iproto_thread *iproto_thread = conn->iproto_thread; > + struct cmsg_hop *route = > + iproto_thread->rollback_on_disconnect_stream_route; > + cmsg_init(&stream->on_disconnect, route); > + cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect); > +} > + > /** > * Return true if we have not enough spare messages > * in the message pool. > @@ -670,6 +694,7 @@ static void > iproto_stream_delete(struct iproto_stream *stream) > { > assert(stailq_empty(&stream->pending_requests)); > + assert(stream->txn == NULL); > errinj_stream_count_add(-1); > mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream); > } > @@ -716,6 +741,7 @@ static inline bool > iproto_connection_is_idle(struct iproto_connection *con) > { > return con->long_poll_count == 0 && > + mh_size(con->streams) == 0 && Why do you need this? Stream-lined messages should already pin iproto connection via ibuf. I tried to remove this and rerun the test you added and got no failures. > ibuf_used(&con->ibuf[0]) == 0 && > ibuf_used(&con->ibuf[1]) == 0; > } > @@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) > } > } > > +/** > + * Since the processing of requests within a transaction > + * for a stream can occur in different fibers, we store > + * a pointer to transaction in the stream structure. > + * Check if message belongs to stream and there is active > + * transaction for this stream. In case it is so, sets this > + * transaction for current fiber. > + */ > +static inline void > +tx_prepare_transaction_for_request(struct iproto_msg *msg) > +{ > + if (msg->stream != NULL && msg->stream->txn != NULL) { > + txn_attach(msg->stream->txn); > + msg->stream->txn = NULL; > + } > + assert(!in_txn() || msg->stream != NULL); > +} > + > static inline struct iproto_msg * > tx_accept_msg(struct cmsg *m) > { > struct iproto_msg *msg = (struct iproto_msg *) m; > tx_accept_wpos(msg->connection, &msg->wpos); > tx_fiber_init(msg->connection->session, msg->header.sync); > + tx_prepare_transaction_for_request(msg); > return msg; > } > > +static inline void > +tx_end_msg(struct iproto_msg *msg) > +{ > + if (msg->stream != NULL) assert(msg->stream->txn == NULL); > + msg->stream->txn = txn_detach(); > +} > + > /** > * Write error message to the output buffer and advance > * write position. Doesn't throw. > @@ -1815,6 +2006,12 @@ tx_process_call(struct cmsg *m) > > trigger_clear(&fiber_on_yield); > > + if (in_txn() != NULL && msg->header.stream_id == 0) { > + diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); > + port_destroy(&port); > + goto error; > + } > + Please move this after the rc != 0 check below. > if (rc != 0) > goto error; > > diff --git a/src/box/txn.c b/src/box/txn.c > index b80e722a4..796ab4529 100644 > --- a/src/box/txn.c > +++ b/src/box/txn.c > @@ -1255,3 +1255,25 @@ txn_on_yield(struct trigger *trigger, void *event) > txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD); > return 0; > } > + > +struct txn * > +txn_detach(void) > +{ > + struct txn *txn = in_txn(); > + if (txn == NULL) > + return NULL; > + if (!txn_has_flag(txn, TXN_CAN_YIELD)) { > + txn_on_yield(NULL, NULL); > + trigger_clear(&txn->fiber_on_yield); > + } > + trigger_clear(&txn->fiber_on_stop); > + fiber_set_txn(fiber(), NULL); > + return txn; > +} > + > +void > +txn_attach(struct txn *txn) > +{ > + assert(txn != NULL); assert(!in_txn()); > + fiber_set_txn(fiber(), txn); > +}