Tarantool development patches archive
 help / color / mirror / Atom feed
From: Vladimir Davydov via Tarantool-patches <tarantool-patches@dev.tarantool.org>
To: mechanik20051988 <mechanik20051988@tarantool.org>
Cc: v.shpilevoy@tarantool.org, tarantool-patches@dev.tarantool.org,
	mechanik20051988 <mechanik20.05.1988@gmail.com>
Subject: Re: [Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams
Date: Fri, 6 Aug 2021 15:59:30 +0300	[thread overview]
Message-ID: <20210806125930.h4u3zpbrxbapwlfz@esperanza> (raw)
In-Reply-To: <06bcadd06bca14d32391e5612427b7b3f9084d26.1628184138.git.mechanik20.05.1988@gmail.com>

On Thu, Aug 05, 2021 at 09:17:44PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
> 
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so streams allow multiplexing several
> transactions over one connection. If any request fails during the
> transaction, it will not affect the other requests in the transaction.
> If a disconnect occurs while a stream has an active transaction, that
> transaction is rolled back unless it has already been committed by
> that moment.
> 
> Part of #5860
> 
> @TarantoolBot document
> Title: interactive transactions were implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so streams allow multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and roll back a transaction: using IPROTO_CALL and IPROTO_EVAL
> with the corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with the corresponding sql request ('TRANSACTION START',
> 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
> IPROTO_TRANSACTION_ROLLBACK accordingly. If a disconnect occurs while a
> stream has an active transaction, that transaction is rolled back unless
> it has already been committed by that moment.
> Add new command codes for begin, commit and rollback transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` accordingly.
> ---
>  src/box/call.c                        |  10 +-
>  src/box/errcode.h                     |   1 +
>  src/box/iproto.cc                     | 249 +++++++++++++++++++++++++-
>  src/box/iproto_constants.c            |   6 +
>  src/box/iproto_constants.h            |  10 +-
>  test/box-tap/feedback_daemon.test.lua |   2 +-
>  test/box/error.result                 |   1 +
>  test/box/misc.result                  |   5 +-
>  8 files changed, 274 insertions(+), 10 deletions(-)
> 
> diff --git a/src/box/call.c b/src/box/call.c
> index a6384efe2..9ba0fa9ac 100644
> --- a/src/box/call.c
> +++ b/src/box/call.c
> @@ -141,8 +141,9 @@ box_process_call(struct call_request *request, struct port *port)
>  	const char *name = request->name;
>  	assert(name != NULL);
>  	uint32_t name_len = mp_decode_strl(&name);
> -	/* Transaction is not started. */
> -	assert(!in_txn());
> +	uint64_t stream_id = request->header->stream_id;
> +	/* Transaction is not started for not stream requests. */
> +	assert(stream_id != 0 || !in_txn());
>  
>  	int rc;
>  	struct port args;
> @@ -157,7 +158,7 @@ box_process_call(struct call_request *request, struct port *port)
>  	}
>  	if (rc != 0)
>  		return -1;
> -	if (in_txn() != NULL) {
> +	if (in_txn() != NULL && stream_id == 0) {
>  		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
>  		port_destroy(port);
>  		return -1;
> @@ -177,9 +178,10 @@ box_process_eval(struct call_request *request, struct port *port)
>  			    request->args_end - request->args);
>  	const char *expr = request->expr;
>  	uint32_t expr_len = mp_decode_strl(&expr);
> +	uint64_t stream_id = request->header->stream_id;
>  	if (box_lua_eval(expr, expr_len, &args, port) != 0)
>  		return -1;
> -	if (in_txn() != 0) {
> +	if (in_txn() != 0 && stream_id == 0) {
>  		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
>  		port_destroy(port);
>  		return -1;
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index 6c8c00256..e76fd6442 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -279,6 +279,7 @@ struct errcode_record {
>  	/*224 */_(ER_ELECTION_DISABLED,		"Elections were turned off")\
>  	/*225 */_(ER_TXN_ROLLBACK,		"Transaction was rolled back") \
>  	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \
> +	/*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process this type (%u) of requests out of stream") \

The same comment as for ER_UNABLE_TO_PROCESS_IN_STREAM: I think we
should report a human-readable request type name, not just an id.

>  
>  /*
>   * !IMPORTANT! Please follow instructions at start of the file
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3642cbd02..37715ab7f 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -65,6 +65,7 @@
>  #include "tt_static.h"
>  #include "salad/stailq.h"
>  #include "assoc.h"
> +#include "txn.h"
>  
>  enum {
>  	IPROTO_SALT_SIZE = 32,
> @@ -78,6 +79,8 @@ enum {
>  struct iproto_connection;
>  
>  struct iproto_stream {
> +	/** Currently active stream transaction or NULL */
> +	struct txn *txn;
>  	/**
>  	 * Queue of pending requests (iproto messages) for this stream,
>  	 * processed sequentially. This field is accesable only from
> @@ -88,6 +91,8 @@ struct iproto_stream {
>  	uint64_t id;
>  	/** This stream connection */
>  	struct iproto_connection *connection;
> +	/** Pre-allocated disconnect msg to gracefully destroy stream */
> +	struct cmsg on_disconnect;
>  };
>  
>  /**
> @@ -134,6 +139,10 @@ struct iproto_thread {
>  	/**
>  	 * Static routes for this iproto thread
>  	 */
> +	struct cmsg_hop begin_route[2];
> +	struct cmsg_hop commit_route[2];
> +	struct cmsg_hop rollback_route[2];
> +	struct cmsg_hop destroy_stream_on_disconnect_route[2];
>  	struct cmsg_hop destroy_route[2];
>  	struct cmsg_hop disconnect_route[2];
>  	struct cmsg_hop misc_route[2];
> @@ -622,12 +631,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
>  		return NULL;
>  	}
>  	errinj_stream_count_add(1);
> +	stream->txn = NULL;
>  	stailq_create(&stream->pending_requests);
>  	stream->id = stream_id;
>  	stream->connection = connection;
>  	return stream;
>  }
>  
> +static inline void
> +iproto_stream_push_on_disconnect(struct iproto_stream *stream)
> +{
> +	struct iproto_connection *conn = stream->connection;
> +	struct iproto_thread *iproto_thread = conn->iproto_thread;
> +	struct cmsg_hop *route =
> +		iproto_thread->destroy_stream_on_disconnect_route;
> +	cmsg_init(&stream->on_disconnect, route);
> +	cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
>  /**
>   * Return true if we have not enough spare messages
>   * in the message pool.
> @@ -651,6 +672,7 @@ static void
>  iproto_stream_delete(struct iproto_stream *stream)
>  {
>  	assert(stailq_empty(&stream->pending_requests));
> +	assert(stream->txn == NULL);
>  	errinj_stream_count_add(-1);
>  	mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
>  }
> @@ -697,6 +719,7 @@ static inline bool
>  iproto_connection_is_idle(struct iproto_connection *con)
>  {
>  	return con->long_poll_count == 0 &&
> +	       mh_size(con->streams) == 0 &&

Why? A message in a stream should pin the input buffer, shouldn't it?

>  	       ibuf_used(&con->ibuf[0]) == 0 &&
>  	       ibuf_used(&con->ibuf[1]) == 0;
>  }
> @@ -786,6 +809,23 @@ iproto_connection_close(struct iproto_connection *con)
>  		 * is done only once.
>  		 */
>  		con->p_ibuf->wpos -= con->parse_size;
> +		mh_int_t node;
> +		mh_foreach(con->streams, node) {
> +			struct iproto_stream *stream = (struct iproto_stream *)
> +				mh_i64ptr_node(con->streams, node)->val;
> +			/**
> +			 * If stream requests queue is empty, it means
> +			 * that there is some active transaction which was
> +			 * not committed yet. We need to roll it back, so
> +			 * we push on_disconnect message to tx thread here.
> +			 * If stream requests queue is not empty, it means
> +			 * that stream is processing some request in tx
> +			 * thread now. We destroy stream in `net_send_msg`
> +			 * after processing all requests.
> +			 */
> +			if (stailq_empty(&stream->pending_requests))
> +				iproto_stream_push_on_disconnect(stream);
> +		}

This should be done by disconnect_msg. There shouldn't be a separate
route for destroying unfinished transactions.

>  		cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
>  		assert(con->state == IPROTO_CONNECTION_ALIVE);
>  		con->state = IPROTO_CONNECTION_CLOSED;
> @@ -946,6 +986,7 @@ iproto_msg_set_stream(struct iproto_msg *msg)
>  	 */
>  	errinj_stream_msg_count_add(1);
>  	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> +	assert(stream != NULL);
>  	msg->stream = stream;
>  	/*
>  	 * If the request queue in the stream is not empty, it means
> @@ -1388,6 +1429,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	uint64_t stream_id;
>  	uint8_t type;
>  	bool request_is_not_for_stream;
> +	bool request_is_only_for_stream;
>  	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>  
>  	if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1399,11 +1441,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	request_is_not_for_stream =
>  		((type > IPROTO_TYPE_STAT_MAX &&
>  		 type != IPROTO_PING) || type == IPROTO_AUTH);
> +	request_is_only_for_stream =
> +		(type == IPROTO_TRANSACTION_BEGIN ||
> +		 type == IPROTO_TRANSACTION_COMMIT ||
> +		 type == IPROTO_TRANSACTION_ROLLBACK);
>  
>  	if (stream_id != 0 && request_is_not_for_stream) {
>  		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
>  			 (uint32_t) type);
>  		goto error;
> +	} else if (stream_id == 0 && request_is_only_for_stream) {
> +		diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
> +			 (uint32_t) type);
> +		goto error;
>  	}
>  
>  	/*
> @@ -1418,6 +1468,9 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
>  	case IPROTO_UPDATE:
>  	case IPROTO_DELETE:
>  	case IPROTO_UPSERT:
> +	case IPROTO_TRANSACTION_BEGIN:
> +	case IPROTO_TRANSACTION_COMMIT:
> +	case IPROTO_TRANSACTION_ROLLBACK:
>  		if (xrow_decode_dml(&msg->header, &msg->dml,
>  				    dml_request_key_map(type)))

Why do you need to decode a body for these messages?
AFAIU they don't have a body.

>  			goto error;
> @@ -1498,6 +1551,37 @@ tx_fiber_init(struct session *session, uint64_t sync)
>  	fiber_set_user(f, &session->credentials);
>  }
>  
> +static void
> +tx_process_destroy_stream_on_disconnect(struct cmsg *m)
> +{
> +	struct iproto_stream *stream =
> +		container_of(m, struct iproto_stream,
> +			     on_disconnect);
> +
> +	if (stream->txn != NULL) {
> +		tx_fiber_init(stream->connection->session, 0);
> +		fiber_set_txn(fiber(), stream->txn);
> +		if (box_txn_rollback() != 0)
> +			panic("failed to rollback transaction on disconnect");
> +		stream->txn = NULL;
> +	}
> +}
> +
> +static void
> +net_finish_destroy_stream_on_disconnect(struct cmsg *m)
> +{
> +	struct iproto_stream *stream =
> +		container_of(m, struct iproto_stream,
> +			     on_disconnect);
> +	struct iproto_connection *con = stream->connection;
> +
> +	struct mh_i64ptr_node_t node = { stream->id, NULL };
> +	mh_i64ptr_remove(con->streams, &node, 0);
> +	iproto_stream_delete(stream);
> +	assert(!evio_has_fd(&con->input));
> +	if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
> +		iproto_connection_try_to_start_destroy(con);
> +}
>  
>  static void
>  tx_process_disconnect(struct cmsg *m)
> @@ -1632,15 +1716,53 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
>  	}
>  }
>  
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to transaction in the stream structure.
> + * Check if message belongs to stream and there is active
> + * transaction for this stream. In case it is so, sets this
> + * transaction for current fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)

The function is used in just one place. Should be fine to inline it.

> +{
> +	if (msg->stream != NULL) {
> +		struct txn *txn = msg->stream->txn;
> +		/*
> +		 * When we do any operations (which are written to `wal`)
> +		 * outside of transaction, we consider each such operation
> +		 * as a small transaction and write it to `wal` immediately.
> +		 * When operation is performed as part of transaction, we
> +		 * write all transaction to `wal` at the commit. In this case,
> +		 * `is_commit` flag will be set when writing to `wal` for the
> +		 * last operation in transaction, the rest operations must have
> +		 * this flag set to false, to mark that they all belongs to the
> +		 * same transaction.
> +		 */
> +		if (txn != NULL)
> +			msg->header.is_commit = false;

AFAIU this flag is used only when a statement is written to WAL.
Please remove.

> +		fiber_set_txn(fiber(), txn);

I would clear stream->txn here: a call() may start a new transaction, in
which case stream->txn will refer to freed memory, which is error-prone.
You wouldn't need to clear stream->txn after commit and rollback if you
cleared it here.

> +	}
> +}
> +
>  static inline struct iproto_msg *
>  tx_accept_msg(struct cmsg *m)
>  {
>  	struct iproto_msg *msg = (struct iproto_msg *) m;
>  	tx_accept_wpos(msg->connection, &msg->wpos);
>  	tx_fiber_init(msg->connection->session, msg->header.sync);
> +	tx_prepare_transaction_for_request(msg);
>  	return msg;
>  }
>  
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> +	if (msg->stream != NULL)
> +		msg->stream->txn = txn_detach();
> +}
> +
>  /**
>   * Write error message to the output buffer and advance
>   * write position. Doesn't throw.
> @@ -1666,6 +1788,7 @@ tx_reply_iproto_error(struct cmsg *m)
>  	iproto_reply_error(out, diag_last_error(&msg->diag),
>  			   msg->header.sync, ::schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_end_msg(msg);
>  }
>  
>  /** Inject a short delay on tx request processing for testing. */
> @@ -1678,6 +1801,79 @@ tx_inject_delay(void)
>  	});
>  }
>  
> +static void
> +tx_process_begin(struct cmsg *m)
> +{
> +	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct obuf *out;
> +	struct iproto_stream *stream = msg->stream;
> +
> +	if (tx_check_schema(msg->header.schema_version))
> +		goto error;
> +
> +	if (box_txn_begin() != 0)
> +		goto error;
> +
> +	stream->txn = txn_detach();

Can you always do it in just one place - tx_end_msg?

> +	assert(stream->txn != NULL);
> +
> +	out = msg->connection->tx.p_obuf;
> +	iproto_reply_ok(out, msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	return;
> +error:
> +	tx_reply_error(msg);
> +	tx_end_msg(msg);

It looks inconsistent that sometimes we call tx_end_msg after processing
a request, sometimes we don't. Let's call tx_end_msg after processing
every message.

Also, I think it would be better if we raised an error in tx_end_msg if
tx is active, but stream is unset. Then we wouldn't need to check
stream_id in box_process_call/eval.

> +}
> +
> +static void
> +tx_process_commit(struct cmsg *m)
> +{
> +	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct obuf *out;
> +	struct iproto_stream *stream = msg->stream;
> +
> +	if (tx_check_schema(msg->header.schema_version))
> +		goto error;
> +
> +	if (box_txn_commit() != 0) {
> +		stream->txn = in_txn();
> +		goto error;
> +	}
> +
> +	stream->txn = NULL;
> +	out = msg->connection->tx.p_obuf;
> +	iproto_reply_ok(out, msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	return;
> +error:
> +	tx_reply_error(msg);
> +	tx_end_msg(msg);
> +}
> +
> +static void
> +tx_process_rollback(struct cmsg *m)
> +{
> +	struct iproto_msg *msg = tx_accept_msg(m);
> +	struct obuf *out;
> +	struct iproto_stream *stream = msg->stream;
> +
> +	if (tx_check_schema(msg->header.schema_version))
> +		goto error;
> +
> +	if (box_txn_rollback() != 0)
> +		goto error;
> +
> +	stream->txn = NULL;
> +	out = msg->connection->tx.p_obuf;
> +	iproto_reply_ok(out, msg->header.sync, ::schema_version);
> +	iproto_wpos_create(&msg->wpos, out);
> +	return;
> +error:
> +	tx_reply_error(msg);
> +	tx_end_msg(msg);
> +}
> +
>  static void
>  tx_process1(struct cmsg *m)
>  {
> @@ -1699,9 +1895,11 @@ tx_process1(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
>  			    tuple != 0);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_end_msg(msg);
>  	return;
>  error:
>  	tx_reply_error(msg);
> +	tx_end_msg(msg);
>  }
>  
>  static void
> @@ -1742,9 +1940,11 @@ tx_process_select(struct cmsg *m)
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
> +	tx_end_msg(msg);
>  	return;
>  error:
>  	tx_reply_error(msg);
> +	tx_end_msg(msg);
>  }
>  
>  static int
> @@ -1829,11 +2029,13 @@ tx_process_call(struct cmsg *m)
>  		goto error;
>  	}
>  
> +	tx_end_msg(msg);
>  	iproto_reply_select(out, &svp, msg->header.sync,
>  			    ::schema_version, count);
>  	iproto_wpos_create(&msg->wpos, out);
>  	return;
>  error:
> +	tx_end_msg(msg);
>  	tx_reply_error(msg);
>  }
>  
> @@ -1843,6 +2045,7 @@ tx_process_misc(struct cmsg *m)
>  	struct iproto_msg *msg = tx_accept_msg(m);
>  	struct iproto_connection *con = msg->connection;
>  	struct obuf *out = con->tx.p_obuf;
> +	assert(!(msg->header.type != IPROTO_PING && in_txn()));
>  	if (tx_check_schema(msg->header.schema_version))
>  		goto error;
>  
> @@ -1875,9 +2078,11 @@ tx_process_misc(struct cmsg *m)
>  	} catch (Exception *e) {
>  		tx_reply_error(msg);
>  	}
> +	tx_end_msg(msg);
>  	return;
>  error:
>  	tx_reply_error(msg);
> +	tx_end_msg(msg);
>  }
>  
>  static void
> @@ -1969,10 +2174,12 @@ tx_process_sql(struct cmsg *m)
>  		goto error;
>  	}
>  	port_destroy(&port);
> +	tx_end_msg(msg);
>  	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
>  	iproto_wpos_create(&msg->wpos, out);
>  	return;

Sometimes you call tx_end_msg after writing to obuf, sometimes before.
Please be consistent.

>  error:
> +	tx_end_msg(msg);
>  	tx_reply_error(msg);
>  }
>  
> @@ -1983,6 +2190,7 @@ tx_process_replication(struct cmsg *m)
>  	struct iproto_connection *con = msg->connection;
>  	struct ev_io io;
>  	coio_create(&io, con->input.fd);
> +	assert(!in_txn());
>  	try {
>  		switch (msg->header.type) {
>  		case IPROTO_JOIN:
> @@ -2043,9 +2251,24 @@ net_send_msg(struct cmsg *m)
>  	errinj_stream_msg_count_add(-1);
>  
>  	if (stailq_empty(&stream->pending_requests)) {
> -		struct mh_i64ptr_node_t node = { stream->id, NULL };
> -		mh_i64ptr_remove(con->streams, &node, 0);
> -		iproto_stream_delete(stream);
> +		/*
> +		 * If no more messages for the current stream
> +		 * and no transaction started, then delete it.
> +		 */
> +		if (stream->txn == NULL) {
> +			struct mh_i64ptr_node_t node = { stream->id, NULL };
> +			mh_i64ptr_remove(con->streams, &node, 0);
> +			iproto_stream_delete(stream);
> +		} else if (!evio_has_fd(&con->input)) {
> +			/*
> +			 * Here we are in case when connection was closed,
> +			 * there is no messages in stream queue, but there
> +			 * is some active transaction in stream.
> +			 * Send disconnect message to rollback this
> +			 * transaction.
> +			 */
> +			iproto_stream_push_on_disconnect(stream);
> +		}

Should be done by disconnect_msg.

>  	} else {
>  		/*
>  		 * If there are new messages for this stream
> @@ -2374,6 +2597,23 @@ iproto_session_push(struct session *session, struct port *port)
>  static inline void
>  iproto_thread_init_routes(struct iproto_thread *iproto_thread)
>  {
> +	iproto_thread->begin_route[0] =
> +		{ tx_process_begin, &iproto_thread->net_pipe };
> +	iproto_thread->begin_route[1] =
> +		{ net_send_msg, NULL };
> +	iproto_thread->commit_route[0] =
> +		{ tx_process_commit, &iproto_thread->net_pipe };
> +	iproto_thread->commit_route[1] =
> +		{ net_send_msg, NULL };
> +	iproto_thread->rollback_route[0] =
> +		{ tx_process_rollback, &iproto_thread->net_pipe };
> +	iproto_thread->rollback_route[1] =
> +		{ net_send_msg, NULL };
> +	iproto_thread->destroy_stream_on_disconnect_route[0] =
> +		{ tx_process_destroy_stream_on_disconnect,
> +		  &iproto_thread->net_pipe };
> +	iproto_thread->destroy_stream_on_disconnect_route[1] =
> +		{ net_finish_destroy_stream_on_disconnect, NULL };
>  	iproto_thread->destroy_route[0] =
>  		{ tx_process_destroy, &iproto_thread->net_pipe };
>  	iproto_thread->destroy_route[1] =
> @@ -2437,6 +2677,9 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
>  	iproto_thread->dml_route[12] = NULL;
>  	/* IPROTO_PREPARE */
>  	iproto_thread->dml_route[13] = iproto_thread->sql_route;
> +	iproto_thread->dml_route[14] = iproto_thread->begin_route;
> +	iproto_thread->dml_route[15] = iproto_thread->commit_route;
> +	iproto_thread->dml_route[16] = iproto_thread->rollback_route;
>  	iproto_thread->connect_route[0] =
>  		{ tx_process_connect, &iproto_thread->net_pipe };
>  	iproto_thread->connect_route[1] = { net_send_greeting, NULL };
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index f2902946a..913a64de5 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -166,6 +166,9 @@ const char *iproto_type_strs[] =
>  	"EXECUTE",
>  	NULL, /* NOP */
>  	"PREPARE",
> +	"BEGIN",
> +	"COMMIT",
> +	"ROLLBACK",
>  };
>  
>  #define bit(c) (1ULL<<IPROTO_##c)
> @@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
>  	0,                                                     /* EXECUTE */
>  	0,                                                     /* NOP */
>  	0,                                                     /* PREPARE */
> +	0,                                                     /* BEGIN */
> +	0,                                                     /* COMMIT */
> +	0,                                                     /* ROLLBACK */
>  };
>  #undef bit
>  
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 59e8574f3..d8b242f1a 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -237,6 +237,12 @@ enum iproto_type {
>  	IPROTO_NOP = 12,
>  	/** Prepare SQL statement. */
>  	IPROTO_PREPARE = 13,
> +	/* Begin transaction */
> +	IPROTO_TRANSACTION_BEGIN = 14,
> +	/* Commit transaction */
> +	IPROTO_TRANSACTION_COMMIT = 15,
> +	/* Rollback transaction */
> +	IPROTO_TRANSACTION_ROLLBACK = 16,

Why not simply IPROTO_BEGIN/COMMIT/ROLLBACK?

>  	/** The maximum typecode used for box.stat() */
>  	IPROTO_TYPE_STAT_MAX,
>  
> @@ -345,7 +351,9 @@ static inline bool
>  iproto_type_is_dml(uint16_t type)
>  {
>  	return (type >= IPROTO_SELECT && type <= IPROTO_DELETE) ||
> -		type == IPROTO_UPSERT || type == IPROTO_NOP;
> +		type == IPROTO_UPSERT || type == IPROTO_NOP ||
> +		(type >= IPROTO_TRANSACTION_BEGIN &&
> +		 type <= IPROTO_TRANSACTION_ROLLBACK);

BEGIN, COMMIT, ROLLBACK are not DML statements. Please remove.

  reply	other threads:[~2021-08-06 12:59 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-08-05 18:17 [Tarantool-patches] [PATCH 0/7] implement " mechanik20051988 via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 1/7] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
2021-08-06  8:20   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 2/7] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
2021-08-06  8:33   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 3/7] txn: detach transaction from fiber mechanik20051988 via Tarantool-patches
2021-08-06  8:51   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 4/7] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
2021-08-06 10:30   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 5/7] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
2021-08-06 12:03   ` Vladimir Davydov via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
2021-08-06 12:59   ` Vladimir Davydov via Tarantool-patches [this message]
2021-08-09 10:39     ` Vladimir Davydov via Tarantool-patches
2021-08-09 10:40       ` [Tarantool-patches] [PATCH 1/2] xrow: remove unused call_request::header Vladimir Davydov via Tarantool-patches
2021-08-09 10:40         ` [Tarantool-patches] [PATCH 2/2] iproto: clear request::header for client requests Vladimir Davydov via Tarantool-patches
2021-08-09 11:27           ` Evgeny Mekhanik via Tarantool-patches
2021-08-09 11:26         ` [Tarantool-patches] [PATCH 1/2] xrow: remove unused call_request::header Evgeny Mekhanik via Tarantool-patches
2021-08-05 18:17 ` [Tarantool-patches] [PATCH 7/7] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
2021-08-06 14:04   ` Vladimir Davydov via Tarantool-patches

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20210806125930.h4u3zpbrxbapwlfz@esperanza \
    --to=tarantool-patches@dev.tarantool.org \
    --cc=mechanik20.05.1988@gmail.com \
    --cc=mechanik20051988@tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --cc=vdavydov@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox