From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: mechanik20051988 <mechanik20051988@tarantool.org>
Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org, mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams
Date: Wed, 11 Aug 2021 15:39:44 +0300
Message-ID: <20210811123944.grespqlur2nztyv3@esperanza>
In-Reply-To: <06356e3ba5ad12dc98f0aeaa122af0fe022e8822.1628671235.git.mechanik20051988@tarantool.org>

On Wed, Aug 11, 2021 at 11:56:57AM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
>
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. If any request fails during the
> transaction, it will not affect the other requests in the transaction.
> If disconnect occurs when there is some active transaction in stream,
> this transaction will be rollbacked, if it does not have time to commit
> before this moment.
>
> Part of #5860
>
> @TarantoolBot document
> Title: interactive transactions was implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
> with corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START',
> 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
> IPROTO_TRANSACTION_ROLLBACK accordingly. If disconnect occurs when there is
> some active transaction in stream, this transaction will be rollbacked, if it
> does not have time to commit before this moment.
> Add new command codes for begin, commit and rollback transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` accordingly.

The suggestion was to rename IPROTO_ROLLBACK along with IPROTO_CONFIRM
to something else (IPROTO_RAFT_ROLLBACK? IPROTO_SYNC_ROLLBACK? Please
discuss the name with TeamS) and use IPROTO_ROLLBACK for interactive
transactions.

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3b792130b..376abbff0 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -66,6 +66,7 @@
>  #include "tt_static.h"
>  #include "salad/stailq.h"
>  #include "assoc.h"
> +#include "txn.h"
>
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -79,6 +80,8 @@ enum {
>  struct iproto_connection;
>
>  struct iproto_stream {
> +	/** Currently active stream transaction or NULL */
> +	struct txn *txn;
>  	/**
>  	 * Queue of pending requests (iproto messages) for this stream,
>  	 * processed sequentially. This field is accesable only from
> @@ -89,6 +92,11 @@ struct iproto_stream {
>  	uint64_t id;
>  	/** This stream connection */
>  	struct iproto_connection *connection;
> +	/**
> +	 * Pre-allocated disconnect msg to gracefully rollback stream
> +	 * transaction and destroy stream object.
> +	 */
> +	struct cmsg on_disconnect;
>  };
>
>  /**
> @@ -135,6 +143,10 @@ struct iproto_thread {
>  	/**
>  	 * Static routes for this iproto thread
>  	 */
> +	struct cmsg_hop begin_route[2];
> +	struct cmsg_hop commit_route[2];
> +	struct cmsg_hop rollback_route[2];
> +	struct cmsg_hop rollback_on_disconnect_stream_route[2];

I think _stream_ is redundant here. Rollback only makes sense for
streams.

>  	struct cmsg_hop destroy_route[2];
>  	struct cmsg_hop disconnect_route[2];
>  	struct cmsg_hop misc_route[2];
> @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
>  		return NULL;
>  	}
>  	errinj_stream_count_add(1);
> +	stream->txn = NULL;
>  	stailq_create(&stream->pending_requests);
>  	stream->id = stream_id;
>  	stream->connection = connection;
>  	return stream;
>  }
>
> +static inline void
> +iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream)

iproto_rollback_on_disconnect?

> +{
> +	struct iproto_connection *conn = stream->connection;
> +	struct iproto_thread *iproto_thread = conn->iproto_thread;
> +	struct cmsg_hop *route =
> +		iproto_thread->rollback_on_disconnect_stream_route;
> +	cmsg_init(&stream->on_disconnect, route);
> +	cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -670,6 +694,7 @@ static void
>  iproto_stream_delete(struct iproto_stream *stream)
>  {
>  	assert(stailq_empty(&stream->pending_requests));
> +	assert(stream->txn == NULL);
>  	errinj_stream_count_add(-1);
>  	mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
>  }
> @@ -716,6 +741,7 @@ static inline bool
>  iproto_connection_is_idle(struct iproto_connection *con)
>  {
>  	return con->long_poll_count == 0 &&
> +	       mh_size(con->streams) == 0 &&

Why do you need this? Stream-lined messages should already pin iproto
connection via ibuf. I tried to remove this and rerun the test you added
and got no failures.

>  	       ibuf_used(&con->ibuf[0]) == 0 &&
>  	       ibuf_used(&con->ibuf[1]) == 0;
>  }
> @@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to transaction in the stream structure.
> + * Check if message belongs to stream and there is active
> + * transaction for this stream. In case it is so, sets this
> + * transaction for current fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)
> +{
> +	if (msg->stream != NULL && msg->stream->txn != NULL) {
> +		txn_attach(msg->stream->txn);
> +		msg->stream->txn = NULL;
> +	}
> +	assert(!in_txn() || msg->stream != NULL);
> +}
> +
>  static inline struct iproto_msg *
>  tx_accept_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	tx_accept_wpos(msg->connection, &msg->wpos);
>  	tx_fiber_init(msg->connection->session, msg->header.sync);
> +	tx_prepare_transaction_for_request(msg);
>  	return msg;
>  }
>
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> +	if (msg->stream != NULL)

assert(msg->stream->txn == NULL);

> +		msg->stream->txn = txn_detach();
> +}
> +
>  /**
>   * Write error message to the output buffer and advance
>   * write position. Doesn't throw.
> @@ -1815,6 +2006,12 @@ tx_process_call(struct cmsg *m)
>
>  	trigger_clear(&fiber_on_yield);
>
> +	if (in_txn() != NULL && msg->header.stream_id == 0) {
> +		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> +		port_destroy(&port);
> +		goto error;
> +	}
> +

Please move this after the rc != 0 check below.

>  	if (rc != 0)
>  		goto error;
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index b80e722a4..796ab4529 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -1255,3 +1255,25 @@ txn_on_yield(struct trigger *trigger, void *event)
>  	txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
>  	return 0;
>  }
> +
> +struct txn *
> +txn_detach(void)
> +{
> +	struct txn *txn = in_txn();
> +	if (txn == NULL)
> +		return NULL;
> +	if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
> +		txn_on_yield(NULL, NULL);
> +		trigger_clear(&txn->fiber_on_yield);
> +	}
> +	trigger_clear(&txn->fiber_on_stop);
> +	fiber_set_txn(fiber(), NULL);
> +	return txn;
> +}
> +
> +void
> +txn_attach(struct txn *txn)
> +{
> +	assert(txn != NULL);

assert(!in_txn());

> +	fiber_set_txn(fiber(), txn);
> +}
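
For anyone following the detach/attach part of this patch, here is a
minimal standalone sketch of the hand-off: the transaction is parked in
the stream between requests and re-attached to whichever fiber serves
the next one. It is a model only, not Tarantool code. `struct
fiber_model`, `struct stream_model`, `cur_fiber`, `request_begin`,
`request_end` and `demo_handoff` are hypothetical names, and the yield
and stop trigger handling done by the real txn_detach() is left out.
The two asserts suggested in the comments above are included.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins; the real structs carry far more state. */
struct txn { int id; };
struct fiber_model { struct txn *txn; };
struct stream_model { struct txn *txn; };

/* Models the currently running fiber. */
static struct fiber_model *cur_fiber;

static struct txn *
in_txn(void)
{
	return cur_fiber->txn;
}

/* Unbind the transaction from the current fiber and return it. */
static struct txn *
txn_detach(void)
{
	struct txn *txn = in_txn();
	if (txn == NULL)
		return NULL;
	cur_fiber->txn = NULL;
	return txn;
}

/* Bind a parked transaction to the current fiber. */
static void
txn_attach(struct txn *txn)
{
	assert(txn != NULL);
	assert(!in_txn());
	cur_fiber->txn = txn;
}

/* Start of request processing, as in tx_prepare_transaction_for_request(). */
static void
request_begin(struct stream_model *stream)
{
	if (stream != NULL && stream->txn != NULL) {
		txn_attach(stream->txn);
		stream->txn = NULL;
	}
}

/* End of request processing, as in tx_end_msg(). */
static void
request_end(struct stream_model *stream)
{
	if (stream != NULL) {
		assert(stream->txn == NULL);
		stream->txn = txn_detach();
	}
}

/* Returns 0 if the transaction survives a hop between two fibers. */
static int
demo_handoff(void)
{
	struct txn txn = { 1 };
	struct fiber_model f1 = { NULL }, f2 = { NULL };
	struct stream_model stream = { NULL };

	/* Request 1: fiber f1 begins a transaction, then parks it. */
	cur_fiber = &f1;
	request_begin(&stream);
	cur_fiber->txn = &txn;	/* stands in for box.begin() */
	request_end(&stream);
	if (f1.txn != NULL || stream.txn != &txn)
		return 1;

	/* Request 2: a different fiber picks the transaction up. */
	cur_fiber = &f2;
	request_begin(&stream);
	if (in_txn() != &txn || stream.txn != NULL)
		return 2;
	request_end(&stream);
	if (f2.txn != NULL || stream.txn != &txn)
		return 3;
	return 0;
}
```

Calling demo_handoff() from a small test main and checking that it
returns 0 is enough to exercise both hops.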