From mboxrd@z Thu Jan 1 00:00:00 1970
To: tarantool-patches@dev.tarantool.org, vdavydov@tarantool.org, v.shpilevoy@tarantool.org
Cc: mechanik20051988
Date: Wed, 11 Aug 2021 11:56:57 +0300
Message-Id: <06356e3ba5ad12dc98f0aeaa122af0fe022e8822.1628671235.git.mechanik20051988@tarantool.org>
X-Mailer: git-send-email 2.20.1
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
Subject: [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams
List-Id: Tarantool development patches
From: mechanik20051988 via Tarantool-patches
Reply-To: mechanik20051988
Sender: "Tarantool-patches"

From: mechanik20051988

Implement interactive transactions over iproto streams. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. If a request fails during a
transaction, it does not affect the other requests in that transaction.
If a disconnect occurs while a stream has an active transaction, the
transaction is rolled back unless it managed to commit before that
moment.

Part of #5860

@TarantoolBot document
Title: interactive transactions were implemented over iproto streams.

The main purpose of streams is transactions via iproto. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. There are multiple ways to begin,
commit and roll back a transaction: IPROTO_CALL and IPROTO_EVAL with
the corresponding function (box.begin, box.commit and box.rollback),
IPROTO_EXECUTE with the corresponding SQL request ('START TRANSACTION',
'COMMIT', 'ROLLBACK'), and the dedicated IPROTO_TRANSACTION_BEGIN,
IPROTO_TRANSACTION_COMMIT and IPROTO_TRANSACTION_ROLLBACK requests,
respectively. The dedicated requests are accepted only inside a stream,
that is, with a non-zero stream id; otherwise the request fails with
ER_UNABLE_TO_PROCESS_OUT_OF_STREAM. If a disconnect occurs while a
stream has an active transaction, the transaction is rolled back unless
it managed to commit before that moment.

Add new command codes to begin, commit and roll back transactions:
`IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
`IPROTO_TRANSACTION_ROLLBACK 16`, respectively.
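For illustration, the rule that the three new request codes are valid only inside a stream can be sketched in plain C roughly as follows. This is a simplified model and not the code from the patch: `check_request` and the `CHECK_*` result codes are hypothetical names introduced for this sketch; only the three command codes and the non-zero stream id requirement are taken from the patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Command codes as listed in the patch description. */
enum {
	IPROTO_TRANSACTION_BEGIN = 14,
	IPROTO_TRANSACTION_COMMIT = 15,
	IPROTO_TRANSACTION_ROLLBACK = 16,
};

/* Hypothetical result codes, used only in this sketch. */
enum check_result {
	CHECK_OK = 0,
	CHECK_OUT_OF_STREAM = -1,
};

/* True for requests that make sense only inside a stream. */
static bool
request_is_only_for_stream(uint8_t type)
{
	return type == IPROTO_TRANSACTION_BEGIN ||
	       type == IPROTO_TRANSACTION_COMMIT ||
	       type == IPROTO_TRANSACTION_ROLLBACK;
}

/*
 * Reject a transaction control request that arrives with
 * stream_id == 0, i.e. outside of any stream. All other
 * combinations are accepted by this simplified check.
 */
static enum check_result
check_request(uint64_t stream_id, uint8_t type)
{
	if (stream_id == 0 && request_is_only_for_stream(type))
		return CHECK_OUT_OF_STREAM;
	return CHECK_OK;
}
```

In the patch itself the rejected case is reported to the client as ER_UNABLE_TO_PROCESS_OUT_OF_STREAM; the sketch only models the decision, not the error reporting.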
--- src/box/call.c | 12 -- src/box/errcode.h | 1 + src/box/iproto.cc | 243 +++++++++++++++++++++++++- src/box/iproto_constants.c | 6 + src/box/iproto_constants.h | 6 + src/box/txn.c | 22 +++ src/box/txn.h | 19 ++ test/box-tap/feedback_daemon.test.lua | 2 +- test/box/error.result | 1 + test/box/misc.result | 5 +- 10 files changed, 300 insertions(+), 17 deletions(-) diff --git a/src/box/call.c b/src/box/call.c index a6384efe2..0ce84b1ed 100644 --- a/src/box/call.c +++ b/src/box/call.c @@ -141,8 +141,6 @@ box_process_call(struct call_request *request, struct port *port) const char *name = request->name; assert(name != NULL); uint32_t name_len = mp_decode_strl(&name); - /* Transaction is not started. */ - assert(!in_txn()); int rc; struct port args; @@ -157,11 +155,6 @@ box_process_call(struct call_request *request, struct port *port) } if (rc != 0) return -1; - if (in_txn() != NULL) { - diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); - port_destroy(port); - return -1; - } return 0; } @@ -179,10 +172,5 @@ box_process_eval(struct call_request *request, struct port *port) uint32_t expr_len = mp_decode_strl(&expr); if (box_lua_eval(expr, expr_len, &args, port) != 0) return -1; - if (in_txn() != 0) { - diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); - port_destroy(port); - return -1; - } return 0; } diff --git a/src/box/errcode.h b/src/box/errcode.h index f8fda23c1..a6f096698 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -282,6 +282,7 @@ struct errcode_record { /*227 */_(ER_SYNC_QUEUE_UNCLAIMED, "The synchronous transaction queue doesn't belong to any instance")\ /*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\ /*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \ + /*230 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \ /* * !IMPORTANT! 
Please follow instructions at start of the file diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 3b792130b..376abbff0 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -66,6 +66,7 @@ #include "tt_static.h" #include "salad/stailq.h" #include "assoc.h" +#include "txn.h" enum { IPROTO_SALT_SIZE = 32, @@ -79,6 +80,8 @@ enum { struct iproto_connection; struct iproto_stream { + /** Currently active stream transaction or NULL */ + struct txn *txn; /** * Queue of pending requests (iproto messages) for this stream, * processed sequentially. This field is accesable only from @@ -89,6 +92,11 @@ struct iproto_stream { uint64_t id; /** This stream connection */ struct iproto_connection *connection; + /** + * Pre-allocated disconnect msg to gracefully rollback stream + * transaction and destroy stream object. + */ + struct cmsg on_disconnect; }; /** @@ -135,6 +143,10 @@ struct iproto_thread { /** * Static routes for this iproto thread */ + struct cmsg_hop begin_route[2]; + struct cmsg_hop commit_route[2]; + struct cmsg_hop rollback_route[2]; + struct cmsg_hop rollback_on_disconnect_stream_route[2]; struct cmsg_hop destroy_route[2]; struct cmsg_hop disconnect_route[2]; struct cmsg_hop misc_route[2]; @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id) return NULL; } errinj_stream_count_add(1); + stream->txn = NULL; stailq_create(&stream->pending_requests); stream->id = stream_id; stream->connection = connection; return stream; } +static inline void +iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream) +{ + struct iproto_connection *conn = stream->connection; + struct iproto_thread *iproto_thread = conn->iproto_thread; + struct cmsg_hop *route = + iproto_thread->rollback_on_disconnect_stream_route; + cmsg_init(&stream->on_disconnect, route); + cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect); +} + /** * Return true if we have not enough spare messages * in the message pool. 
@@ -670,6 +694,7 @@ static void iproto_stream_delete(struct iproto_stream *stream) { assert(stailq_empty(&stream->pending_requests)); + assert(stream->txn == NULL); errinj_stream_count_add(-1); mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream); } @@ -716,6 +741,7 @@ static inline bool iproto_connection_is_idle(struct iproto_connection *con) { return con->long_poll_count == 0 && + mh_size(con->streams) == 0 && ibuf_used(&con->ibuf[0]) == 0 && ibuf_used(&con->ibuf[1]) == 0; } @@ -805,6 +831,23 @@ iproto_connection_close(struct iproto_connection *con) * is done only once. */ con->p_ibuf->wpos -= con->parse_size; + mh_int_t node; + mh_foreach(con->streams, node) { + struct iproto_stream *stream = (struct iproto_stream *) + mh_i64ptr_node(con->streams, node)->val; + /** + * If the stream request queue is empty, it means + * that there is an active transaction which was + * not committed yet. We need to roll it back, so + * we push the on_disconnect message to tx thread here. + * If the stream request queue is not empty, it means + * that the stream is processing some request in tx + * thread now. We destroy the stream in `net_send_msg` + * after processing all requests. 
+ */ + if (stailq_empty(&stream->pending_requests)) + iproto_stream_push_on_disconnect_msg(stream); + } cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg); assert(con->state == IPROTO_CONNECTION_ALIVE); con->state = IPROTO_CONNECTION_CLOSED; @@ -965,6 +1008,7 @@ iproto_msg_start_processing_in_stream(struct iproto_msg *msg) */ errinj_stream_msg_count_add(1); stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val; + assert(stream != NULL); msg->stream = stream; /* * If the request queue in the stream is not empty, it means @@ -1407,6 +1451,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, uint64_t stream_id; uint8_t type; bool request_is_not_for_stream; + bool request_is_only_for_stream; struct iproto_thread *iproto_thread = msg->connection->iproto_thread; if (xrow_header_decode(&msg->header, pos, reqend, true)) @@ -1418,11 +1463,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, request_is_not_for_stream = ((type > IPROTO_TYPE_STAT_MAX && type != IPROTO_PING) || type == IPROTO_AUTH); + request_is_only_for_stream = + (type == IPROTO_TRANSACTION_BEGIN || + type == IPROTO_TRANSACTION_COMMIT || + type == IPROTO_TRANSACTION_ROLLBACK); if (stream_id != 0 && request_is_not_for_stream) { diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM, iproto_type_name(type)); goto error; + } else if (stream_id == 0 && request_is_only_for_stream) { + diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, + iproto_type_name(type)); + goto error; } /* @@ -1450,6 +1503,15 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, sizeof(*(iproto_thread->dml_route))); cmsg_init(&msg->base, iproto_thread->dml_route[type]); break; + case IPROTO_TRANSACTION_BEGIN: + cmsg_init(&msg->base, iproto_thread->begin_route); + break; + case IPROTO_TRANSACTION_COMMIT: + cmsg_init(&msg->base, iproto_thread->commit_route); + break; + case IPROTO_TRANSACTION_ROLLBACK: + 
cmsg_init(&msg->base, iproto_thread->rollback_route); + break; case IPROTO_CALL_16: case IPROTO_CALL: case IPROTO_EVAL: @@ -1523,6 +1585,38 @@ tx_fiber_init(struct session *session, uint64_t sync) fiber_set_user(f, &session->credentials); } +static void +tx_process_rollback_on_disconnect(struct cmsg *m) +{ + struct iproto_stream *stream = + container_of(m, struct iproto_stream, + on_disconnect); + + if (stream->txn != NULL) { + tx_fiber_init(stream->connection->session, 0); + txn_attach(stream->txn); + if (box_txn_rollback() != 0) + panic("failed to rollback transaction on disconnect"); + stream->txn = NULL; + } +} + +static void +net_finish_rollback_on_disconnect(struct cmsg *m) +{ + struct iproto_stream *stream = + container_of(m, struct iproto_stream, + on_disconnect); + struct iproto_connection *con = stream->connection; + + struct mh_i64ptr_node_t node = { stream->id, NULL }; + mh_i64ptr_remove(con->streams, &node, 0); + iproto_stream_delete(stream); + assert(!evio_has_fd(&con->input)); + if (con->state == IPROTO_CONNECTION_PENDING_DESTROY) + iproto_connection_try_to_start_destroy(con); +} + static void tx_process_disconnect(struct cmsg *m) { @@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos) } } +/** + * Since the processing of requests within a transaction + * for a stream can occur in different fibers, we store + * a pointer to transaction in the stream structure. + * Check if message belongs to stream and there is active + * transaction for this stream. In case it is so, sets this + * transaction for current fiber. 
+ */ +static inline void +tx_prepare_transaction_for_request(struct iproto_msg *msg) +{ + if (msg->stream != NULL && msg->stream->txn != NULL) { + txn_attach(msg->stream->txn); + msg->stream->txn = NULL; + } + assert(!in_txn() || msg->stream != NULL); +} + static inline struct iproto_msg * tx_accept_msg(struct cmsg *m) { struct iproto_msg *msg = (struct iproto_msg *) m; tx_accept_wpos(msg->connection, &msg->wpos); tx_fiber_init(msg->connection->session, msg->header.sync); + tx_prepare_transaction_for_request(msg); return msg; } +static inline void +tx_end_msg(struct iproto_msg *msg) +{ + if (msg->stream != NULL) + msg->stream->txn = txn_detach(); +} + /** * Write error message to the output buffer and advance * write position. Doesn't throw. @@ -1690,6 +1810,7 @@ tx_reply_iproto_error(struct cmsg *m) iproto_reply_error(out, diag_last_error(&msg->diag), msg->header.sync, ::schema_version); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); } /** Inject a short delay on tx request processing for testing. 
*/ @@ -1702,6 +1823,72 @@ tx_inject_delay(void) }); } +static void +tx_process_begin(struct cmsg *m) +{ + struct iproto_msg *msg = tx_accept_msg(m); + struct obuf *out; + + if (tx_check_schema(msg->header.schema_version)) + goto error; + + if (box_txn_begin() != 0) + goto error; + + out = msg->connection->tx.p_obuf; + iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); + return; +error: + tx_reply_error(msg); + tx_end_msg(msg); +} + +static void +tx_process_commit(struct cmsg *m) +{ + struct iproto_msg *msg = tx_accept_msg(m); + struct obuf *out; + + if (tx_check_schema(msg->header.schema_version)) + goto error; + + if (box_txn_commit() != 0) + goto error; + + out = msg->connection->tx.p_obuf; + iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); + return; +error: + tx_reply_error(msg); + tx_end_msg(msg); +} + +static void +tx_process_rollback(struct cmsg *m) +{ + struct iproto_msg *msg = tx_accept_msg(m); + struct obuf *out; + + if (tx_check_schema(msg->header.schema_version)) + goto error; + + if (box_txn_rollback() != 0) + goto error; + + out = msg->connection->tx.p_obuf; + iproto_reply_ok(out, msg->header.sync, ::schema_version); + iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); + return; +error: + tx_reply_error(msg); + tx_end_msg(msg); +} + static void tx_process1(struct cmsg *m) { @@ -1723,9 +1910,11 @@ tx_process1(struct cmsg *m) iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, tuple != 0); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -1766,9 +1955,11 @@ tx_process_select(struct cmsg *m) iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, count); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static int @@ -1815,6 +2006,12 @@ 
tx_process_call(struct cmsg *m) trigger_clear(&fiber_on_yield); + if (in_txn() != NULL && msg->header.stream_id == 0) { + diag_set(ClientError, ER_FUNCTION_TX_ACTIVE); + port_destroy(&port); + goto error; + } + if (rc != 0) goto error; @@ -1856,9 +2053,11 @@ tx_process_call(struct cmsg *m) iproto_reply_select(out, &svp, msg->header.sync, ::schema_version, count); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -1867,6 +2066,7 @@ tx_process_misc(struct cmsg *m) struct iproto_msg *msg = tx_accept_msg(m); struct iproto_connection *con = msg->connection; struct obuf *out = con->tx.p_obuf; + assert(!(msg->header.type != IPROTO_PING && in_txn())); if (tx_check_schema(msg->header.schema_version)) goto error; @@ -1899,9 +2099,11 @@ tx_process_misc(struct cmsg *m) } catch (Exception *e) { tx_reply_error(msg); } + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -1995,9 +2197,11 @@ tx_process_sql(struct cmsg *m) port_destroy(&port); iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version); iproto_wpos_create(&msg->wpos, out); + tx_end_msg(msg); return; error: tx_reply_error(msg); + tx_end_msg(msg); } static void @@ -2007,6 +2211,7 @@ tx_process_replication(struct cmsg *m) struct iproto_connection *con = msg->connection; struct ev_io io; coio_create(&io, con->input.fd); + assert(!in_txn()); try { switch (msg->header.type) { case IPROTO_JOIN: @@ -2064,9 +2269,24 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg) errinj_stream_msg_count_add(-1); if (stailq_empty(&stream->pending_requests)) { - struct mh_i64ptr_node_t node = { stream->id, NULL }; - mh_i64ptr_remove(con->streams, &node, 0); - iproto_stream_delete(stream); + /* + * If no more messages for the current stream + * and no transaction started, then delete it. 
+ */ + if (stream->txn == NULL) { + struct mh_i64ptr_node_t node = { stream->id, NULL }; + mh_i64ptr_remove(con->streams, &node, 0); + iproto_stream_delete(stream); + } else if (!evio_has_fd(&con->input)) { + /* + * Here we are in case when connection was closed, + * there is no messages in stream queue, but there + * is some active transaction in stream. + * Send disconnect message to rollback this + * transaction. + */ + iproto_stream_push_on_disconnect_msg(stream); + } } else { /* * If there are new messages for this stream @@ -2407,6 +2627,23 @@ iproto_session_push(struct session *session, struct port *port) static inline void iproto_thread_init_routes(struct iproto_thread *iproto_thread) { + iproto_thread->begin_route[0] = + { tx_process_begin, &iproto_thread->net_pipe }; + iproto_thread->begin_route[1] = + { net_send_msg, NULL }; + iproto_thread->commit_route[0] = + { tx_process_commit, &iproto_thread->net_pipe }; + iproto_thread->commit_route[1] = + { net_send_msg, NULL }; + iproto_thread->rollback_route[0] = + { tx_process_rollback, &iproto_thread->net_pipe }; + iproto_thread->rollback_route[1] = + { net_send_msg, NULL }; + iproto_thread->rollback_on_disconnect_stream_route[0] = + { tx_process_rollback_on_disconnect, + &iproto_thread->net_pipe }; + iproto_thread->rollback_on_disconnect_stream_route[1] = + { net_finish_rollback_on_disconnect, NULL }; iproto_thread->destroy_route[0] = { tx_process_destroy, &iproto_thread->net_pipe }; iproto_thread->destroy_route[1] = diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index f2902946a..913a64de5 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -166,6 +166,9 @@ const char *iproto_type_strs[] = "EXECUTE", NULL, /* NOP */ "PREPARE", + "BEGIN", + "COMMIT", + "ROLLBACK", }; #define bit(c) (1ULL<fiber_on_yield); + } + trigger_clear(&txn->fiber_on_stop); + fiber_set_txn(fiber(), NULL); + return txn; +} + +void +txn_attach(struct txn *txn) +{ + assert(txn != NULL); + 
fiber_set_txn(fiber(), txn); +} diff --git a/src/box/txn.h b/src/box/txn.h index 8741dc6a1..f11144567 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -457,6 +457,25 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn) fiber->storage.txn = txn; } +/** + * Detach transaction from fiber. + * By default if the fiber is stopped the transaction started + * in this fiber is rolled back. This function detaches the transaction + * from the fiber: a detached transaction is not rolled back when + * the fiber is stopped, but can be aborted in case it does not + * support yield. + */ +struct txn * +txn_detach(void); + +/** + * Attach transaction to fiber. + * Attach @a txn that has been detached previously and saved + * somewhere to a new fiber. + */ +void +txn_attach(struct txn *txn); + /** * Start a transaction explicitly. * @pre no transaction is active diff --git a/test/box-tap/feedback_daemon.test.lua b/test/box-tap/feedback_daemon.test.lua index a2e041649..f700f3f72 100755 --- a/test/box-tap/feedback_daemon.test.lua +++ b/test/box-tap/feedback_daemon.test.lua @@ -251,7 +251,7 @@ box.space.features_sync:drop() local function check_stats(stat) local sub = test:test('feedback operation stats') - sub:plan(18) + sub:plan(21) local box_stat = box.stat() local net_stat = box.stat.net() for op, val in pairs(box_stat) do diff --git a/test/box/error.result b/test/box/error.result index f80fdfed5..bc804197a 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -448,6 +448,7 @@ t; | 227: box.error.SYNC_QUEUE_UNCLAIMED | 228: box.error.SYNC_QUEUE_FOREIGN | 229: box.error.UNABLE_TO_PROCESS_IN_STREAM + | 230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM | ... 
test_run:cmd("setopt delimiter ''"); diff --git a/test/box/misc.result b/test/box/misc.result index b62a64355..c86245914 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -136,11 +136,14 @@ end; t; --- - - DELETE + - COMMIT - SELECT + - ROLLBACK - INSERT - EVAL - - CALL - ERROR + - CALL + - BEGIN - PREPARE - REPLACE - UPSERT -- 2.20.1
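
The hand-off of a stream transaction between the fibers that serve consecutive requests (what `tx_prepare_transaction_for_request` and `tx_end_msg` do with `txn_attach` and `txn_detach` in the patch) can be modelled with a small standalone sketch. All `toy_*` types and functions below are hypothetical stand-ins written for this illustration; they are not Tarantool's real `struct txn`, fiber or stream, and they leave out triggers, yields and error handling.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-ins for a transaction, a fiber and a stream. */
struct toy_txn { int id; };
struct toy_fiber { struct toy_txn *txn; };
struct toy_stream { struct toy_txn *txn; };

/* Make txn the current transaction of the given fiber. */
static void
toy_txn_attach(struct toy_fiber *f, struct toy_txn *txn)
{
	assert(txn != NULL);
	f->txn = txn;
}

/* Take the current transaction (may be NULL) away from the fiber. */
static struct toy_txn *
toy_txn_detach(struct toy_fiber *f)
{
	struct toy_txn *txn = f->txn;
	f->txn = NULL;
	return txn;
}

/*
 * Before a request runs: if the stream holds a transaction,
 * move it into the fiber that is about to process the request.
 */
static void
toy_request_begin(struct toy_fiber *f, struct toy_stream *s)
{
	if (s != NULL && s->txn != NULL) {
		toy_txn_attach(f, s->txn);
		s->txn = NULL;
	}
}

/*
 * After a request ran: park whatever transaction the fiber
 * has (possibly none) back in the stream, so that the next
 * request of the stream can pick it up in another fiber.
 */
static void
toy_request_end(struct toy_fiber *f, struct toy_stream *s)
{
	if (s != NULL)
		s->txn = toy_txn_detach(f);
}
```

In this model a "begin" request would set the fiber's transaction between `toy_request_begin` and `toy_request_end`, and every later request of the same stream would see that transaction regardless of which fiber serves it. A stream whose parked transaction is NULL and whose queue is empty is the one the patch deletes.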