[Tarantool-patches] [PATCH v5 8/9] iproto: implement interactive transactions over iproto streams

mechanik20051988 mechanik20051988 at tarantool.org
Thu Aug 12 14:16:38 MSK 2021


From: mechanik20051988 <mechanik20.05.1988 at gmail.com>

Implement interactive transactions over iproto streams. Each stream
can start its own transaction, so they allows multiplexing several
transactions over one connection. If any request fails during the
transaction, it will not affect the other requests in the transaction.
If disconnect occurs when there is some active transaction in stream,
this transaction will be rollbacked, if it does not have time to commit
before this moment.

Part of #5860

@TarantoolBot document
Title: interactive transactions was implemented over iproto streams.
The main purpose of streams is transactions via iproto. Each stream
can start its own transaction, so they allows multiplexing several
transactions over one connection. There are multiple ways to begin,
commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
with corresponding function (box.begin, box.commit and box.rollback),
IPROTO_EXECUTE with corresponding sql request ('TRANSACTION START',
'COMMIT', 'ROLLBACK') and IPROTO_BEGIN, IPROTO_COMMIT, IPROTO_ROLLBACK
accordingly. If disconnect occurs when there is some active transaction
in stream, this transaction will be rollbacked, if it does not have time
to commit before this moment. Add new command codes for begin, commit and
rollback transactions: `IPROTO_BEGIN 14`, `IPROTO_COMMIT 15` and
`IPROTO_ROLLBACK 16` accordingly.
---
 src/box/call.c                        |  12 --
 src/box/errcode.h                     |   1 +
 src/box/iproto.cc                     | 256 +++++++++++++++++++++++++-
 src/box/iproto_constants.c            |   6 +
 src/box/iproto_constants.h            |   6 +
 src/box/txn.c                         |  23 +++
 src/box/txn.h                         |  19 ++
 test/box-tap/feedback_daemon.test.lua |   2 +-
 test/box/error.result                 |   1 +
 test/box/misc.result                  |   5 +-
 10 files changed, 314 insertions(+), 17 deletions(-)

diff --git a/src/box/call.c b/src/box/call.c
index a6384efe2..0ce84b1ed 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -141,8 +141,6 @@ box_process_call(struct call_request *request, struct port *port)
 	const char *name = request->name;
 	assert(name != NULL);
 	uint32_t name_len = mp_decode_strl(&name);
-	/* Transaction is not started. */
-	assert(!in_txn());
 
 	int rc;
 	struct port args;
@@ -157,11 +155,6 @@ box_process_call(struct call_request *request, struct port *port)
 	}
 	if (rc != 0)
 		return -1;
-	if (in_txn() != NULL) {
-		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		port_destroy(port);
-		return -1;
-	}
 	return 0;
 }
 
@@ -179,10 +172,5 @@ box_process_eval(struct call_request *request, struct port *port)
 	uint32_t expr_len = mp_decode_strl(&expr);
 	if (box_lua_eval(expr, expr_len, &args, port) != 0)
 		return -1;
-	if (in_txn() != 0) {
-		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
-		port_destroy(port);
-		return -1;
-	}
 	return 0;
 }
diff --git a/src/box/errcode.h b/src/box/errcode.h
index f8fda23c1..a6f096698 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -282,6 +282,7 @@ struct errcode_record {
 	/*227 */_(ER_SYNC_QUEUE_UNCLAIMED,	"The synchronous transaction queue doesn't belong to any instance")\
 	/*228 */_(ER_SYNC_QUEUE_FOREIGN,	"The synchronous transaction queue belongs to other instance with id %u")\
 	/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \
+	/*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 84dbdab40..318e31e93 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -66,6 +66,7 @@
 #include "tt_static.h"
 #include "salad/stailq.h"
 #include "assoc.h"
+#include "txn.h"
 
 enum {
 	IPROTO_SALT_SIZE = 32,
@@ -79,6 +80,8 @@ enum {
 struct iproto_connection;
 
 struct iproto_stream {
+	/** Currently active stream transaction or NULL */
+	struct txn *txn;
 	/**
 	 * Queue of pending requests (iproto messages) for this stream,
 	 * processed sequentially. This field is accesable only from
@@ -89,6 +92,11 @@ struct iproto_stream {
 	uint64_t id;
 	/** This stream connection */
 	struct iproto_connection *connection;
+	/**
+	 * Pre-allocated disconnect msg to gracefully rollback stream
+	 * transaction and destroy stream object.
+	 */
+	struct cmsg on_disconnect;
 };
 
 /**
@@ -135,6 +143,10 @@ struct iproto_thread {
 	/**
 	 * Static routes for this iproto thread
 	 */
