[Tarantool-patches] [PATCH 6/7] iproto: implement interactive transactions over iproto streams
mechanik20051988
mechanik20051988 at tarantool.org
Thu Aug 5 21:17:44 MSK 2021
From: mechanik20051988 <mechanik20.05.1988 at gmail.com>
Implement interactive transactions over iproto streams. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. If any request fails during the
transaction, it will not affect the other requests in the transaction.
If a disconnect occurs while a stream has an active transaction,
that transaction is rolled back unless it has been committed
before that moment.
Part of #5860
@TarantoolBot document
Title: interactive transactions were implemented over iproto streams.
The main purpose of streams is transactions via iproto. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. There are multiple ways to begin,
commit and rollback transaction: using IPROTO_CALL and IPROTO_EVAL
with corresponding function (box.begin, box.commit and box.rollback),
IPROTO_EXECUTE with corresponding sql request ('START TRANSACTION',
'COMMIT', 'ROLLBACK') and IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT,
IPROTO_TRANSACTION_ROLLBACK respectively. If a disconnect occurs while a
stream has an active transaction, that transaction is rolled back unless it
has been committed before that moment.
Add new command codes for begin, commit and rollback transactions:
`IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
`IPROTO_TRANSACTION_ROLLBACK 16` respectively.
---
src/box/call.c | 10 +-
src/box/errcode.h | 1 +
src/box/iproto.cc | 249 +++++++++++++++++++++++++-
src/box/iproto_constants.c | 6 +
src/box/iproto_constants.h | 10 +-
test/box-tap/feedback_daemon.test.lua | 2 +-
test/box/error.result | 1 +
test/box/misc.result | 5 +-
8 files changed, 274 insertions(+), 10 deletions(-)
diff --git a/src/box/call.c b/src/box/call.c
index a6384efe2..9ba0fa9ac 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -141,8 +141,9 @@ box_process_call(struct call_request *request, struct port *port)
const char *name = request->name;
assert(name != NULL);
uint32_t name_len = mp_decode_strl(&name);
- /* Transaction is not started. */
- assert(!in_txn());
+ uint64_t stream_id = request->header->stream_id;
+ /* Transaction is not started for non-stream requests. */
+ assert(stream_id != 0 || !in_txn());
int rc;
struct port args;
@@ -157,7 +158,7 @@ box_process_call(struct call_request *request, struct port *port)
}
if (rc != 0)
return -1;
- if (in_txn() != NULL) {
+ if (in_txn() != NULL && stream_id == 0) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
port_destroy(port);
return -1;
@@ -177,9 +178,10 @@ box_process_eval(struct call_request *request, struct port *port)
request->args_end - request->args);
const char *expr = request->expr;
uint32_t expr_len = mp_decode_strl(&expr);
+ uint64_t stream_id = request->header->stream_id;
if (box_lua_eval(expr, expr_len, &args, port) != 0)
return -1;
- if (in_txn() != 0) {
+ if (in_txn() != 0 && stream_id == 0) {
diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
port_destroy(port);
return -1;
diff --git a/src/box/errcode.h b/src/box/errcode.h
index 6c8c00256..e76fd6442 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -279,6 +279,7 @@ struct errcode_record {
/*224 */_(ER_ELECTION_DISABLED, "Elections were turned off")\
/*225 */_(ER_TXN_ROLLBACK, "Transaction was rolled back") \
/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process this type (%u) of requests in stream") \
+ /*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process this type (%u) of requests out of stream") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3642cbd02..37715ab7f 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -65,6 +65,7 @@
#include "tt_static.h"
#include "salad/stailq.h"
#include "assoc.h"
+#include "txn.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -78,6 +79,8 @@ enum {
struct iproto_connection;
struct iproto_stream {
+ /** Currently active stream transaction or NULL */
+ struct txn *txn;
/**
* Queue of pending requests (iproto messages) for this stream,
* processed sequentially. This field is accesable only from
@@ -88,6 +91,8 @@ struct iproto_stream {
uint64_t id;
/** This stream connection */
struct iproto_connection *connection;
+ /** Pre-allocated disconnect msg to gracefully destroy stream */
+ struct cmsg on_disconnect;
};
/**
@@ -134,6 +139,10 @@ struct iproto_thread {
/**
* Static routes for this iproto thread
*/
+ struct cmsg_hop begin_route[2];
+ struct cmsg_hop commit_route[2];
+ struct cmsg_hop rollback_route[2];
+ struct cmsg_hop destroy_stream_on_disconnect_route[2];
struct cmsg_hop destroy_route[2];
struct cmsg_hop disconnect_route[2];
struct cmsg_hop misc_route[2];
@@ -622,12 +631,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
return NULL;
}
errinj_stream_count_add(1);
+ stream->txn = NULL;
stailq_create(&stream->pending_requests);
stream->id = stream_id;
stream->connection = connection;
return stream;
}
+static inline void
+iproto_stream_push_on_disconnect(struct iproto_stream *stream)
+{
+ struct iproto_connection *conn = stream->connection;
+ struct iproto_thread *iproto_thread = conn->iproto_thread;
+ struct cmsg_hop *route =
+ iproto_thread->destroy_stream_on_disconnect_route;
+ cmsg_init(&stream->on_disconnect, route);
+ cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
+}
+
/**
* Return true if we have not enough spare messages
* in the message pool.
@@ -651,6 +672,7 @@ static void
iproto_stream_delete(struct iproto_stream *stream)
{
assert(stailq_empty(&stream->pending_requests));
+ assert(stream->txn == NULL);
errinj_stream_count_add(-1);
mempool_free(&stream->connection->iproto_thread->stream_pool, stream);
}
@@ -697,6 +719,7 @@ static inline bool
iproto_connection_is_idle(struct iproto_connection *con)
{
return con->long_poll_count == 0 &&
+ mh_size(con->streams) == 0 &&
ibuf_used(&con->ibuf[0]) == 0 &&
ibuf_used(&con->ibuf[1]) == 0;
}
@@ -786,6 +809,23 @@ iproto_connection_close(struct iproto_connection *con)
* is done only once.
*/
con->p_ibuf->wpos -= con->parse_size;
+ mh_int_t node;
+ mh_foreach(con->streams, node) {
+ struct iproto_stream *stream = (struct iproto_stream *)
+ mh_i64ptr_node(con->streams, node)->val;
+ /**
+ * If stream requests queue is empty, it means
+ * that there is some active transaction which was
+ * not committed yet. We need to roll it back, so
+ * we push on_disconnect message to tx thread here.
+ * If stream requests queue is not empty, it means
+ * that stream is processing some request in tx thread
+ * now. We destroy stream in `net_send_msg` after
+ * processing all requests.
+ */
+ if (stailq_empty(&stream->pending_requests))
+ iproto_stream_push_on_disconnect(stream);
+ }
cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
assert(con->state == IPROTO_CONNECTION_ALIVE);
con->state = IPROTO_CONNECTION_CLOSED;
@@ -946,6 +986,7 @@ iproto_msg_set_stream(struct iproto_msg *msg)
*/
errinj_stream_msg_count_add(1);
stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+ assert(stream != NULL);
msg->stream = stream;
/*
* If the request queue in the stream is not empty, it means
@@ -1388,6 +1429,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
uint64_t stream_id;
uint8_t type;
bool request_is_not_for_stream;
+ bool request_is_only_for_stream;
struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1399,11 +1441,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
request_is_not_for_stream =
((type > IPROTO_TYPE_STAT_MAX &&
type != IPROTO_PING) || type == IPROTO_AUTH);
+ request_is_only_for_stream =
+ (type == IPROTO_TRANSACTION_BEGIN ||
+ type == IPROTO_TRANSACTION_COMMIT ||
+ type == IPROTO_TRANSACTION_ROLLBACK);
if (stream_id != 0 && request_is_not_for_stream) {
diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
(uint32_t) type);
goto error;
+ } else if (stream_id == 0 && request_is_only_for_stream) {
+ diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
+ (uint32_t) type);
+ goto error;
}
/*
@@ -1418,6 +1468,9 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
case IPROTO_UPDATE:
case IPROTO_DELETE:
case IPROTO_UPSERT:
+ case IPROTO_TRANSACTION_BEGIN:
+ case IPROTO_TRANSACTION_COMMIT:
+ case IPROTO_TRANSACTION_ROLLBACK:
if (xrow_decode_dml(&msg->header, &msg->dml,
dml_request_key_map(type)))
goto error;
@@ -1498,6 +1551,37 @@ tx_fiber_init(struct session *session, uint64_t sync)
fiber_set_user(f, &session->credentials);
}
+static void
+tx_process_destroy_stream_on_disconnect(struct cmsg *m)
+{
+ struct iproto_stream *stream =
+ container_of(m, struct iproto_stream,
+ on_disconnect);
+
+ if (stream->txn != NULL) {
+ tx_fiber_init(stream->connection->session, 0);
+ fiber_set_txn(fiber(), stream->txn);
+ if (box_txn_rollback() != 0)
+ panic("failed to rollback transaction on disconnect");
+ stream->txn = NULL;
+ }
+}
+
+static void
+net_finish_destroy_stream_on_disconnect(struct cmsg *m)
+{
+ struct iproto_stream *stream =
+ container_of(m, struct iproto_stream,
+ on_disconnect);
+ struct iproto_connection *con = stream->connection;
+
+ struct mh_i64ptr_node_t node = { stream->id, NULL };
+ mh_i64ptr_remove(con->streams, &node, 0);
+ iproto_stream_delete(stream);
+ assert(!evio_has_fd(&con->input));
+ if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
+ iproto_connection_try_to_start_destroy(con);
+}
static void
tx_process_disconnect(struct cmsg *m)
@@ -1632,15 +1716,53 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
}
}
+/**
+ * Since the processing of requests within a transaction
+ * for a stream can occur in different fibers, we store
+ * a pointer to transaction in the stream structure.
+ * Check if message belongs to stream and there is active
+ * transaction for this stream. In case it is so, sets this
+ * transaction for current fiber.
+ */
+static inline void
+tx_prepare_transaction_for_request(struct iproto_msg *msg)
+{
+ if (msg->stream != NULL) {
+ struct txn *txn = msg->stream->txn;
+ /*
+ * When we do any operations (which are written to `wal`)
+ * outside of transaction, we consider each such operation
+ * as a small transaction and write it to `wal` immediately.
+ * When operation is performed as part of transaction, we
+ * write all transaction to `wal` at the commit. In this case,
+ * `is_commit` flag will be set when writing to `wal` for the
+ * last operation in transaction, the rest operations must have
+ * this flag set to false, to mark that they all belong to the
+ * same transaction.
+ */
+ if (txn != NULL)
+ msg->header.is_commit = false;
+ fiber_set_txn(fiber(), txn);
+ }
+}
+
static inline struct iproto_msg *
tx_accept_msg(struct cmsg *m)
{
struct iproto_msg *msg = (struct iproto_msg *) m;
tx_accept_wpos(msg->connection, &msg->wpos);
tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_prepare_transaction_for_request(msg);
return msg;
}
+static inline void
+tx_end_msg(struct iproto_msg *msg)
+{
+ if (msg->stream != NULL)
+ msg->stream->txn = txn_detach();
+}
+
/**
* Write error message to the output buffer and advance
* write position. Doesn't throw.
@@ -1666,6 +1788,7 @@ tx_reply_iproto_error(struct cmsg *m)
iproto_reply_error(out, diag_last_error(&msg->diag),
msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
}
/** Inject a short delay on tx request processing for testing. */
@@ -1678,6 +1801,79 @@ tx_inject_delay(void)
});
}
+static void
+tx_process_begin(struct cmsg *m)
+{
+ struct iproto_msg *msg = tx_accept_msg(m);
+ struct obuf *out;
+ struct iproto_stream *stream = msg->stream;
+
+ if (tx_check_schema(msg->header.schema_version))
+ goto error;
+
+ if (box_txn_begin() != 0)
+ goto error;
+
+ stream->txn = txn_detach();
+ assert(stream->txn != NULL);
+
+ out = msg->connection->tx.p_obuf;
+ iproto_reply_ok(out, msg->header.sync, ::schema_version);
+ iproto_wpos_create(&msg->wpos, out);
+ return;
+error:
+ tx_reply_error(msg);
+ tx_end_msg(msg);
+}
+
+static void
+tx_process_commit(struct cmsg *m)
+{
+ struct iproto_msg *msg = tx_accept_msg(m);
+ struct obuf *out;
+ struct iproto_stream *stream = msg->stream;
+
+ if (tx_check_schema(msg->header.schema_version))
+ goto error;
+
+ if (box_txn_commit() != 0) {
+ stream->txn = in_txn();
+ goto error;
+ }
+
+ stream->txn = NULL;
+ out = msg->connection->tx.p_obuf;
+ iproto_reply_ok(out, msg->header.sync, ::schema_version);
+ iproto_wpos_create(&msg->wpos, out);
+ return;
+error:
+ tx_reply_error(msg);
+ tx_end_msg(msg);
+}
+
+static void
+tx_process_rollback(struct cmsg *m)
+{
+ struct iproto_msg *msg = tx_accept_msg(m);
+ struct obuf *out;
+ struct iproto_stream *stream = msg->stream;
+
+ if (tx_check_schema(msg->header.schema_version))
+ goto error;
+
+ if (box_txn_rollback() != 0)
+ goto error;
+
+ stream->txn = NULL;
+ out = msg->connection->tx.p_obuf;
+ iproto_reply_ok(out, msg->header.sync, ::schema_version);
+ iproto_wpos_create(&msg->wpos, out);
+ return;
+error:
+ tx_reply_error(msg);
+ tx_end_msg(msg);
+}
+
static void
tx_process1(struct cmsg *m)
{
@@ -1699,9 +1895,11 @@ tx_process1(struct cmsg *m)
iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
tuple != 0);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static void
@@ -1742,9 +1940,11 @@ tx_process_select(struct cmsg *m)
iproto_reply_select(out, &svp, msg->header.sync,
::schema_version, count);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static int
@@ -1829,11 +2029,13 @@ tx_process_call(struct cmsg *m)
goto error;
}
+ tx_end_msg(msg);
iproto_reply_select(out, &svp, msg->header.sync,
::schema_version, count);
iproto_wpos_create(&msg->wpos, out);
return;
error:
+ tx_end_msg(msg);
tx_reply_error(msg);
}
@@ -1843,6 +2045,7 @@ tx_process_misc(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
struct obuf *out = con->tx.p_obuf;
+ assert(!(msg->header.type != IPROTO_PING && in_txn()));
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1875,9 +2078,11 @@ tx_process_misc(struct cmsg *m)
} catch (Exception *e) {
tx_reply_error(msg);
}
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static void
@@ -1969,10 +2174,12 @@ tx_process_sql(struct cmsg *m)
goto error;
}
port_destroy(&port);
+ tx_end_msg(msg);
iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
iproto_wpos_create(&msg->wpos, out);
return;
error:
+ tx_end_msg(msg);
tx_reply_error(msg);
}
@@ -1983,6 +2190,7 @@ tx_process_replication(struct cmsg *m)
struct iproto_connection *con = msg->connection;
struct ev_io io;
coio_create(&io, con->input.fd);
+ assert(!in_txn());
try {
switch (msg->header.type) {
case IPROTO_JOIN:
@@ -2043,9 +2251,24 @@ net_send_msg(struct cmsg *m)
errinj_stream_msg_count_add(-1);
if (stailq_empty(&stream->pending_requests)) {
- struct mh_i64ptr_node_t node = { stream->id, NULL };
- mh_i64ptr_remove(con->streams, &node, 0);
- iproto_stream_delete(stream);
+ /*
+ * If no more messages for the current stream
+ * and no transaction started, then delete it.
+ */
+ if (stream->txn == NULL) {
+ struct mh_i64ptr_node_t node = { stream->id, NULL };
+ mh_i64ptr_remove(con->streams, &node, 0);
+ iproto_stream_delete(stream);
+ } else if (!evio_has_fd(&con->input)) {
+ /*
+ * Here we are in case when connection was closed,
+ * there are no messages in stream queue, but there
+ * is some active transaction in stream.
+ * Send disconnect message to rollback this
+ * transaction.
+ */
+ iproto_stream_push_on_disconnect(stream);
+ }
} else {
/*
* If there are new messages for this stream
@@ -2374,6 +2597,23 @@ iproto_session_push(struct session *session, struct port *port)
static inline void
iproto_thread_init_routes(struct iproto_thread *iproto_thread)
{
+ iproto_thread->begin_route[0] =
+ { tx_process_begin, &iproto_thread->net_pipe };
+ iproto_thread->begin_route[1] =
+ { net_send_msg, NULL };
+ iproto_thread->commit_route[0] =
+ { tx_process_commit, &iproto_thread->net_pipe };
+ iproto_thread->commit_route[1] =
+ { net_send_msg, NULL };
+ iproto_thread->rollback_route[0] =
+ { tx_process_rollback, &iproto_thread->net_pipe };
+ iproto_thread->rollback_route[1] =
+ { net_send_msg, NULL };
+ iproto_thread->destroy_stream_on_disconnect_route[0] =
+ { tx_process_destroy_stream_on_disconnect,
+ &iproto_thread->net_pipe };
+ iproto_thread->destroy_stream_on_disconnect_route[1] =
+ { net_finish_destroy_stream_on_disconnect, NULL };
iproto_thread->destroy_route[0] =
{ tx_process_destroy, &iproto_thread->net_pipe };
iproto_thread->destroy_route[1] =
@@ -2437,6 +2677,9 @@ iproto_thread_init_routes(struct iproto_thread *iproto_thread)
iproto_thread->dml_route[12] = NULL;
/* IPROTO_PREPARE */
iproto_thread->dml_route[13] = iproto_thread->sql_route;
+ iproto_thread->dml_route[14] = iproto_thread->begin_route;
+ iproto_thread->dml_route[15] = iproto_thread->commit_route;
+ iproto_thread->dml_route[16] = iproto_thread->rollback_route;
iproto_thread->connect_route[0] =
{ tx_process_connect, &iproto_thread->net_pipe };
iproto_thread->connect_route[1] = { net_send_greeting, NULL };
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index f2902946a..913a64de5 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -166,6 +166,9 @@ const char *iproto_type_strs[] =
"EXECUTE",
NULL, /* NOP */
"PREPARE",
+ "BEGIN",
+ "COMMIT",
+ "ROLLBACK",
};
#define bit(c) (1ULL<<IPROTO_##c)
@@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
0, /* EXECUTE */
0, /* NOP */
0, /* PREPARE */
+ 0, /* BEGIN */
+ 0, /* COMMIT */
+ 0, /* ROLLBACK */
};
#undef bit
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 59e8574f3..d8b242f1a 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -237,6 +237,12 @@ enum iproto_type {
IPROTO_NOP = 12,
/** Prepare SQL statement. */
IPROTO_PREPARE = 13,
+ /* Begin transaction */
+ IPROTO_TRANSACTION_BEGIN = 14,
+ /* Commit transaction */
+ IPROTO_TRANSACTION_COMMIT = 15,
+ /* Rollback transaction */
+ IPROTO_TRANSACTION_ROLLBACK = 16,
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
@@ -345,7 +351,9 @@ static inline bool
iproto_type_is_dml(uint16_t type)
{
return (type >= IPROTO_SELECT && type <= IPROTO_DELETE) ||
- type == IPROTO_UPSERT || type == IPROTO_NOP;
+ type == IPROTO_UPSERT || type == IPROTO_NOP ||
+ (type >= IPROTO_TRANSACTION_BEGIN &&
+ type <= IPROTO_TRANSACTION_ROLLBACK);
}
/**
diff --git a/test/box-tap/feedback_daemon.test.lua b/test/box-tap/feedback_daemon.test.lua
index a2e041649..f700f3f72 100755
--- a/test/box-tap/feedback_daemon.test.lua
+++ b/test/box-tap/feedback_daemon.test.lua
@@ -251,7 +251,7 @@ box.space.features_sync:drop()
local function check_stats(stat)
local sub = test:test('feedback operation stats')
- sub:plan(18)
+ sub:plan(21)
local box_stat = box.stat()
local net_stat = box.stat.net()
for op, val in pairs(box_stat) do
diff --git a/test/box/error.result b/test/box/error.result
index 7984f6bb3..893687e63 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -445,6 +445,7 @@ t;
| 224: box.error.ELECTION_DISABLED
| 225: box.error.TXN_ROLLBACK
| 226: box.error.UNABLE_TO_PROCESS_IN_STREAM
+ | 227: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM
| ...
test_run:cmd("setopt delimiter ''");
diff --git a/test/box/misc.result b/test/box/misc.result
index b62a64355..c86245914 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -136,11 +136,14 @@ end;
t;
---
- - DELETE
+ - COMMIT
- SELECT
+ - ROLLBACK
- INSERT
- EVAL
- - CALL
- ERROR
+ - CALL
+ - BEGIN
- PREPARE
- REPLACE
- UPSERT
--
2.20.1
More information about the Tarantool-patches
mailing list