[Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams
Vladimir Davydov
vdavydov at tarantool.org
Fri Aug 6 15:59:30 MSK 2021
On Thu, Aug 05, 2021 at 09:17:44PM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
>
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. If any request fails during the
> transaction, it will not affect the other requests in the transaction.
> If disconnect occurs when there is some active transaction in stream,
> this transaction will be rollbacked, if it does not have time to commit
> before this moment.
>
> Part of #5860
>
> @TarantoolBot document
> Title: interactive transactions was implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so they allows multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
> with corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START',
> 'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
> IPROTO_TRANSACTION_ROLLBACK accordingly. If disconnect occurs when there is
> some active transaction in stream, this transaction will be rollbacked, if it
> does not have time to commit before this moment.
> Add new command codes for begin, commit and rollback transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` accordingly.
> ---
> src/box/call.c | 10 +-
> src/box/errcode.h | 1 +
> src/box/iproto.cc | 249 +++++++++++++++++++++++++-
> src/box/iproto_constants.c | 6 +
> src/box/iproto_constants.h | 10 +-
> test/box-tap/feedback_daemon.test.lua | 2 +-
> test/box/error.result | 1 +
> test/box/misc.result | 5 +-
> 8 files changed, 274 insertions(+), 10 deletions(-)
>
> diff --git a/src/box/call.c b/src/box/call.c
> index a6384efe2..9ba0fa9ac 100644
> --- a/src/box/call.c
> +++ b/src/box/call.c
> @@ -141,8 +141,9 @@ box_process_call(struct call_request *request, struct port *port)
> const char *name = request->name;
> assert(name != NULL);
> uint32_t name_len = mp_decode_strl(&name);
> - /* Transaction is not started. */
> - assert(!in_txn());
> + uint64_t stream_id = request->header->stream_id;
> + /* Transaction is not started for not stream requests. */
> + assert(stream_id != 0 || !in_txn());
>
> int rc;
> struct port args;
> @@ -157,7 +158,7 @@ box_process_call(struct call_request *request, struct port *port)
> }
> if (rc != 0)
> return -1;
> - if (in_txn() != NULL) {
> + if (in_txn() != NULL && stream_id == 0) {
> diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> port_destroy(port);
> return -1;
> @@ -177,9 +178,10 @@ box_process_eval(struct call_request *request, struct port *port)
> request->args_end - request->args);
> const char *expr = request->expr;
> uint32_t expr_len = mp_decode_strl(&expr);
> + uint64_t stream_id = request->header->stream_id;
> if (box_lua_eval(expr, expr_len, &args, port) != 0)
> return -1;
> - if (in_txn() != 0) {
> + if (in_txn() != 0 && stream_id == 0) {
> diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> port_destroy(port);
> return -1;
> diff --git a/src/box/errcode.h b/src/box/errcode.h
> index 6c8c00256..e76fd6442 100644
> --- a/src/box/errcode.h
> +++ b/src/box/errcode.h
> @@ -279,6 +279,7 @@ struct errcode_record {
> /*224 */_(ER_ELECTION_DISABLED, "Elections were turned off")\
> /*225 */_(ER_TXN_ROLLBACK, "Transaction was rolled back") \
> /*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \
> + /*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process this type (%u) of requests out of stream") \
The same comment as for ER_UNABLE_TO_PROCESS_IN_STREAM: I think we
should report a human-readable request type name, not just an id.
>
> /*
> * !IMPORTANT! Please follow instructions at start of the file
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3642cbd02..37715ab7f 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -65,6 +65,7 @@
> #include "tt_static.h"
> #include "salad/stailq.h"
> #include "assoc.h"
> +#include "txn.h"
>
> enum {
> IPROTO_SALT_SIZE = 32,
> @@ -78,6 +79,8 @@ enum {
> struct iproto_connection;
>
> struct iproto_stream {
> + /** Currently active stream transaction or NULL */
> + struct txn *txn;
> /**
> * Queue of pending requests (iproto messages) for this stream,
> * processed sequentially. This field is accesable only from
> @@ -88,6 +91,8 @@ struct iproto_stream {
> uint64_t id;
> /** This stream connection */
> struct iproto_connection *connection;
> + /** Pre-allocated disconnect msg to gracefully destroy stream */
> + struct cmsg on_disconnect;
> };
>
> /**
> @@ -134,6 +139,10 @@ struct iproto_thread {
> /**
> * Static routes for this iproto thread
> */
> + struct cmsg_hop begin_route[2];
> + struct cmsg_hop commit_route[2];
> + struct cmsg_hop rollback_route[2];
> + struct cmsg_hop destroy_stream_on_disconnect_route[2];
> struct cmsg_hop destroy_route[2];
> struct cmsg_hop disconnect_route[2];
> struct cmsg_hop misc_route[2];
> @@ -622,12 +631,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> return NULL;
> }
> errinj_stream_count_add(1);
> + stream->txn = NULL;
> stailq_create(&stream->pending_requests);
> stream->id = stream_id;
> stream->connection = connection;
> return stream;
> }
>
> +static inline void
> +iproto_stream_push_on_disconnect(struct iproto_stream *stream)
> +{
> + struct iproto_connection *conn = stream->connection;
> + struct iproto_thread *iproto_thread = conn->iproto_thread;
> + struct cmsg_hop *route =
> + iproto_thread->destroy_stream_on_disconnect_route;
> + cmsg_init(&stream->on_disconnect, route);
> + cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
> /**
> * Return true if we have not enough spare messages
> * in the message pool.
> @@ -651,6 +672,7 @@ static void
> iproto_stream_delete(struct iproto_stream *stream)
> {
> assert(stailq_empty(&stream->pending_requests));
> + assert(stream->txn == NULL);
> errinj_stream_count_add(-1);
> mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
> }
> @@ -697,6 +719,7 @@ static inline bool
> iproto_connection_is_idle(struct iproto_connection *con)
> {
> return con->long_poll_count == 0 &&
> + mh_size(con->streams) == 0 &&
Why? A message in a stream should pin the input buffer, shouldn't it?
> ibuf_used(&con->ibuf[0]) == 0 &&
> ibuf_used(&con->ibuf[1]) == 0;
> }
> @@ -786,6 +809,23 @@ iproto_connection_close(struct iproto_connection *con)
> * is done only once.
> */
> con->p_ibuf->wpos -= con->parse_size;
> + mh_int_t node;
> + mh_foreach(con->streams, node) {
> + struct iproto_stream *stream = (struct iproto_stream *)
> + mh_i64ptr_node(con->streams, node)->val;
> + /**
> + * If stream requests queue is empty, it means that
> + * that there is some active transaction which was
> + * not commited yet. We need to rollback it, since
> + * we push on_disconnect message to tx thread here.
> + * If stream requests queue is not empty, it means
> + * that stream processing some request in tx thread
> + * now. We destroy stream in `net_send_msg` after
> + * processing all requests.
> + */
> + if (stailq_empty(&stream->pending_requests))
> + iproto_stream_push_on_disconnect(stream);
> + }
This should be done by disconnect_msg. There shouldn't be a separate
route for destroying unfinished transactions.
> cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
> assert(con->state == IPROTO_CONNECTION_ALIVE);
> con->state = IPROTO_CONNECTION_CLOSED;
> @@ -946,6 +986,7 @@ iproto_msg_set_stream(struct iproto_msg *msg)
> */
> errinj_stream_msg_count_add(1);
> stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
> + assert(stream != NULL);
> msg->stream = stream;
> /*
> * If the request queue in the stream is not empty, it means
> @@ -1388,6 +1429,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> uint64_t stream_id;
> uint8_t type;
> bool request_is_not_for_stream;
> + bool request_is_only_for_stream;
> struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
>
> if (xrow_header_decode(&msg->header, pos, reqend, true))
> @@ -1399,11 +1441,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> request_is_not_for_stream =
> ((type > IPROTO_TYPE_STAT_MAX &&
> type != IPROTO_PING) || type == IPROTO_AUTH);
> + request_is_only_for_stream =
> + (type == IPROTO_TRANSACTION_BEGIN ||
> + type == IPROTO_TRANSACTION_COMMIT ||
> + type == IPROTO_TRANSACTION_ROLLBACK);
>
> if (stream_id != 0 && request_is_not_for_stream) {
> diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
> (uint32_t) type);
> goto error;
> + } else if (stream_id == 0 && request_is_only_for_stream) {
> + diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
> + (uint32_t) type);
> + goto error;
> }
>
> /*
> @@ -1418,6 +1468,9 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
> case IPROTO_UPDATE:
> case IPROTO_DELETE:
> case IPROTO_UPSERT:
> + case IPROTO_TRANSACTION_BEGIN:
> + case IPROTO_TRANSACTION_COMMIT:
> + case IPROTO_TRANSACTION_ROLLBACK:
> if (xrow_decode_dml(&msg->header, &msg->dml,
> dml_request_key_map(type)))
Why do you need to decode a body for these messages?
AFAIU they don't have a body.
> goto error;
> @@ -1498,6 +1551,37 @@ tx_fiber_init(struct session *session, uint64_t sync)
> fiber_set_user(f, &session->credentials);
> }
>
> +static void
> +tx_process_destroy_stream_on_disconnect(struct cmsg *m)
> +{
> + struct iproto_stream *stream =
> + container_of(m, struct iproto_stream,
> + on_disconnect);
> +
> + if (stream->txn != NULL) {
> + tx_fiber_init(stream->connection->session, 0);
> + fiber_set_txn(fiber(), stream->txn);
> + if (box_txn_rollback() != 0)
> + panic("failed to rollback transaction on disconnect");
> + stream->txn = NULL;
> + }
> +}
> +
> +static void
> +net_finish_destroy_stream_on_disconnect(struct cmsg *m)
> +{
> + struct iproto_stream *stream =
> + container_of(m, struct iproto_stream,
> + on_disconnect);
> + struct iproto_connection *con = stream->connection;
> +
> + struct mh_i64ptr_node_t node = { stream->id, NULL };
> + mh_i64ptr_remove(con->streams, &node, 0);
> + iproto_stream_delete(stream);
> + assert(!evio_has_fd(&con->input));
> + if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
> + iproto_connection_try_to_start_destroy(con);
> +}
>
> static void
> tx_process_disconnect(struct cmsg *m)
> @@ -1632,15 +1716,53 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to transaction in the stream structure.
> + * Check if message belongs to stream and there is active
> + * transaction for this stream. In case it is so, sets this
> + * transaction for current fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)
The function is used in just one place. Should be fine to inline it.
> +{
> + if (msg->stream != NULL) {
> + struct txn *txn = msg->stream->txn;
> + /*
> + * When we do any operations (which are written to `wal`)
> + * outside of transaction, we consider each such operation
> + * as a small transaction and write it to `wal` immediately.
> + * When operation is performed as part of transaction, we
> + * write all transaction to `wal` at the commit. In this case,
> + * `is_commit` flag will be set when writing to `wal` for the
> + * last operation in transaction, the rest operations must have
> + * this flag set to false, to mark that they all belongs to the
> + * same transaction.
> + */
> + if (txn != NULL)
> + msg->header.is_commit = false;
AFAIU this flag is used only when a statement is written to WAL.
Please remove.
> + fiber_set_txn(fiber(), txn);
I would clear stream->txn here: a call() may start a new transaction, in
which case stream->txn will refer to feed memory, which is error-prone.
You wouldn't need to clear stream->txn after commit and rollback if you
cleared it here.
> + }
> +}
> +
> static inline struct iproto_msg *
> tx_accept_msg(struct cmsg *m)
> {
> struct iproto_msg *msg = (struct iproto_msg *) m;
> tx_accept_wpos(msg->connection, &msg->wpos);
> tx_fiber_init(msg->connection->session, msg->header.sync);
> + tx_prepare_transaction_for_request(msg);
> return msg;
> }
>
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> + if (msg->stream != NULL)
> + msg->stream->txn = txn_detach();
> +}
> +
> /**
> * Write error message to the output buffer and advance
> * write position. Doesn't throw.
> @@ -1666,6 +1788,7 @@ tx_reply_iproto_error(struct cmsg *m)
> iproto_reply_error(out, diag_last_error(&msg->diag),
> msg->header.sync, ::schema_version);
> iproto_wpos_create(&msg->wpos, out);
> + tx_end_msg(msg);
> }
>
> /** Inject a short delay on tx request processing for testing. */
> @@ -1678,6 +1801,79 @@ tx_inject_delay(void)
> });
> }
>
> +static void
> +tx_process_begin(struct cmsg *m)
> +{
> + struct iproto_msg *msg = tx_accept_msg(m);
> + struct obuf *out;
> + struct iproto_stream *stream = msg->stream;
> +
> + if (tx_check_schema(msg->header.schema_version))
> + goto error;
> +
> + if (box_txn_begin() != 0)
> + goto error;
> +
> + stream->txn = txn_detach();
Can you always do it in just one place - tx_end_msg?
> + assert(stream->txn != NULL);
> +
> + out = msg->connection->tx.p_obuf;
> + iproto_reply_ok(out, msg->header.sync, ::schema_version);
> + iproto_wpos_create(&msg->wpos, out);
> + return;
> +error:
> + tx_reply_error(msg);
> + tx_end_msg(msg);
It looks inconsistent that sometimes we call tx_end_msg after processing
a request, sometimes we don't. Let's call tx_end_msg after processing
every message.
Also, I think it would be better if we raised an error in tx_end_msg if
tx is active, but stream is unset. Then we wouldn't need to check
stream_id in box_process_call/eval.
> +}
> +
> +static void
> +tx_process_commit(struct cmsg *m)
> +{
> + struct iproto_msg *msg = tx_accept_msg(m);
> + struct obuf *out;
> + struct iproto_stream *stream = msg->stream;
> +
> + if (tx_check_schema(msg->header.schema_version))
> + goto error;
> +
> + if (box_txn_commit() != 0) {
> + stream->txn = in_txn();
> + goto error;
> + }
> +
> + stream->txn = NULL;
> + out = msg->connection->tx.p_obuf;
> + iproto_reply_ok(out, msg->header.sync, ::schema_version);
> + iproto_wpos_create(&msg->wpos, out);
> + return;
> +error:
> + tx_reply_error(msg);
> + tx_end_msg(msg);
> +}
> +
> +static void
> +tx_process_rollback(struct cmsg *m)
> +{
> + struct iproto_msg *msg = tx_accept_msg(m);
> + struct obuf *out;
> + struct iproto_stream *stream = msg->stream;
> +
> + if (tx_check_schema(msg->header.schema_version))
> + goto error;
> +
> + if (box_txn_rollback() != 0)
> + goto error;
> +
> + stream->txn = NULL;
> + out = msg->connection->tx.p_obuf;
> + iproto_reply_ok(out, msg->header.sync, ::schema_version);
> + iproto_wpos_create(&msg->wpos, out);
> + return;
> +error:
> + tx_reply_error(msg);
> + tx_end_msg(msg);
> +}
> +
> static void
> tx_process1(struct cmsg *m)
> {
> @@ -1699,9 +1895,11 @@ tx_process1(struct cmsg *m)
> iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
> tuple != 0);
> iproto_wpos_create(&msg->wpos, out);
> + tx_end_msg(msg);
> return;
> error:
> tx_reply_error(msg);
> + tx_end_msg(msg);
> }
>
> static void
> @@ -1742,9 +1940,11 @@ tx_process_select(struct cmsg *m)
> iproto_reply_select(out, &svp, msg->header.sync,
> ::schema_version, count);
> iproto_wpos_create(&msg->wpos, out);
> + tx_end_msg(msg);
> return;
> error:
> tx_reply_error(msg);
> + tx_end_msg(msg);
> }
>
> static int
> @@ -1829,11 +2029,13 @@ tx_process_call(struct cmsg *m)
> goto error;
> }
>
> + tx_end_msg(msg);
> iproto_reply_select(out, &svp, msg->header.sync,
> ::schema_version, count);
> iproto_wpos_create(&msg->wpos, out);
> return;
> error:
> + tx_end_msg(msg);
> tx_reply_error(msg);
> }
>
> @@ -1843,6 +2045,7 @@ tx_process_misc(struct cmsg *m)
> struct iproto_msg *msg = tx_accept_msg(m);
> struct iproto_connection *con = msg->connection;
> struct obuf *out = con->tx.p_obuf;
> + assert(!(msg->header.type != IPROTO_PING && in_txn()));
> if (tx_check_schema(msg->header.schema_version))
> goto error;
>
> @@ -1875,9 +2078,11 @@ tx_process_misc(struct cmsg *m)
> } catch (Exception *e) {
> tx_reply_error(msg);
> }
> + tx_end_msg(msg);
> return;
> error:
> tx_reply_error(msg);
> + tx_end_msg(msg);
> }
>
> static void
> @@ -1969,10 +2174,12 @@ tx_process_sql(struct cmsg *m)
> goto error;
> }
> port_destroy(&port);
> + tx_end_msg(msg);
> iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
> iproto_wpos_create(&msg->wpos, out);
> return;
Sometimes you call tx_end_msg after writing to obuf, sometimes before.
Please be consistent.
> error:
> + tx_end_msg(msg);
> tx_reply_error(msg);
> }
>
> @@ -1983,6 +2190,7 @@ tx_process_replication(struct cmsg *m)
> struct iproto_connection *con = msg->connection;
> struct ev_io io;
> coio_create(&io, con->input.fd);
> + assert(!in_txn());
> try {
> switch (msg->header.type) {
> case IPROTO_JOIN:
> @@ -2043,9 +2251,24 @@ net_send_msg(struct cmsg *m)
> errinj_stream_msg_count_add(-1);
>
> if (stailq_empty(&stream->pending_requests)) {
> - struct mh_i64ptr_node_t node = { stream->id, NULL };
> - mh_i64ptr_remove(con->streams, &node, 0);
> - iproto_stream_delete(stream);
> + /*
> + * If no more messages for the current stream
> + * and no transaction started, then delete it.
> + */
> + if (stream->txn == NULL) {
> + struct mh_i64ptr_node_t node = { stream->id, NULL };
> + mh_i64ptr_remove(con->streams, &node, 0);
> + iproto_stream_delete(stream);
> + } else if (!evio_has_fd(&con->input)) {
> + /*
> + * Here we are in case when connection was closed,
> + * there is no messages in stream queue, but there
> + * is some active transaction in stream.
> + * Send disconnect message to rollback this
> + * transaction.
> + */
> + iproto_stream_push_on_disconnect(stream);
> + }
Should be done by disconnect_msg.
> } else {
> /*
> * If there are new messages for this stream
> @@ -2374,6 +2597,23 @@ iproto_session_push(struct session *session, struct port *port)
> static inline void
> iproto_thread_init_routes(struct iproto_thread *iproto_thread)
> {
> + iproto_thread->begin_route[0] =
> + { tx_process_begin, &iproto_thread->net_pipe };
> + iproto_thread->begin_route[1] =
> + { net_send_msg, NULL };
> + iproto_thread->commit_route[0] =
> + { tx_process_commit, &iproto_thread->net_pipe };
> + iproto_thread->commit_route[1] =
> + { net_send_msg, NULL };
> + iproto_thread->rollback_route[0] =
> + { tx_process_rollback, &iproto_thread->net_pipe };
> + iproto_thread->rollback_route[1] =
> + { net_send_msg, NULL };
> + iproto_thread->destroy_stream_on_disconnect_route[0] =
> + { tx_process_destroy_stream_on_disconnect,
> + &iproto_thread->net_pipe };
> + iproto_thread->destroy_stream_on_disconnect_route[1] =
> + { net_finish_destroy_stream_on_disconnect, NULL };
> iproto_thread->destroy_route[0] =
> { tx_process_destroy, &iproto_thread->net_pipe };
> iproto_thread->destroy_route[1] =
> @@ -2437,6 +2677,9 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
> iproto_thread->dml_route[12] = NULL;
> /* IPROTO_PREPARE */
> iproto_thread->dml_route[13] = iproto_thread->sql_route;
> + iproto_thread->dml_route[14] = iproto_thread->begin_route;
> + iproto_thread->dml_route[15] = iproto_thread->commit_route;
> + iproto_thread->dml_route[16] = iproto_thread->rollback_route;
> iproto_thread->connect_route[0] =
> { tx_process_connect, &iproto_thread->net_pipe };
> iproto_thread->connect_route[1] = { net_send_greeting, NULL };
> diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
> index f2902946a..913a64de5 100644
> --- a/src/box/iproto_constants.c
> +++ b/src/box/iproto_constants.c
> @@ -166,6 +166,9 @@ const char *iproto_type_strs[] =
> "EXECUTE",
> NULL, /* NOP */
> "PREPARE",
> + "BEGIN",
> + "COMMIT",
> + "ROLLBACK",
> };
>
> #define bit(c) (1ULL<<IPROTO_##c)
> @@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
> 0, /* EXECUTE */
> 0, /* NOP */
> 0, /* PREPARE */
> + 0, /* BEGIN */
> + 0, /* COMMIT */
> + 0, /* ROLLBACK */
> };
> #undef bit
>
> diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
> index 59e8574f3..d8b242f1a 100644
> --- a/src/box/iproto_constants.h
> +++ b/src/box/iproto_constants.h
> @@ -237,6 +237,12 @@ enum iproto_type {
> IPROTO_NOP = 12,
> /** Prepare SQL statement. */
> IPROTO_PREPARE = 13,
> + /* Begin transaction */
> + IPROTO_TRANSACTION_BEGIN = 14,
> + /* Commit transaction */
> + IPROTO_TRANSACTION_COMMIT = 15,
> + /* Rollback transaction */
> + IPROTO_TRANSACTION_ROLLBACK = 16,
Why not simply IPROTO_BEGIN/COMMIT/ROLLBACK?
> /** The maximum typecode used for box.stat() */
> IPROTO_TYPE_STAT_MAX,
>
> @@ -345,7 +351,9 @@ static inline bool
> iproto_type_is_dml(uint16_t type)
> {
> return (type >= IPROTO_SELECT && type <= IPROTO_DELETE) ||
> - type == IPROTO_UPSERT || type == IPROTO_NOP;
> + type == IPROTO_UPSERT || type == IPROTO_NOP ||
> + (type >= IPROTO_TRANSACTION_BEGIN &&
> + type <= IPROTO_TRANSACTION_ROLLBACK);
BEGIN, COMMIT, ROLLBACK are not DML statements. Please remove.
More information about the Tarantool-patches
mailing list