+	struct cmsg_hop begin_route[2];
+	struct cmsg_hop commit_route[2];
+	struct cmsg_hop rollback_route[2];
+	struct cmsg_hop rollback_on_disconnect_route[2];
 	struct cmsg_hop destroy_route[2];
 	struct cmsg_hop disconnect_route[2];
 	struct cmsg_hop misc_route[2];
@@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
 		return NULL;
 	}
 	errinj_stream_count_add(1);
+	stream->txn = NULL;
 	stailq_create(&stream->pending_requests);
 	stream->id = stream_id;
 	stream->connection = connection;
 	return stream;
 }
 
+static inline void
+iproto_stream_rollback_on_disconnect(struct iproto_stream *stream)
+{
+	struct iproto_connection *conn = stream->connection;
+	struct iproto_thread *iproto_thread = conn->iproto_thread;
+	struct cmsg_hop *route =
+		iproto_thread->rollback_on_disconnect_route;
+	cmsg_init(&stream->on_disconnect, route);
+	cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
+}
+
 /**
  * Return true if we have not enough spare messages
  * in the message pool.
@@ -670,6 +694,7 @@ static void
 iproto_stream_delete(struct iproto_stream *stream)
 {
 	assert(stailq_empty(&stream->pending_requests));
+	assert(stream->txn == NULL);
 	errinj_stream_count_add(-1);
 	mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
 }
@@ -715,7 +740,19 @@ iproto_msg_new(struct iproto_connection *con)
 static inline bool
 iproto_connection_is_idle(struct iproto_connection *con)
 {
+	/*
+	 * The check for 'mh_size (streams) == 0' was added, because it is
+	 * possible that when disconnect occurs, there is active transaction
+	 * in stream after processing all messages. In this case we send
+	 * special message to rollback it, and without this check we would
+	 * immediately send special message to destroy connection. This would
+	 * not lead to error now, since the messages are processed strictly
+	 * sequentially and rollback does not yield, but it is not safely and
+	 * if we add some more complex logic, it may lead to difficulty catching
+	 * errors in the future.
+	 */
 	return con->long_poll_count == 0 &&
+	       mh_size(con->streams) == 0 &&
 	       ibuf_used(&con->ibuf[0]) == 0 &&
 	       ibuf_used(&con->ibuf[1]) == 0;
 }
@@ -805,6 +842,23 @@ iproto_connection_close(struct iproto_connection *con)
 		 * is done only once.
 		 */
 		con->p_ibuf->wpos -= con->parse_size;
+		mh_int_t node;
+		mh_foreach(con->streams, node) {
+			struct iproto_stream *stream = (struct iproto_stream *)
+				mh_i64ptr_node(con->streams, node)->val;
+			/**
+			 * If stream requests queue is empty, it means that
+			 * that there is some active transaction which was
+			 * not commited yet. We need to rollback it, since
+			 * we push on_disconnect message to tx thread here.
+			 * If stream requests queue is not empty, it means
+			 * that stream processing some request in tx thread
+			 * now. We destroy stream in `net_send_msg` after
+			 * processing all requests.
+			 */
+			if (stailq_empty(&stream->pending_requests))
+				iproto_stream_rollback_on_disconnect(stream);
+		}
 		cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
 		assert(con->state == IPROTO_CONNECTION_ALIVE);
 		con->state = IPROTO_CONNECTION_CLOSED;
