[Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams
Vladimir Davydov
vdavydov at tarantool.org
Wed Aug 11 15:39:44 MSK 2021
On Wed, Aug 11, 2021 at 11:56:57AM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
>
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. If any request fails during the
> transaction, it will not affect the other requests in the transaction.
> If disconnect occurs when there is some active transaction in stream,
> this transaction will be rollbacked, if it does not have time to commit
> before this moment.
>
> Part of #5860
>
> @TarantoolBot document
> Title: interactive transactions was implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
> with corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START',
> 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
> IPROTO_TRANSACTION_ROLLBACK accordingly. If disconnect occurs when there is
> some active transaction in stream, this transaction will be rollbacked, if it
> does not have time to commit before this moment.
> Add new command codes for begin, commit and rollback transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` accordingly.
The suggestion was to rename IPROTO_ROLLBACK along with IPROTO_CONFIRM
along with IPROTO_CONFIRM to something else (IPROTO_RAFT_ROLLBACK?
IPROTO_SYNC_ROLLBACK? Please discuss the name with TeamS) and use
IPROTO_ROLLBACK for interactive transactions.
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3b792130b..376abbff0 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -66,6 +66,7 @@
> #include "tt_static.h"
> #include "salad/stailq.h"
> #include "assoc.h"
> +#include "txn.h"
>
> enum {
> IPROTO_SALT_SIZE = 32,
> @@ -79,6 +80,8 @@ enum {
> struct iproto_connection;
>
> struct iproto_stream {
> + /** Currently active stream transaction or NULL */
> + struct txn *txn;
> /**
> * Queue of pending requests (iproto messages) for this stream,
> * processed sequentially. This field is accesable only from
> @@ -89,6 +92,11 @@ struct iproto_stream {
> uint64_t id;
> /** This stream connection */
> struct iproto_connection *connection;
> + /**
> + * Pre-allocated disconnect msg to gracefully rollback stream
> + * transaction and destroy stream object.
> + */
> + struct cmsg on_disconnect;
> };
>
> /**
> @@ -135,6 +143,10 @@ struct iproto_thread {
> /**
> * Static routes for this iproto thread
> */
> + struct cmsg_hop begin_route[2];
> + struct cmsg_hop commit_route[2];
> + struct cmsg_hop rollback_route[2];
> + struct cmsg_hop rollback_on_disconnect_stream_route[2];
I think _stream_ is rdundant here. Rollback only makes sense for
streams.
> struct cmsg_hop destroy_route[2];
> struct cmsg_hop disconnect_route[2];
> struct cmsg_hop misc_route[2];
> @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> return NULL;
> }
> errinj_stream_count_add(1);
> + stream->txn = NULL;
> stailq_create(&stream->pending_requests);
> stream->id = stream_id;
> stream->connection = connection;
> return stream;
> }
>
> +static inline void
> +iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream)
iproto_rollback_on_disconnect?
> +{
> + struct iproto_connection *conn = stream->connection;
> + struct iproto_thread *iproto_thread = conn->iproto_thread;
> + struct cmsg_hop *route =
> + iproto_thread->rollback_on_disconnect_stream_route;
> + cmsg_init(&stream->on_disconnect, route);
> + cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
> /**
> * Return true if we have not enough spare messages
> * in the message pool.
> @@ -670,6 +694,7 @@ static void
> iproto_stream_delete(struct iproto_stream *stream)
> {
> assert(stailq_empty(&stream->pending_requests));
> + assert(stream->txn == NULL);
> errinj_stream_count_add(-1);
> mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
> }
> @@ -716,6 +741,7 @@ static inline bool
> iproto_connection_is_idle(struct iproto_connection *con)
> {
> return con->long_poll_count == 0 &&
> + mh_size(con->streams) == 0 &&
Why do you need this? Stream-lined messages should already pin iproto
connection via ibuf. I tried to remove this and rerun the test you added
and got no failures.
> ibuf_used(&con->ibuf[0]) == 0 &&
> ibuf_used(&con->ibuf[1]) == 0;
> }
> @@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to transaction in the stream structure.
> + * Check if message belongs to stream and there is active
> + * transaction for this stream. In case it is so, sets this
> + * transaction for current fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)
> +{
> + if (msg->stream != NULL && msg->stream->txn != NULL) {
> + txn_attach(msg->stream->txn);
> + msg->stream->txn = NULL;
> + }
> + assert(!in_txn() || msg->stream != NULL);
> +}
> +
> static inline struct iproto_msg *
> tx_accept_msg(struct cmsg *m)
> {
> struct iproto_msg *msg = (struct iproto_msg *) m;
> tx_accept_wpos(msg->connection, &msg->wpos);
> tx_fiber_init(msg->connection->session, msg->header.sync);
> + tx_prepare_transaction_for_request(msg);
> return msg;
> }
>
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> + if (msg->stream != NULL)
assert(msg->stream->txn == NULL);
> + msg->stream->txn = txn_detach();
> +}
> +
> /**
> * Write error message to the output buffer and advance
> * write position. Doesn't throw.
> @@ -1815,6 +2006,12 @@ tx_process_call(struct cmsg *m)
>
> trigger_clear(&fiber_on_yield);
>
> + if (in_txn() != NULL && msg->header.stream_id == 0) {
> + diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> + port_destroy(&port);
> + goto error;
> + }
> +
Please move this after the rc != 0 check below.
> if (rc != 0)
> goto error;
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index b80e722a4..796ab4529 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -1255,3 +1255,25 @@ txn_on_yield(struct trigger *trigger, void *event)
> txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
> return 0;
> }
> +
> +struct txn *
> +txn_detach(void)
> +{
> + struct txn *txn = in_txn();
> + if (txn == NULL)
> + return NULL;
> + if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
> + txn_on_yield(NULL, NULL);
> + trigger_clear(&txn->fiber_on_yield);
> + }
> + trigger_clear(&txn->fiber_on_stop);
> + fiber_set_txn(fiber(), NULL);
> + return txn;
> +}
> +
> +void
> +txn_attach(struct txn *txn)
> +{
> + assert(txn != NULL);
assert(!in_txn());
> + fiber_set_txn(fiber(), txn);
> +}
More information about the Tarantool-patches
mailing list