[Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams

Vladimir Davydov vdavydov at tarantool.org
Wed Aug 11 15:39:44 MSK 2021


On Wed, Aug 11, 2021 at 11:56:57AM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
> 
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. If any request fails during the
> transaction, it will not affect the other requests in the transaction.
> If disconnect occurs when there is some active transaction in stream,
> this transaction will be rollbacked, if it does not have time to commit
> before this moment.
> 
> Part of #5860
> 
> @TarantoolBot document
> Title: interactive transactions was implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
> with corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START',
> 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
> IPROTO_TRANSACTION_ROLLBACK accordingly. If disconnect occurs when there is
> some active transaction in stream, this transaction will be rollbacked, if it
> does not have time to commit before this moment.
> Add new command codes for begin, commit and rollback transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` accordingly.

The suggestion was to rename IPROTO_ROLLBACK along with IPROTO_CONFIRM
to something else (IPROTO_RAFT_ROLLBACK? IPROTO_SYNC_ROLLBACK? Please
discuss the name with TeamS) and use IPROTO_ROLLBACK for interactive
transactions.

> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3b792130b..376abbff0 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -66,6 +66,7 @@
>  #include "tt_static.h"
>  #include "salad/stailq.h"
>  #include "assoc.h"
> +#include "txn.h"
>  
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -79,6 +80,8 @@ enum {
>  struct iproto_connection;
>  
>  struct iproto_stream {
> +	/** Currently active stream transaction or NULL */
> +	struct txn *txn;
>  	/**
>  	 * Queue of pending requests (iproto messages) for this stream,
>  	 * processed sequentially. This field is accesable only from
> @@ -89,6 +92,11 @@ struct iproto_stream {
>  	uint64_t id;
>  	/** This stream connection */
>  	struct iproto_connection *connection;
> +	/**
> +	 * Pre-allocated disconnect msg to gracefully rollback stream
> +	 * transaction and destroy stream object.
> +	 */
> +	struct cmsg on_disconnect;
>  };
>  
>  /**
> @@ -135,6 +143,10 @@ struct iproto_thread {
>  	/**
>  	 * Static routes for this iproto thread
>  	 */
> +	struct cmsg_hop begin_route[2];
> +	struct cmsg_hop commit_route[2];
> +	struct cmsg_hop rollback_route[2];
> +	struct cmsg_hop rollback_on_disconnect_stream_route[2];

I think _stream_ is redundant here. Rollback only makes sense for
streams.

>  	struct cmsg_hop destroy_route[2];
>  	struct cmsg_hop disconnect_route[2];
>  	struct cmsg_hop misc_route[2];
> @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
>  		return NULL;
>  	}
>  	errinj_stream_count_add(1);
> +	stream->txn = NULL;
>  	stailq_create(&stream->pending_requests);
>  	stream->id = stream_id;
>  	stream->connection = connection;
>  	return stream;
>  }
>  
> +static inline void
> +iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream)

iproto_rollback_on_disconnect?

> +{
> +	struct iproto_connection *conn = stream->connection;
> +	struct iproto_thread *iproto_thread = conn->iproto_thread;
> +	struct cmsg_hop *route =
> +		iproto_thread->rollback_on_disconnect_stream_route;
> +	cmsg_init(&stream->on_disconnect, route);
> +	cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -670,6 +694,7 @@ static void
>  iproto_stream_delete(struct iproto_stream *stream)
>  {
>  	assert(stailq_empty(&stream->pending_requests));
> +	assert(stream->txn == NULL);
>  	errinj_stream_count_add(-1);
>  	mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
>  }
> @@ -716,6 +741,7 @@ static inline bool
>  iproto_connection_is_idle(struct iproto_connection *con)
>  {
>  	return con->long_poll_count == 0 &&
> +	       mh_size(con->streams) == 0 &&

Why do you need this? Stream-lined messages should already pin iproto
connection via ibuf. I tried to remove this and rerun the test you added
and got no failures.

>  	       ibuf_used(&con->ibuf[0]) == 0 &&
>  	       ibuf_used(&con->ibuf[1]) == 0;
>  }
> @@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>  
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to transaction in the stream structure.
> + * Check if message belongs to stream and there is active
> + * transaction for this stream. In case it is so, sets this
> + * transaction for current fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)
> +{
> +	if (msg->stream != NULL && msg->stream->txn != NULL) {
> +		txn_attach(msg->stream->txn);
> +		msg->stream->txn = NULL;
> +	}
> +	assert(!in_txn() || msg->stream != NULL);
> +}
> +
>  static inline struct iproto_msg *
>  tx_accept_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	tx_accept_wpos(msg->connection, &msg->wpos);
>  	tx_fiber_init(msg->connection->session, msg->header.sync);
> +	tx_prepare_transaction_for_request(msg);
>  	return msg;
>  }
>  
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> +	if (msg->stream != NULL)

assert(msg->stream->txn == NULL);

> +		msg->stream->txn = txn_detach();
> +}
> +
>  /**
>   * Write error message to the output buffer and advance
>   * write position. Doesn't throw.
> @@ -1815,6 +2006,12 @@ tx_process_call(struct cmsg *m)
>  
>  	trigger_clear(&fiber_on_yield);
>  
> +	if (in_txn() != NULL && msg->header.stream_id == 0) {
> +		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> +		port_destroy(&port);
> +		goto error;
> +	}
> +

Please move this after the rc != 0 check below.

>  	if (rc != 0)
>  		goto error;
>  
> diff --git a/src/box/txn.c b/src/box/txn.c
> index b80e722a4..796ab4529 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -1255,3 +1255,25 @@ txn_on_yield(struct trigger *trigger, void *event)
>  	txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
>  	return 0;
>  }
> +
> +struct txn *
> +txn_detach(void)
> +{
> +	struct txn *txn = in_txn();
> +	if (txn == NULL)
> +		return NULL;
> +	if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
> +		txn_on_yield(NULL, NULL);
> +		trigger_clear(&txn->fiber_on_yield);
> +	}
> +	trigger_clear(&txn->fiber_on_stop);
> +	fiber_set_txn(fiber(), NULL);
> +	return txn;
> +}
> +
> +void
> +txn_attach(struct txn *txn)
> +{
> +	assert(txn != NULL);

assert(!in_txn());

> +	fiber_set_txn(fiber(), txn);
> +}


More information about the Tarantool-patches mailing list