@@ -965,6 +1019,7 @@ iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
 	 */
 	errinj_stream_msg_count_add(1);
 	stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+	assert(stream != NULL);
 	msg->stream = stream;
 	/*
 	 * If the request queue in the stream is not empty, it means
@@ -1407,6 +1462,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	uint64_t stream_id;
 	uint8_t type;
 	bool request_is_not_for_stream;
+	bool request_is_only_for_stream;
 	struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
 
 	if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1418,11 +1474,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 	request_is_not_for_stream =
 		((type > IPROTO_TYPE_STAT_MAX &&
 		 type != IPROTO_PING) || type == IPROTO_AUTH);
+	request_is_only_for_stream =
+		(type == IPROTO_BEGIN ||
+		 type == IPROTO_COMMIT ||
+		 type == IPROTO_ROLLBACK);
 
 	if (stream_id != 0 && request_is_not_for_stream) {
 		diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
 			 iproto_type_name(type));
 		goto error;
+	} else if (stream_id == 0 && request_is_only_for_stream) {
+		diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
+			 iproto_type_name(type));
+		goto error;
 	}
 
 	/*
@@ -1450,6 +1514,15 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		              sizeof(*(iproto_thread->dml_route)));
 		cmsg_init(&msg->base, iproto_thread->dml_route[type]);
 		break;
+	case IPROTO_BEGIN:
+		cmsg_init(&msg->base, iproto_thread->begin_route);
+		break;
+	case IPROTO_COMMIT:
+		cmsg_init(&msg->base, iproto_thread->commit_route);
+		break;
+	case IPROTO_ROLLBACK:
+		cmsg_init(&msg->base, iproto_thread->rollback_route);
+		break;
 	case IPROTO_CALL_16:
 	case IPROTO_CALL:
 	case IPROTO_EVAL:
@@ -1523,6 +1596,38 @@ tx_fiber_init(struct session *session, uint64_t sync)
 	fiber_set_user(f, &session->credentials);
 }
 
+static void
+tx_process_rollback_on_disconnect(struct cmsg *m)
+{
+	struct iproto_stream *stream =
+		container_of(m, struct iproto_stream,
+			     on_disconnect);
+
+	if (stream->txn != NULL) {
+		tx_fiber_init(stream->connection->session, 0);
+		txn_attach(stream->txn);
+		if (box_txn_rollback() != 0)
+			panic("failed to rollback transaction on disconnect");
+		stream->txn = NULL;
+	}
+}
+
+static void
+net_finish_rollback_on_disconnect(struct cmsg *m)
+{
+	struct iproto_stream *stream =
+		container_of(m, struct iproto_stream,
+			     on_disconnect);
+	struct iproto_connection *con = stream->connection;
+
+	struct mh_i64ptr_node_t node = { stream->id, NULL };
+	mh_i64ptr_remove(con->streams, &node, 0);
+	iproto_stream_delete(stream);
+	assert(!evio_has_fd(&con->input));
+	if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
+		iproto_connection_try_to_start_destroy(con);
+}
+
 static void
 tx_process_disconnect(struct cmsg *m)
 {
@@ -1656,15 +1761,43 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
 	}
 }
 
+/**
+ * Since the processing of requests within a transaction
+ * for a stream can occur in different fibers, we store
+ * a pointer to transaction in the stream structure.
+ * Check if message belongs to stream and there is active
+ * transaction for this stream. In case it is so, sets this
+ * transaction for current fiber.
+ */
+static inline void
+tx_prepare_transaction_for_request(struct iproto_msg *msg)
+{
+	if (msg->stream != NULL && msg->stream->txn != NULL) {
+		txn_attach(msg->stream->txn);
+		msg->stream->txn = NULL;
+	}
+	assert(!in_txn() || msg->stream != NULL);
+}
+
 static inline struct iproto_msg *
 tx_accept_msg(struct cmsg *m)
 {
 	struct iproto_msg *msg = (struct iproto_msg *) m;
 	tx_accept_wpos(msg->connection, &msg->wpos);
 	tx_fiber_init(msg->connection->session, msg->header.sync);
+	tx_prepare_transaction_for_request(msg);
 	return msg;
 }
 
+static inline void
+tx_end_msg(struct iproto_msg *msg)
+{
+	if (msg->stream != NULL) {
+		assert(msg->stream->txn == NULL);
+		msg->stream->txn = txn_detach();
+	}
+}
+
 /**
  * Write error message to the output buffer and advance
  * write position. Doesn't throw.
@@ -1690,6 +1823,7 @@ tx_reply_iproto_error(struct cmsg *m)
 	iproto_reply_error(out, diag_last_error(&msg->diag),
 			   msg->header.sync, ::schema_version);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 }
 
 /** Inject a short delay on tx request processing for testing. */
@@ -1702,6 +1836,72 @@ tx_inject_delay(void)
 	});
 }
 
+static void
+tx_process_begin(struct cmsg *m)
+{
+	struct iproto_msg *msg = tx_accept_msg(m);
+	struct obuf *out;
+
+	if (tx_check_schema(msg->header.schema_version))
+		goto error;
+
+	if (box_txn_begin() != 0)
+		goto error;
+
+	out = msg->connection->tx.p_obuf;
+	iproto_reply_ok(out, msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
+	return;
+error:
+	tx_reply_error(msg);
+	tx_end_msg(msg);
+}
+
+static void
+tx_process_commit(struct cmsg *m)
+{
+	struct iproto_msg *msg = tx_accept_msg(m);
+	struct obuf *out;
+
+	if (tx_check_schema(msg->header.schema_version))
+		goto error;
+
+	if (box_txn_commit() != 0)
+		goto error;
+
+	out = msg->connection->tx.p_obuf;
+	iproto_reply_ok(out, msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
+	return;
+error:
+	tx_reply_error(msg);
+	tx_end_msg(msg);
+}
+
+static void
+tx_process_rollback(struct cmsg *m)
+{
+	struct iproto_msg *msg = tx_accept_msg(m);
+	struct obuf *out;
+
+	if (tx_check_schema(msg->header.schema_version))
+		goto error;
+
+	if (box_txn_rollback() != 0)
+		goto error;
+
+	out = msg->connection->tx.p_obuf;
+	iproto_reply_ok(out, msg->header.sync, ::schema_version);
+	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
+	return;
+error:
+	tx_reply_error(msg);
+	tx_end_msg(msg);
+}
+
 static void
 tx_process1(struct cmsg *m)
 {
@@ -1723,9 +1923,11 @@ tx_process1(struct cmsg *m)
 	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
 			    tuple != 0);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -1766,9 +1968,11 @@ tx_process_select(struct cmsg *m)
 	iproto_reply_select(out, &svp, msg->header.sync,
 			    ::schema_version, count);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static int
@@ -1818,6 +2022,12 @@ tx_process_call(struct cmsg *m)
 	if (rc != 0)
 		goto error;
 
+	if (in_txn() != NULL && msg->header.stream_id == 0) {
+		diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
+		port_destroy(&port);
+		goto error;
+	}
+
 	/*
 	 * Add all elements returned by the function to iproto.
 	 *
@@ -1856,9 +2066,11 @@ tx_process_call(struct cmsg *m)
 	iproto_reply_select(out, &svp, msg->header.sync,
 			    ::schema_version, count);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -1867,6 +2079,7 @@ tx_process_misc(struct cmsg *m)
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
 	struct obuf *out = con->tx.p_obuf;
+	assert(!(msg->header.type != IPROTO_PING && in_txn()));
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
@@ -1899,9 +2112,11 @@ tx_process_misc(struct cmsg *m)
 	} catch (Exception *e) {
 		tx_reply_error(msg);
 	}
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -1995,9 +2210,11 @@ tx_process_sql(struct cmsg *m)
 	port_destroy(&port);
 	iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
 	iproto_wpos_create(&msg->wpos, out);
+	tx_end_msg(msg);
 	return;
 error:
 	tx_reply_error(msg);
+	tx_end_msg(msg);
 }
 
 static void
@@ -2007,6 +2224,7 @@ tx_process_replication(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 	struct ev_io io;
 	coio_create(&io, con->input.fd);
+	assert(!in_txn());
 	try {
 		switch (msg->header.type) {
 		case IPROTO_JOIN:
@@ -2066,9 +2284,24 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
 	errinj_stream_msg_count_add(-1);
 
 	if (stailq_empty(&stream->pending_requests)) {
-		struct mh_i64ptr_node_t node = { stream->id, NULL };
-		mh_i64ptr_remove(con->streams, &node, 0);
-		iproto_stream_delete(stream);
+		/*
+		 * If no more messages for the current stream
+		 * and no transaction started, then delete it.
+		 */
+		if (stream->txn == NULL) {
+			struct mh_i64ptr_node_t node = { stream->id, NULL };
+			mh_i64ptr_remove(con->streams, &node, 0);
+			iproto_stream_delete(stream);
+		} else if (!evio_has_fd(&con->input)) {
+			/*
+			 * Here we are in case when connection was closed,
+			 * there is no messages in stream queue, but there
+			 * is some active transaction in stream.
+			 * Send disconnect message to rollback this
+			 * transaction.
+			 */
+			iproto_stream_rollback_on_disconnect(stream);
+		}
 	} else {
 		/*
 		 * If there are new messages for this stream
@@ -2404,6 +2637,23 @@ iproto_session_push(struct session *session, struct port *port)
 static inline void
 iproto_thread_init_routes(struct iproto_thread *iproto_thread)
 {
+	iproto_thread->begin_route[0] =
+		{ tx_process_begin, &iproto_thread->net_pipe };
+	iproto_thread->begin_route[1] =
+		{ net_send_msg, NULL };
+	iproto_thread->commit_route[0] =
+		{ tx_process_commit, &iproto_thread->net_pipe };
+	iproto_thread->commit_route[1] =
+		{ net_send_msg, NULL };
+	iproto_thread->rollback_route[0] =
+		{ tx_process_rollback, &iproto_thread->net_pipe };
+	iproto_thread->rollback_route[1] =
+		{ net_send_msg, NULL };
+	iproto_thread->rollback_on_disconnect_route[0] =
+		{ tx_process_rollback_on_disconnect,
+		  &iproto_thread->net_pipe };
+	iproto_thread->rollback_on_disconnect_route[1] =
+		{ net_finish_rollback_on_disconnect, NULL };
 	iproto_thread->destroy_route[0] =
 		{ tx_process_destroy, &iproto_thread->net_pipe };
 	iproto_thread->destroy_route[1] =
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index f2902946a..913a64de5 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -166,6 +166,9 @@ const char *iproto_type_strs[] =
 	"EXECUTE",
 	NULL, /* NOP */
 	"PREPARE",
+	"BEGIN",
+	"COMMIT",
+	"ROLLBACK",
 };
 
 #define bit(c) (1ULL<<IPROTO_##c)
@@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
 	0,                                                     /* EXECUTE */
 	0,                                                     /* NOP */
 	0,                                                     /* PREPARE */
+	0,                                                     /* BEGIN */
+	0,                                                     /* COMMIT */
+	0,                                                     /* ROLLBACK */
 };
 #undef bit
 
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 8792737b2..921007580 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -237,6 +237,12 @@ enum iproto_type {
 	IPROTO_NOP = 12,
 	/** Prepare SQL statement. */
 	IPROTO_PREPARE = 13,
+	/* Begin transaction */
+	IPROTO_BEGIN = 14,
+	/* Commit transaction */
+	IPROTO_COMMIT = 15,
+	/* Rollback transaction */
+	IPROTO_ROLLBACK = 16,
 	/** The maximum typecode used for box.stat() */
 	IPROTO_TYPE_STAT_MAX,
 
diff --git a/src/box/txn.c b/src/box/txn.c
index e057d2762..06d048870 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -1255,3 +1255,26 @@ txn_on_yield(struct trigger *trigger, void *event)
 	txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
 	return 0;
 }
+
+struct txn *
+txn_detach(void)
+{
+	struct txn *txn = in_txn();
+	if (txn == NULL)
+		return NULL;
+	if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
+		txn_on_yield(NULL, NULL);
+		trigger_clear(&txn->fiber_on_yield);
+	}
+	trigger_clear(&txn->fiber_on_stop);
+	fiber_set_txn(fiber(), NULL);
+	return txn;
+}
+
+void
+txn_attach(struct txn *txn)
+{
+	assert(txn != NULL);
+	assert(!in_txn());
+	fiber_set_txn(fiber(), txn);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 8741dc6a1..f11144567 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -457,6 +457,25 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
 	fiber->storage.txn = txn;
 }
 
+/**
+ * Detach transaction from fiber.
+ * By default if the fiber is stopped the transaction started
+ * in this fiber is rollback. This function detaches transaction
+ * from fiber - detached transaction does not rollback in case
+ * when fiber stopped, but can be aborted in case it does not
+ * support yeild.
+ */
+struct txn *
+txn_detach(void);
+
+/**
+ * Attach transaction to fiber.
+ * Attach @a txn that has been detached previously and saved
+ * somewhere to a new fiber.
+ */
+void
+txn_attach(struct txn *txn);
+
 /**
  * Start a transaction explicitly.
  * @pre no transaction is active
diff --git a/test/box-tap/feedback_daemon.test.lua b/test/box-tap/feedback_daemon.test.lua
index a2e041649..f700f3f72 100755
--- a/test/box-tap/feedback_daemon.test.lua
+++ b/test/box-tap/feedback_daemon.test.lua
@@ -251,7 +251,7 @@ box.space.features_sync:drop()
 
 local function check_stats(stat)
     local sub = test:test('feedback operation stats')
-    sub:plan(18)
+    sub:plan(21)
     local box_stat = box.stat()
     local net_stat = box.stat.net()
     for op, val in pairs(box_stat) do
diff --git a/test/box/error.result b/test/box/error.result
index f80fdfed5..bc804197a 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -448,6 +448,7 @@ t;
  |   227: box.error.SYNC_QUEUE_UNCLAIMED
  |   228: box.error.SYNC_QUEUE_FOREIGN
  |   229: box.error.UNABLE_TO_PROCESS_IN_STREAM
+ |   230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM
  | ...
 
 test_run:cmd("setopt delimiter ''");
diff --git a/test/box/misc.result b/test/box/misc.result
index b62a64355..c86245914 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -136,11 +136,14 @@ end;
 t;
 ---
 - - DELETE
+  - COMMIT
   - SELECT
+  - ROLLBACK
   - INSERT
   - EVAL
-  - CALL
   - ERROR
+  - CALL
+  - BEGIN
   - PREPARE
   - REPLACE
   - UPSERT
-- 
2.20.1



More information about the Tarantool-patches mailing list