* [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
` (6 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy
From: Vladimir Davydov <vdavydov@tarantool.org>
---
src/box/xrow.c | 1 -
src/box/xrow.h | 2 --
2 files changed, 3 deletions(-)
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 8ab8b2768..a61c6e345 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1139,7 +1139,6 @@ error:
}
memset(request, 0, sizeof(*request));
- request->header = row;
uint32_t map_size = mp_decode_map(&data);
for (uint32_t i = 0; i < map_size; ++i) {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index c6e8ed0fd..0f2fcf94a 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -304,8 +304,6 @@ xrow_decode_raft(const struct xrow_header *row, struct raft_request *r,
* CALL/EVAL request.
*/
struct call_request {
- /** Request header */
- const struct xrow_header *header;
/** Function name for CALL request. MessagePack String. */
const char *name;
/** Expression for EVAL request. MessagePack String. */
--
2.20.1
* [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
` (5 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy
From: Vladimir Davydov <vdavydov@tarantool.org>
To apply a client request, we only need to know its type and body. All
the meta information, such as LSN, TSN, or replica id, must be set by
WAL. Currently, however, this isn't always the case: iproto leaves a
request header received over the network as is, and tx reuses that
header instead of allocating a new one. The reuse path exists to
process replication requests, see txn_add_redo().
Unless a client actually sets one of those meta fields, this causes no
problems. However, if we added transaction support to the replication
protocol, reusing the header would result in broken xlog, because
currently, all requests received over iproto have the is_commit field
set in xrow_header for the lack of TSN, while is_commit must only be set
for the final statement in a transaction. One way to fix it would be
clearing is_commit explicitly in iproto, but ignoring the whole header
received over iproto looks more logical and error-proof.
Needed for #5860
---
src/box/iproto.cc | 6 ++++++
src/box/xrow.h | 2 +-
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 5cc69b77f..dcf60e1be 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1264,6 +1264,12 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
if (xrow_decode_dml(&msg->header, &msg->dml,
dml_request_key_map(type)))
goto error;
+ /*
+ * In contrast to replication requests, for a client request
+ * the xrow header is set by WAL, which generates LSNs and sets
+ * replica id. Ignore the header received over network.
+ */
+ msg->dml.header = NULL;
assert(type < sizeof(iproto_thread->dml_route) /
sizeof(*(iproto_thread->dml_route)));
cmsg_init(&msg->base, iproto_thread->dml_route[type]);
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0f2fcf94a..48b8b55f5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -163,7 +163,7 @@ struct request {
/*
* Either log row, or network header, or NULL, depending
* on where this packet originated from: the write ahead
- * log/snapshot, client request, or a Lua request.
+ * log/snapshot, replication, or a client request.
*/
struct xrow_header *header;
/**
--
2.20.1
* [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 1/8] xrow: remove unused call_request::header mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 2/8] iproto: clear request::header for client requests mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
` (4 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
For further implementation of streams, we need to separate
requests that belong to a stream from those that do not. For this
purpose, the stream ID field was added to the iproto binary
protocol. For requests that do not belong to a stream, this field
is omitted or equal to zero. For requests belonging to a stream,
we use this field to determine which stream the request belongs to.
Part of #5860
@TarantoolBot document
Title: new field in binary iproto protocol
Add a new field to the binary iproto protocol.
`IPROTO_STREAM_ID 0x0a` determines whether a request
belongs to a stream or not. If this field is omitted
or equal to zero, the request doesn't belong to a stream.
---
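For readers new to the wire format, the effect of the new key on a
request header can be sketched as follows. This is a minimal
illustration, not the code of this patch: sketch_encode_header() and
sketch_encode_small_uint() are hypothetical helpers, all values are
limited to MessagePack positive fixints (below 128), and the key codes
0x00 (request type) and 0x01 (sync) are assumed from
iproto_constants.h rather than shown in this patch. A real packet also
carries a length prefix and a body, which are omitted here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Header keys. Only IPROTO_STREAM_ID = 0x0a is introduced by this patch. */
enum {
	IPROTO_REQUEST_TYPE = 0x00,	/* assumed, not shown in the patch */
	IPROTO_SYNC = 0x01,		/* assumed, not shown in the patch */
	IPROTO_STREAM_ID = 0x0a,
};

/* Hypothetical helper: encode v < 128 as a MessagePack positive fixint. */
static uint8_t *
sketch_encode_small_uint(uint8_t *d, uint8_t v)
{
	assert(v < 0x80);
	*d++ = v;
	return d;
}

/*
 * Hypothetical helper: encode a header map into buf and return the
 * number of bytes written. The stream id key is emitted only when
 * stream_id is non-zero, so non-stream requests keep their old layout.
 */
static size_t
sketch_encode_header(uint8_t *buf, uint8_t type, uint8_t sync,
		     uint8_t stream_id)
{
	uint8_t map_size = 2;
	if (stream_id != 0)
		map_size++;
	uint8_t *d = buf;
	/* MessagePack fixmap: 0x80 | number of key-value pairs. */
	*d++ = (uint8_t)(0x80 | map_size);
	d = sketch_encode_small_uint(d, IPROTO_REQUEST_TYPE);
	d = sketch_encode_small_uint(d, type);
	d = sketch_encode_small_uint(d, IPROTO_SYNC);
	d = sketch_encode_small_uint(d, sync);
	if (stream_id != 0) {
		d = sketch_encode_small_uint(d, IPROTO_STREAM_ID);
		d = sketch_encode_small_uint(d, stream_id);
	}
	return (size_t)(d - buf);
}
```

With stream_id equal to zero the map has two entries and the key is
absent, which mirrors the `header->stream_id != 0` check added to
xrow_header_encode() in this patch.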
src/box/iproto_constants.c | 4 +-
src/box/iproto_constants.h | 1 +
src/box/xrow.c | 8 ++
src/box/xrow.h | 5 ++
test/unit/xrow.cc | 7 +-
test/unit/xrow.result | 168 +++++++++++++++++++------------------
6 files changed, 109 insertions(+), 84 deletions(-)
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index addda39dc..f2902946a 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -43,10 +43,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
/* 0x07 */ MP_UINT, /* IPROTO_GROUP_ID */
/* 0x08 */ MP_UINT, /* IPROTO_TSN */
/* 0x09 */ MP_UINT, /* IPROTO_FLAGS */
+ /* 0x0a */ MP_UINT, /* IPROTO_STREAM_ID */
/* }}} */
/* {{{ unused */
- /* 0x0a */ MP_UINT,
/* 0x0b */ MP_UINT,
/* 0x0c */ MP_UINT,
/* 0x0d */ MP_UINT,
@@ -198,7 +198,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
"group id", /* 0x07 */
"tsn", /* 0x08 */
"flags", /* 0x09 */
- NULL, /* 0x0a */
+ "stream_id", /* 0x0a */
NULL, /* 0x0b */
NULL, /* 0x0c */
NULL, /* 0x0d */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 3d78ce2bb..b9498868c 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -72,6 +72,7 @@ enum iproto_key {
IPROTO_GROUP_ID = 0x07,
IPROTO_TSN = 0x08,
IPROTO_FLAGS = 0x09,
+ IPROTO_STREAM_ID = 0x0a,
/* Leave a gap for other keys in the header. */
IPROTO_SPACE_ID = 0x10,
IPROTO_INDEX_ID = 0x11,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index a61c6e345..7df1af4ab 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -186,6 +186,9 @@ error:
flags = mp_decode_uint(pos);
header->flags = flags;
break;
+ case IPROTO_STREAM_ID:
+ header->stream_id = mp_decode_uint(pos);
+ break;
default:
/* unknown header */
mp_next(pos);
@@ -319,6 +322,11 @@ xrow_header_encode(const struct xrow_header *header, uint64_t sync,
flags_to_encode |= IPROTO_FLAG_COMMIT;
}
}
+ if (header->stream_id != 0) {
+ d = mp_encode_uint(d, IPROTO_STREAM_ID);
+ d = mp_encode_uint(d, header->stream_id);
+ map_size++;
+ }
if (flags_to_encode != 0) {
d = mp_encode_uint(d, IPROTO_FLAGS);
d = mp_encode_uint(d, flags_to_encode);
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 48b8b55f5..cb83fddff 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -81,6 +81,11 @@ struct xrow_header {
* transaction.
*/
int64_t tsn;
+ /**
+ * Stream id. Used in iproto binary protocol to identify stream.
+ * Zero if stream is not used.
+ */
+ uint64_t stream_id;
/** Transaction meta flags set only in the last transaction row. */
union {
uint8_t flags;
diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc
index b6018eed9..2c0dd88b6 100644
--- a/test/unit/xrow.cc
+++ b/test/unit/xrow.cc
@@ -220,8 +220,10 @@ test_xrow_header_encode_decode()
header.bodycnt = 0;
header.tsn = header.lsn;
uint64_t sync = 100500;
+ uint64_t stream_id = 1;
for (int opt_idx = 0; opt_idx < bit_comb_count; opt_idx++) {
- plan(12);
+ plan(13);
+ header.stream_id = stream_id++;
header.is_commit = opt_idx & 0x01;
header.wait_sync = opt_idx >> 1 & 0x01;
header.wait_ack = opt_idx >> 2 & 0x01;
@@ -229,7 +231,7 @@ test_xrow_header_encode_decode()
is(1, xrow_header_encode(&header, sync, vec, 200), "encode");
int fixheader_len = 200;
pos = (char *)vec[0].iov_base + fixheader_len;
- uint32_t exp_map_size = 5;
+ uint32_t exp_map_size = 6;
/*
* header.is_commit flag isn't encoded, since this row looks
* like a single-statement transaction.
@@ -249,6 +251,7 @@ test_xrow_header_encode_decode()
end += vec[0].iov_len;
is(xrow_header_decode(&decoded_header, &begin, end, true), 0,
"header decode");
+ is(header.stream_id, decoded_header.stream_id, "decoded stream_id");
is(header.is_commit, decoded_header.is_commit, "decoded is_commit");
is(header.wait_sync, decoded_header.wait_sync, "decoded wait_sync");
is(header.wait_ack, decoded_header.wait_ack, "decoded wait_ack");
diff --git a/test/unit/xrow.result b/test/unit/xrow.result
index 3b705d5ba..1ca222d37 100644
--- a/test/unit/xrow.result
+++ b/test/unit/xrow.result
@@ -43,117 +43,125 @@
ok 1 - subtests
1..9
ok 1 - bad msgpack end
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 2 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 3 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 4 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 5 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 6 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 7 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 8 - subtests
- 1..12
+ 1..13
ok 1 - encode
ok 2 - header map size
ok 3 - header decode
- ok 4 - decoded is_commit
- ok 5 - decoded wait_sync
- ok 6 - decoded wait_ack
- ok 7 - decoded type
- ok 8 - decoded replica_id
- ok 9 - decoded lsn
- ok 10 - decoded tm
- ok 11 - decoded sync
- ok 12 - decoded bodycnt
+ ok 4 - decoded stream_id
+ ok 5 - decoded is_commit
+ ok 6 - decoded wait_sync
+ ok 7 - decoded wait_ack
+ ok 8 - decoded type
+ ok 9 - decoded replica_id
+ ok 10 - decoded lsn
+ ok 11 - decoded tm
+ ok 12 - decoded sync
+ ok 13 - decoded bodycnt
ok 9 - subtests
ok 2 - subtests
1..1
--
2.20.1
* [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
` (2 preceding siblings ...)
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 3/8] iproto: implement stream id in binary iproto protocol mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
` (3 subsequent siblings)
7 siblings, 0 replies; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
There was no check for successful memory allocation in the `new` and
`clear` functions of the mhash table. If an allocation failed, a null
pointer dereference occurred.
---
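The fix follows an "allocate first, commit afterwards" pattern: all new
buffers are obtained before the table is touched, so a failed
allocation leaves the old state intact. The sketch below illustrates
that pattern on a toy table. It is not the mhash code: struct
sketch_table, sketch_calloc(), sketch_fail_alloc and
sketch_table_clear() are hypothetical names, and the failure switch
exists only so that the error path can be exercised.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy table with two parallel arrays, loosely like h->p and h->b. */
struct sketch_table {
	size_t n_buckets;
	int *nodes;
	unsigned char *used;
};

/* Hypothetical switch to simulate an out-of-memory condition. */
static int sketch_fail_alloc = 0;

static void *
sketch_calloc(size_t n, size_t size)
{
	if (sketch_fail_alloc)
		return NULL;
	return calloc(n, size);
}

/*
 * Reset the table to n_buckets empty buckets.
 * Returns 0 on success, -1 on allocation failure. On failure the
 * table is left exactly as it was.
 */
static int
sketch_table_clear(struct sketch_table *t, size_t n_buckets)
{
	/* Step 1: allocate everything into local variables. */
	int *nodes = sketch_calloc(n_buckets, sizeof(*nodes));
	if (nodes == NULL)
		return -1;
	unsigned char *used = sketch_calloc(n_buckets, sizeof(*used));
	if (used == NULL) {
		free(nodes);
		return -1;
	}
	/* Step 2: nothing can fail any more, so commit the new state. */
	free(t->nodes);
	free(t->used);
	t->n_buckets = n_buckets;
	t->nodes = nodes;
	t->used = used;
	return 0;
}
```

The same ordering is what lets the caller in the unit test below check
the return value of clear() without worrying about a half-updated
table.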
src/lib/salad/mhash.h | 99 +++++++++++++++++++++++++++---------------
test/unit/mhash_body.c | 4 +-
2 files changed, 66 insertions(+), 37 deletions(-)
diff --git a/src/lib/salad/mhash.h b/src/lib/salad/mhash.h
index b555cad4c..74235eeaa 100644
--- a/src/lib/salad/mhash.h
+++ b/src/lib/salad/mhash.h
@@ -157,7 +157,7 @@ struct _mh(t) {
#define MH_DENSITY 0.7
struct _mh(t) * _mh(new)();
-void _mh(clear)(struct _mh(t) *h);
+int _mh(clear)(struct _mh(t) *h);
void _mh(delete)(struct _mh(t) *h);
void _mh(resize)(struct _mh(t) *h, mh_arg_t arg);
int _mh(start_resize)(struct _mh(t) *h, mh_int_t buckets, mh_int_t batch,
@@ -399,23 +399,50 @@ _mh(del_resize)(struct _mh(t) *h, mh_int_t x,
struct _mh(t) *
_mh(new)()
{
- struct _mh(t) *h = (struct _mh(t) *) calloc(1, sizeof(*h));
- h->shadow = (struct _mh(t) *) calloc(1, sizeof(*h));
+ struct _mh(t) *h = (struct _mh(t) *)calloc(1, sizeof(*h));
+ if (h == NULL)
+ return NULL;
+ h->shadow = (struct _mh(t) *)calloc(1, sizeof(*h));
+ if (h->shadow == NULL)
+ goto fail;
h->prime = 0;
h->n_buckets = __ac_prime_list[h->prime];
- h->p = (mh_node_t *) calloc(h->n_buckets, sizeof(mh_node_t));
+ h->p = (mh_node_t *)calloc(h->n_buckets, sizeof(mh_node_t));
+ if (h->p == NULL)
+ goto fail;
#if !mh_bytemap
- h->b = (uint32_t *) calloc(h->n_buckets / 16 + 1, sizeof(uint32_t));
+ h->b = (uint32_t *)calloc(h->n_buckets / 16 + 1, sizeof(uint32_t));
#else
- h->b = (uint8_t *) calloc(h->n_buckets, sizeof(uint8_t));
+ h->b = (uint8_t *)calloc(h->n_buckets, sizeof(uint8_t));
#endif
+ if (h->b == NULL)
+ goto fail;
h->upper_bound = h->n_buckets * MH_DENSITY;
return h;
+
+fail:
+ free(h->p);
+ free(h->shadow);
+ free(h);
+ return NULL;
}
-void
+int
_mh(clear)(struct _mh(t) *h)
{
+ mh_int_t n_buckets = __ac_prime_list[h->prime];
+ mh_node_t *p = (mh_node_t *)calloc(n_buckets, sizeof(mh_node_t));
+ if (p == NULL)
+ return -1;
+#if !mh_bytemap
+ uint32_t *b = (uint32_t *)calloc(n_buckets / 16 + 1, sizeof(uint32_t));
+#else
+ uint8_t *b = (uint8_t *)calloc(n_buckets, sizeof(uint8_t));
+#endif
+ if (b == NULL) {
+ free(p);
+ return -1;
+ }
if (h->shadow->p) {
free(h->shadow->p);
free(h->shadow->b);
@@ -424,15 +451,12 @@ _mh(clear)(struct _mh(t) *h)
free(h->p);
free(h->b);
h->prime = 0;
- h->n_buckets = __ac_prime_list[h->prime];
- h->p = (mh_node_t *) calloc(h->n_buckets, sizeof(mh_node_t));
-#if !mh_bytemap
- h->b = (uint32_t *) calloc(h->n_buckets / 16 + 1, sizeof(uint32_t));
-#else
- h->b = (uint8_t *) calloc(h->n_buckets, sizeof(uint8_t));
-#endif
+ h->n_buckets = n_buckets;
+ h->p = p;
+ h->b = b;
h->size = 0;
h->upper_bound = h->n_buckets * MH_DENSITY;
+ return 0;
}
void
@@ -515,42 +539,47 @@ _mh(start_resize)(struct _mh(t) *h, mh_int_t buckets, mh_int_t batch,
/* hash size is already greater than requested */
return 0;
}
- while (h->prime < __ac_HASH_PRIME_SIZE - 1) {
- if (__ac_prime_list[h->prime] >= buckets)
+ mh_int_t new_prime = h->prime;
+ while (new_prime < __ac_HASH_PRIME_SIZE - 1) {
+ if (__ac_prime_list[new_prime] >= buckets)
break;
- h->prime += 1;
+ new_prime += 1;
}
-
- h->batch = batch > 0 ? batch : h->n_buckets / (256 * 1024);
- if (h->batch < 256) {
+ mh_int_t new_batch = batch > 0 ? batch : h->n_buckets / (256 * 1024);
+ if (new_batch < 256) {
/*
* Minimal batch must be greater or equal to
* 1 / (1 - f), where f is upper bound percent
* = MH_DENSITY
*/
- h->batch = 256;
+ new_batch = 256;
}
- struct _mh(t) *s = h->shadow;
- memcpy(s, h, sizeof(*h));
- s->resize_position = 0;
- s->n_buckets = __ac_prime_list[h->prime];
- s->upper_bound = s->n_buckets * MH_DENSITY;
- s->n_dirty = 0;
- s->size = 0;
- s->p = (mh_node_t *) malloc(s->n_buckets * sizeof(mh_node_t));
- if (s->p == NULL)
+ mh_int_t n_buckets = __ac_prime_list[new_prime];
+ mh_node_t *p = (mh_node_t *)malloc(n_buckets * sizeof(mh_node_t));
+ if (p == NULL)
return -1;
#if !mh_bytemap
- s->b = (uint32_t *) calloc(s->n_buckets / 16 + 1, sizeof(uint32_t));
+ uint32_t *b = (uint32_t *)calloc(n_buckets / 16 + 1, sizeof(uint32_t));
#else
- s->b = (uint8_t *) calloc(s->n_buckets, sizeof(uint8_t));
+ uint8_t *b = (uint8_t *)calloc(n_buckets, sizeof(uint8_t));
#endif
- if (s->b == NULL) {
- free(s->p);
- s->p = NULL;
+ if (b == NULL) {
+ free(p);
return -1;
}
+
+ h->prime = new_prime;
+ h->batch = new_batch;
+ struct _mh(t) *s = h->shadow;
+ memcpy(s, h, sizeof(*h));
+ s->resize_position = 0;
+ s->n_buckets = n_buckets;
+ s->upper_bound = s->n_buckets * MH_DENSITY;
+ s->n_dirty = 0;
+ s->size = 0;
+ s->p = p;
+ s->b = b;
_mh(resize)(h, arg);
return 0;
diff --git a/test/unit/mhash_body.c b/test/unit/mhash_body.c
index 458817fb1..324c72a43 100644
--- a/test/unit/mhash_body.c
+++ b/test/unit/mhash_body.c
@@ -23,7 +23,7 @@ h = init();
destroy(h);
h = init();
-clear(h);
+fail_unless(clear(h) == 0);
/* access not yet initialized hash */
clr(9);
@@ -59,7 +59,7 @@ tst(7);
tst(8);
tst(9);
-clear(h);
+fail_unless(clear(h) == 0);
/* after clear no items should exist */
clr(1);
--
2.20.1
* [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
` (3 preceding siblings ...)
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 4/8] salad: fix segfault in case when mhash table allocation failure mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 11:30 ` Vladimir Davydov via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
` (2 subsequent siblings)
7 siblings, 1 reply; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
Implement streams in iproto. There is a hash table of streams for
each connection. When a new request comes with a non-zero stream ID,
we look for the stream with such ID in this table and if it does not
exist, we create it. The request is placed in the queue of pending
requests, and if this queue was empty at the time of its receipt, it
is pushed to the tx thread for processing. When a request belonging to
a stream returns to the network thread after processing is completed,
we take the next request out of the queue of pending requests and send
it to the tx thread for processing. If there are no pending requests,
we remove the stream object from the hash table and destroy it.
Requests with zero stream ID are processed in the old way.
Part of #5860
@TarantoolBot document
Title: streams are implemented in iproto
A distinctive feature of streams is that all requests in them
are processed sequentially. The execution of the next request
in a stream will not start until the previous one is completed.
To separate requests that belong to streams from those that do not,
we use the stream ID field in the binary iproto protocol: requests
with a non-zero stream ID belong to some stream. The stream ID is
unique within a connection and indicates which stream the request
belongs to. Streams from different connections may have the same
IDs.
---
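The queueing rule described in the commit message can be sketched in
isolation as follows. This is a simplified model under stated
assumptions, not the iproto code: struct sketch_msg, struct
sketch_stream and both functions are hypothetical, a hand-rolled
singly linked list stands in for stailq, and "dispatch" stands for
pushing a message to the tx thread. Note that the return convention
here (1 means "dispatch now") is chosen for readability and differs
from the 0/1/-1 convention used in the patch.

```c
#include <assert.h>
#include <stddef.h>

struct sketch_msg {
	int id;
	struct sketch_msg *next;
};

/* Per-stream FIFO of pending messages; the head is the one in flight. */
struct sketch_stream {
	struct sketch_msg *head;
	struct sketch_msg *tail;
};

/*
 * Append msg to the stream queue.
 * Returns 1 if the queue was empty, i.e. msg must be dispatched now,
 * and 0 if an earlier message is still in flight and msg has to wait.
 */
static int
sketch_stream_enqueue(struct sketch_stream *s, struct sketch_msg *msg)
{
	int was_empty = (s->head == NULL);
	msg->next = NULL;
	if (was_empty)
		s->head = msg;
	else
		s->tail->next = msg;
	s->tail = msg;
	return was_empty;
}

/*
 * Called when the in-flight message has been processed.
 * Returns the next message to dispatch, or NULL if the stream became
 * idle and may be removed from the hash table and destroyed.
 */
static struct sketch_msg *
sketch_stream_finish(struct sketch_stream *s, struct sketch_msg *done)
{
	assert(s->head == done);
	s->head = done->next;
	if (s->head == NULL)
		s->tail = NULL;
	return s->head;
}
```

Because only the head of the queue is ever in flight, at most one
request per stream is being processed at any moment, which is what
gives streams their sequential semantics.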
src/box/errcode.h | 1 +
src/box/iproto.cc | 228 ++++++++++++++++++++++++++++++++++++++++-
src/lib/core/errinj.h | 2 +
test/box/errinj.result | 2 +
test/box/error.result | 1 +
5 files changed, 229 insertions(+), 5 deletions(-)
diff --git a/src/box/errcode.h b/src/box/errcode.h
index ef2b2e9b1..f8fda23c1 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -281,6 +281,7 @@ struct errcode_record {
/*226 */_(ER_NOT_LEADER, "The instance is not a leader. New leader is %u")\
/*227 */_(ER_SYNC_QUEUE_UNCLAIMED, "The synchronous transaction queue doesn't belong to any instance")\
/*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\
+ /*229 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index dcf60e1be..3b792130b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -64,6 +64,8 @@
#include "execute.h"
#include "errinj.h"
#include "tt_static.h"
+#include "salad/stailq.h"
+#include "assoc.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -74,6 +76,21 @@ enum {
ENDPOINT_NAME_MAX = 10
};
+struct iproto_connection;
+
+struct iproto_stream {
+ /**
+ * Queue of pending requests (iproto messages) for this stream,
+ * processed sequentially. This field is accessible only from
+ * iproto thread. Queue items have iproto_msg type.
+ */
+ struct stailq pending_requests;
+ /** Id of this stream, used as a key in streams hash table */
+ uint64_t id;
+ /** This stream connection */
+ struct iproto_connection *connection;
+};
+
/**
* A position in connection output buffer.
* Since we use rotating buffers to recycle memory,
@@ -136,6 +153,7 @@ struct iproto_thread {
*/
struct mempool iproto_msg_pool;
struct mempool iproto_connection_pool;
+ struct mempool iproto_stream_pool;
/*
* List of stopped connections
*/
@@ -304,6 +322,16 @@ struct iproto_msg
* and the connection must be closed.
*/
bool close_connection;
+ /**
+ * A stailq_entry to hold the message in a stream.
+ * All messages in a stream are processed sequentially. Before
+ * processing, a message is added to the queue of pending requests.
+ * If the queue was empty, processing starts at once; otherwise the
+ * message waits until all previous messages are processed.
+ */
+ struct stailq_entry in_stream;
+ /** Stream that owns this message, or NULL. */
+ struct iproto_stream *stream;
};
static struct iproto_msg *
@@ -505,6 +533,11 @@ struct iproto_connection
*/
enum iproto_connection_state state;
struct rlist in_stop_list;
+ /**
+ * Hash table that holds all streams for this connection.
+ * This field is accessible only from iproto thread.
+ */
+ struct mh_i64ptr_t *streams;
/**
* Kharon is used to implement box.session.push().
* When a new push is ready, tx uses kharon to notify
@@ -572,6 +605,48 @@ struct iproto_connection
} while (0);
#endif
+/*
+ * TODO(gh-6293): Implement necessary statistic for iproto streams
+ * and remove it from errinj.
+ */
+static inline void
+errinj_stream_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+ struct errinj *inj =
+ errinj(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT);
+ __atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif
+}
+
+static inline void
+errinj_stream_msg_count_add(MAYBE_UNUSED int val)
+{
+#ifndef NDEBUG
+ struct errinj *inj =
+ errinj(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT);
+ __atomic_add_fetch(&inj->iparam, val, __ATOMIC_SEQ_CST);
+#endif
+}
+
+static struct iproto_stream *
+iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
+{
+ struct iproto_thread *iproto_thread = connection->iproto_thread;
+ struct iproto_stream *stream = (struct iproto_stream *)
+ mempool_alloc(&iproto_thread->iproto_stream_pool);
+ if (stream == NULL) {
+ diag_set(OutOfMemory, sizeof(*stream),
+ "mempool_alloc", "stream");
+ return NULL;
+ }
+ errinj_stream_count_add(1);
+ stailq_create(&stream->pending_requests);
+ stream->id = stream_id;
+ stream->connection = connection;
+ return stream;
+}
+
/**
* Return true if we have not enough spare messages
* in the message pool.
@@ -591,6 +666,14 @@ iproto_msg_delete(struct iproto_msg *msg)
iproto_resume(iproto_thread);
}
+static void
+iproto_stream_delete(struct iproto_stream *stream)
+{
+ assert(stailq_empty(&stream->pending_requests));
+ errinj_stream_count_add(-1);
+ mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
+}
+
static struct iproto_msg *
iproto_msg_new(struct iproto_connection *con)
{
@@ -609,6 +692,7 @@ iproto_msg_new(struct iproto_connection *con)
}
msg->close_connection = false;
msg->connection = con;
+ msg->stream = NULL;
rmean_collect(con->iproto_thread->rmean, IPROTO_REQUESTS, 1);
return msg;
}
@@ -836,6 +920,63 @@ iproto_connection_input_buffer(struct iproto_connection *con)
return new_ibuf;
}
+/**
+ * Check if the message belongs to a stream (stream_id != 0) and, if
+ * so, find the stream in the connection's streams hash table or
+ * create it. Put the message into the stream's pending requests queue.
+ * @retval 0 - the message is ready to push to TX thread (either if
+ * stream_id is not set (is zero) or the stream is not
+ * processing other messages).
+ * 1 - the message is postponed because its stream is busy
+ * processing previous message(s).
+ * -1 - memory error.
+ */
+static int
+iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
+{
+ uint64_t stream_id = msg->header.stream_id;
+ if (stream_id == 0)
+ return 0;
+
+ struct iproto_connection *con = msg->connection;
+ struct iproto_stream *stream = NULL;
+ mh_int_t pos = mh_i64ptr_find(con->streams, stream_id, 0);
+ if (pos == mh_end(con->streams)) {
+ stream = iproto_stream_new(msg->connection, msg->header.stream_id);
+ if (stream == NULL)
+ return -1;
+ struct mh_i64ptr_node_t node;
+ node.key = stream_id;
+ node.val = stream;
+ pos = mh_i64ptr_put(con->streams, &node, NULL, NULL);
+ if (pos == mh_end(con->streams)) {
+ iproto_stream_delete(stream);
+ diag_set(OutOfMemory, pos + 1, "mh_streams_put",
+ "mh_streams_node");
+ return -1;
+ }
+ }
+ /*
+ * Not all messages belong to a stream. We can't determine which
+ * messages belong to stream in `iproto_msg_new`, so we increment
+ * ERRINJ_IPROTO_STREAM_MSG_COUNT here, when we already know it.
+ * In `iproto_msg_delete` we decrement ERRINJ_IPROTO_STREAM_MSG_COUNT
+ * only if msg->stream != NULL.
+ */
+ errinj_stream_msg_count_add(1);
+ stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+ msg->stream = stream;
+ /*
+ * If the request queue in the stream is not empty, it means
+ * that some previous message wasn't processed yet. Regardless
+ * of this, we put the message in the queue, but we start processing
+ * the message only if the message queue in the stream was empty.
+ */
+ bool was_not_empty = !stailq_empty(&stream->pending_requests);
+ stailq_add_tail_entry(&stream->pending_requests, msg, in_stream);
+ return was_not_empty ? 1 : 0;
+}
+
/**
* Enqueue all requests which were read up. If a request limit is
* reached - stop the connection input even if not the whole batch
@@ -845,7 +986,7 @@ iproto_connection_input_buffer(struct iproto_connection *con)
* @param in Buffer to parse.
*
* @retval 0 Success.
- * @retval -1 Invalid MessagePack error.
+ * @retval -1 Invalid MessagePack or memory error.
*/
static inline int
iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in)
@@ -898,12 +1039,25 @@ err_msgpack:
msg->len = reqend - reqstart; /* total request length */
iproto_msg_decode(msg, &pos, reqend, &stop_input);
+
+ int rc = iproto_msg_start_processing_in_stream(msg);
+ if (rc < 0) {
+ iproto_msg_delete(msg);
+ return -1;
+ }
/*
- * This can't throw, but should not be
- * done in case of exception.
+ * rc > 0 means that the stream's pending requests queue
+ * is not empty: skip the push.
*/
- cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
- n_requests++;
+ if (rc == 0) {
+ /*
+ * This can't throw, but should not be
+ * done in case of exception.
+ */
+ cpipe_push_input(&con->iproto_thread->tx_pipe, &msg->base);
+ n_requests++;
+ }
+
/* Request is parsed */
assert(reqend > reqstart);
assert(con->parse_size >= (size_t) (reqend - reqstart));
@@ -1145,6 +1299,13 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
diag_set(OutOfMemory, sizeof(*con), "mempool_alloc", "con");
return NULL;
}
+ con->streams = mh_i64ptr_new();
+ if (con->streams == NULL) {
+ diag_set(OutOfMemory, sizeof(*(con->streams)),
+ "mh_streams_new", "streams");
+ mempool_free(&con->iproto_thread->iproto_connection_pool, con);
+ return NULL;
+ }
con->iproto_thread = iproto_thread;
con->input.data = con->output.data = con;
con->loop = loop();
@@ -1193,6 +1354,9 @@ iproto_connection_delete(struct iproto_connection *con)
con->obuf[0].iov[0].iov_base == NULL);
assert(con->obuf[1].pos == 0 &&
con->obuf[1].iov[0].iov_base == NULL);
+
+ assert(mh_size(con->streams) == 0);
+ mh_i64ptr_delete(con->streams);
mempool_free(&con->iproto_thread->iproto_connection_pool, con);
}
@@ -1240,7 +1404,9 @@ static void
iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
bool *stop_input)
{
+ uint64_t stream_id;
uint8_t type;
+ bool request_is_not_for_stream;
struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1248,6 +1414,16 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
assert(*pos == reqend);
type = msg->header.type;
+ stream_id = msg->header.stream_id;
+ request_is_not_for_stream =
+ ((type > IPROTO_TYPE_STAT_MAX &&
+ type != IPROTO_PING) || type == IPROTO_AUTH);
+
+ if (stream_id != 0 && request_is_not_for_stream) {
+ diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
+ iproto_type_name(type));
+ goto error;
+ }
/*
* Parse request before putting it into the queue
@@ -1873,12 +2049,52 @@ tx_process_replication(struct cmsg *m)
}
}
+static void
+iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
+{
+ struct iproto_connection *con = msg->connection;
+ struct iproto_stream *stream = msg->stream;
+
+ assert(stream != NULL);
+ struct iproto_msg *tmp =
+ stailq_shift_entry(&stream->pending_requests,
+ struct iproto_msg, in_stream);
+ assert(tmp == msg);
+ (void)tmp;
+ errinj_stream_msg_count_add(-1);
+
+ if (stailq_empty(&stream->pending_requests)) {
+ struct mh_i64ptr_node_t node = { stream->id, NULL };
+ mh_i64ptr_remove(con->streams, &node, 0);
+ iproto_stream_delete(stream);
+ } else {
+ /*
+ * If there are new messages for this stream
+ * then schedule their processing.
+ */
+ struct iproto_msg *next =
+ stailq_first_entry(&stream->pending_requests,
+ struct iproto_msg,
+ in_stream);
+ assert(next != NULL);
+ next->wpos = con->wpos;
+ cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
+ cpipe_flush_input(&con->iproto_thread->tx_pipe);
+ }
+}
+
static void
net_send_msg(struct cmsg *m)
{
struct iproto_msg *msg = (struct iproto_msg *) m;
struct iproto_connection *con = msg->connection;
+ struct iproto_stream *stream = msg->stream;
+
+ if (stream == NULL)
+ goto send_msg;
+ iproto_msg_finish_processing_in_stream(msg);
+send_msg:
if (msg->len != 0) {
/* Discard request (see iproto_enqueue_batch()). */
msg->p_ibuf->rpos += msg->len;
@@ -2066,6 +2282,8 @@ net_cord_f(va_list ap)
sizeof(struct iproto_msg));
mempool_create(&iproto_thread->iproto_connection_pool, &cord()->slabc,
sizeof(struct iproto_connection));
+ mempool_create(&iproto_thread->iproto_stream_pool, &cord()->slabc,
+ sizeof(struct iproto_stream));
evio_service_init(loop(), &iproto_thread->binary, "binary",
iproto_on_accept, iproto_thread);
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 51611f654..75caaed06 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -156,6 +156,8 @@ struct errinj {
_(ERRINJ_APPLIER_READ_TX_ROW_DELAY, ERRINJ_BOOL, {.bparam = false})\
_(ERRINJ_NETBOX_IO_DELAY, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_REPLICASET_VCLOCK, ERRINJ_BOOL, {.bparam = false}) \
+ _(ERRINJ_IPROTO_STREAM_COUNT, ERRINJ_INT, {.iparam = 0}) \
+ _(ERRINJ_IPROTO_STREAM_MSG_COUNT, ERRINJ_INT, {.iparam = 0}) \
ENUM0(errinj_id, ERRINJ_LIST);
extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index b7e5ec667..129b6e879 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,6 +59,8 @@ evals
- ERRINJ_INDEX_ALLOC: false
- ERRINJ_INDEX_RESERVE: false
- ERRINJ_IPROTO_SINGLE_THREAD_STAT: -1
+ - ERRINJ_IPROTO_STREAM_COUNT: 0
+ - ERRINJ_IPROTO_STREAM_MSG_COUNT: 0
- ERRINJ_IPROTO_TX_DELAY: false
- ERRINJ_IPROTO_WRITE_ERROR_DELAY: false
- ERRINJ_LOG_ROTATE: false
diff --git a/test/box/error.result b/test/box/error.result
index b7ac7a138..f80fdfed5 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -447,6 +447,7 @@ t;
| 226: box.error.NOT_LEADER
| 227: box.error.SYNC_QUEUE_UNCLAIMED
| 228: box.error.SYNC_QUEUE_FOREIGN
+ | 229: box.error.UNABLE_TO_PROCESS_IN_STREAM
| ...
test_run:cmd("setopt delimiter ''");
--
2.20.1
* Re: [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
@ 2021-08-11 11:30 ` Vladimir Davydov via Tarantool-patches
0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-11 11:30 UTC (permalink / raw)
To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy, mechanik20051988
On Wed, Aug 11, 2021 at 11:56:55AM +0300, mechanik20051988 wrote:
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index dcf60e1be..3b792130b 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -1873,12 +2049,52 @@ tx_process_replication(struct cmsg *m)
> }
> }
>
> +static void
> +iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
> +{
> + struct iproto_connection *con = msg->connection;
> + struct iproto_stream *stream = msg->stream;
> +
> + assert(stream != NULL);
> + struct iproto_msg *tmp =
> + stailq_shift_entry(&stream->pending_requests,
> + struct iproto_msg, in_stream);
> + assert(tmp == msg);
> + (void)tmp;
> + errinj_stream_msg_count_add(-1);
> +
> + if (stailq_empty(&stream->pending_requests)) {
> + struct mh_i64ptr_node_t node = { stream->id, NULL };
> + mh_i64ptr_remove(con->streams, &node, 0);
> + iproto_stream_delete(stream);
> + } else {
> + /*
> + * If there are new messages for this stream
> + * then schedule their processing.
> + */
> + struct iproto_msg *next =
> + stailq_first_entry(&stream->pending_requests,
> + struct iproto_msg,
> + in_stream);
> + assert(next != NULL);
> + next->wpos = con->wpos;
> + cpipe_push_input(&con->iproto_thread->tx_pipe, &next->base);
> + cpipe_flush_input(&con->iproto_thread->tx_pipe);
> + }
> +}
> +
> static void
> net_send_msg(struct cmsg *m)
> {
> struct iproto_msg *msg = (struct iproto_msg *) m;
> struct iproto_connection *con = msg->connection;
> + struct iproto_stream *stream = msg->stream;
> +
> + if (stream == NULL)
> + goto send_msg;
Please fold this check into iproto_msg_finish_processing_in_stream
and call the latter unconditionally, like you do in case of
iproto_msg_start_processing_in_stream.
After this LGTM.
>
> + iproto_msg_finish_processing_in_stream(msg);
> +send_msg:
> if (msg->len != 0) {
> /* Discard request (see iproto_enqueue_batch()). */
> msg->p_ibuf->rpos += msg->len;
* [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
` (4 preceding siblings ...)
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 5/8] iproto: implement streams in iproto mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 11:52 ` Vladimir Davydov via Tarantool-patches
2021-08-11 12:09 ` Vladimir Davydov via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
7 siblings, 2 replies; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
Add stream support to `net.box`. In "net.box", a stream
is an object over a connection that has the same methods,
but all requests made through it are sent with a non-zero
stream ID. Since there can be a lot of streams, we do not
copy the spaces from the connection to the stream when the
stream is created; we do it only when a space is first
accessed. Schema updates are also handled lazily: each
stream has its own schema_version. On every access to a
stream space we compare the stream schema_version with the
connection schema_version and, if they differ, clear the
stream space cache and wrap the space being accessed into
the stream cache.
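
The lazy wrapping described above can be modelled in plain Lua, without a running server. This is only a minimal sketch under stated assumptions: `fake_conn`, `wrap_space` and the `_cache` field are hypothetical names used for illustration, and the connection is an ordinary table rather than a real net.box object. The actual implementation is in the net_box.lua hunks of this patch.

```lua
-- Hypothetical stand-in for a net.box connection: only the two
-- fields the lazy cache reads.
local fake_conn = {
    schema_version = 1,
    space = { test = { id = 512, name = 'test' } },
}

-- Wrap a connection space: the wrapper carries the stream ID and
-- delegates every other field to the source space.
local function wrap_space(stream_id, src)
    return setmetatable({ _stream_id = stream_id, _src = src },
                        { __index = src })
end

-- Build a stream whose `space` table is filled on first access.
local function new_stream(conn, stream_id)
    local stream = {
        _conn = conn,
        _stream_id = stream_id,
        _schema_version = conn.schema_version,
    }
    stream.space = setmetatable({ _cache = {} }, {
        __index = function(self, key)
            -- A schema change on the connection invalidates the cache.
            if stream._schema_version ~= conn.schema_version then
                stream._schema_version = conn.schema_version
                self._cache = {}
            end
            local cached = self._cache[key]
            if cached ~= nil then
                return cached
            end
            local src = conn.space[key]
            if src == nil then
                return nil
            end
            local res = wrap_space(stream_id, src)
            self._cache[key] = res
            return res
        end,
    })
    return stream
end

local stream = new_stream(fake_conn, 1)
local first = stream.space.test      -- wrapped and cached on first access
fake_conn.schema_version = 2         -- simulate a schema reload
local second = stream.space.test     -- cache was cleared, so re-wrapped
```

The wrapper is never stored on the `space` table itself, so every lookup goes through `__index` and the schema version check runs on each access.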
Part of #5860
@TarantoolBot document
Title: stream support was added to net.box
In "net.box", a stream is an object over a connection that
has the same methods, but all requests made through it are
sent with a non-zero stream ID. The stream ID is generated
on the client automatically. A simple example of stream
creation using net.box:
```lua
stream = conn:new_stream()
-- all connection methods are valid, but send requests
-- with non zero stream_id.
```
---
src/box/lua/net_box.c | 95 ++--
src/box/lua/net_box.lua | 205 ++++++--
test/box/access.result | 6 +-
test/box/access.test.lua | 6 +-
...net.box_console_connections_gh-2677.result | 2 +-
...t.box_console_connections_gh-2677.test.lua | 2 +-
.../net.box_incorrect_iterator_gh-841.result | 4 +-
...net.box_incorrect_iterator_gh-841.test.lua | 4 +-
test/box/net.box_iproto_hangs_gh-3464.result | 2 +-
.../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +-
.../net.box_long-poll_input_gh-3400.result | 8 +-
.../net.box_long-poll_input_gh-3400.test.lua | 8 +-
test/box/stream.lua | 13 +
test/box/stream.result | 485 ++++++++++++++++++
test/box/stream.test.lua | 182 +++++++
test/box/suite.ini | 2 +-
16 files changed, 934 insertions(+), 92 deletions(-)
create mode 100644 test/box/stream.lua
create mode 100644 test/box/stream.result
create mode 100644 test/box/stream.test.lua
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 06e574cdf..3bc49af23 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -328,7 +328,7 @@ netbox_registry_reset(struct netbox_registry *registry, struct error *error)
static inline size_t
netbox_begin_encode(struct mpstream *stream, uint64_t sync,
- enum iproto_type type)
+ enum iproto_type type, uint64_t stream_id)
{
/* Remember initial size of ibuf (see netbox_end_encode()) */
struct ibuf *ibuf = stream->ctx;
@@ -340,7 +340,7 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
mpstream_advance(stream, fixheader_size);
/* encode header */
- mpstream_encode_map(stream, 2);
+ mpstream_encode_map(stream, stream_id != 0 ? 3 : 2);
mpstream_encode_uint(stream, IPROTO_SYNC);
mpstream_encode_uint(stream, sync);
@@ -348,6 +348,10 @@ netbox_begin_encode(struct mpstream *stream, uint64_t sync,
mpstream_encode_uint(stream, IPROTO_REQUEST_TYPE);
mpstream_encode_uint(stream, type);
+ if (stream_id != 0) {
+ mpstream_encode_uint(stream, IPROTO_STREAM_ID);
+ mpstream_encode_uint(stream, stream_id);
+ }
/* Caller should remember how many bytes was used in ibuf */
return used;
}
@@ -380,11 +384,11 @@ netbox_end_encode(struct mpstream *stream, size_t initial_size)
static void
netbox_encode_ping(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
(void)L;
(void)idx;
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_PING, stream_id);
netbox_end_encode(stream, svp);
}
@@ -402,7 +406,7 @@ netbox_encode_auth(struct ibuf *ibuf, uint64_t sync,
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
mpstream_error_handler, &is_error);
- size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH);
+ size_t svp = netbox_begin_encode(&stream, sync, IPROTO_AUTH, 0);
/* Adapted from xrow_encode_auth() */
mpstream_encode_map(&stream, password != NULL ? 2 : 1);
@@ -432,7 +436,7 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
mpstream_error_handler, &is_error);
- size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT);
+ size_t svp = netbox_begin_encode(&stream, sync, IPROTO_SELECT, 0);
mpstream_encode_map(&stream, 3);
mpstream_encode_uint(&stream, IPROTO_SPACE_ID);
mpstream_encode_uint(&stream, space_id);
@@ -446,10 +450,10 @@ netbox_encode_select_all(struct ibuf *ibuf, uint64_t sync, uint32_t space_id)
static void
netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync, enum iproto_type type)
+ uint64_t sync, enum iproto_type type, uint64_t stream_id)
{
/* Lua stack at idx: function_name, args */
- size_t svp = netbox_begin_encode(stream, sync, type);
+ size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
mpstream_encode_map(stream, 2);
@@ -468,24 +472,25 @@ netbox_encode_call_impl(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_call_16(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL_16);
+ netbox_encode_call_impl(L, idx, stream, sync,
+ IPROTO_CALL_16, stream_id);
}
static void
netbox_encode_call(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL);
+ netbox_encode_call_impl(L, idx, stream, sync, IPROTO_CALL, stream_id);
}
static void
netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: expr, args */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_EVAL, stream_id);
mpstream_encode_map(stream, 2);
@@ -504,10 +509,11 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, index_id, iterator, offset, limit, key */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_SELECT,
+ stream_id);
mpstream_encode_map(stream, 6);
@@ -546,10 +552,11 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync, enum iproto_type type)
+ uint64_t sync, enum iproto_type type,
+ uint64_t stream_id)
{
/* Lua stack at idx: space_id, tuple */
- size_t svp = netbox_begin_encode(stream, sync, type);
+ size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
mpstream_encode_map(stream, 2);
@@ -567,24 +574,27 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_insert(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_INSERT);
+ netbox_encode_insert_or_replace(L, idx, stream, sync,
+ IPROTO_INSERT, stream_id);
}
static void
netbox_encode_replace(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
- netbox_encode_insert_or_replace(L, idx, stream, sync, IPROTO_REPLACE);
+ netbox_encode_insert_or_replace(L, idx, stream, sync,
+ IPROTO_REPLACE, stream_id);
}
static void
netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, index_id, key */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_DELETE,
+ stream_id);
mpstream_encode_map(stream, 3);
@@ -607,10 +617,11 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, index_id, key, ops */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPDATE,
+ stream_id);
mpstream_encode_map(stream, 5);
@@ -641,10 +652,11 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: space_id, tuple, ops */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_UPSERT,
+ stream_id);
mpstream_encode_map(stream, 4);
@@ -844,10 +856,11 @@ netbox_send_and_recv_console(int fd, struct ibuf *send_buf,
static void
netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: query, parameters, options */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_EXECUTE,
+ stream_id);
mpstream_encode_map(stream, 3);
@@ -873,10 +886,11 @@ netbox_encode_execute(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: query */
- size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE);
+ size_t svp = netbox_begin_encode(stream, sync, IPROTO_PREPARE,
+ stream_id);
mpstream_encode_map(stream, 1);
@@ -896,18 +910,19 @@ netbox_encode_prepare(lua_State *L, int idx, struct mpstream *stream,
static void
netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: query, parameters, options */
- netbox_encode_prepare(L, idx, stream, sync);
+ netbox_encode_prepare(L, idx, stream, sync, stream_id);
}
static void
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
- uint64_t sync)
+ uint64_t sync, uint64_t stream_id)
{
/* Lua stack at idx: bytes */
(void)sync;
+ (void)stream_id;
size_t len;
const char *data = lua_tolstring(L, idx, &len);
mpstream_memcpy(stream, data, len);
@@ -921,11 +936,11 @@ netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
*/
static int
netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
- struct ibuf *ibuf, uint64_t sync)
+ struct ibuf *ibuf, uint64_t sync, uint64_t stream_id)
{
typedef void (*method_encoder_f)(struct lua_State *L, int idx,
struct mpstream *stream,
- uint64_t sync);
+ uint64_t sync, uint64_t stream_id);
static method_encoder_f method_encoder[] = {
[NETBOX_PING] = netbox_encode_ping,
[NETBOX_CALL_16] = netbox_encode_call_16,
@@ -949,7 +964,7 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
luamp_error, L);
- method_encoder[method](L, idx, &stream, sync);
+ method_encoder[method](L, idx, &stream, sync, stream_id);
return 0;
}
@@ -1569,6 +1584,7 @@ netbox_new_registry(struct lua_State *L)
* - on_push: on_push trigger function
* - on_push_ctx: on_push trigger function argument
* - format: tuple format to use for decoding the body or nil
+ * - stream_id: determines whether or not the request belongs to stream
* - ...: method-specific arguments passed to the encoder
*/
static void
@@ -1581,7 +1597,8 @@ netbox_make_request(struct lua_State *L, int idx,
enum netbox_method method = lua_tointeger(L, idx + 4);
assert(method < netbox_method_MAX);
uint64_t sync = registry->next_sync++;
- netbox_encode_method(L, idx + 8, method, send_buf, sync);
+ uint64_t stream_id = luaL_touint64(L, idx + 8);
+ netbox_encode_method(L, idx + 9, method, send_buf, sync, stream_id);
/* Initialize and register the request object. */
request->method = method;
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 8f5671c15..3dffc245f 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -275,14 +275,14 @@ local function create_transport(host, port, user, password, callback,
-- @retval not nil Future object.
--
local function perform_async_request(buffer, skip_header, method, on_push,
- on_push_ctx, format, ...)
+ on_push_ctx, format, stream_id, ...)
local err = prepare_perform_request()
if err then
return nil, err
end
return perform_async_request_impl(requests, send_buf, buffer,
skip_header, method, on_push,
- on_push_ctx, format, ...)
+ on_push_ctx, format, stream_id, ...)
end
--
@@ -291,14 +291,15 @@ local function create_transport(host, port, user, password, callback,
-- @retval not nil Response object.
--
local function perform_request(timeout, buffer, skip_header, method,
- on_push, on_push_ctx, format, ...)
+ on_push, on_push_ctx, format,
+ stream_id, ...)
local err = prepare_perform_request()
if err then
return nil, err
end
return perform_request_impl(timeout, requests, send_buf, buffer,
skip_header, method, on_push, on_push_ctx,
- format, ...)
+ format, stream_id, ...)
end
-- PROTOCOL STATE MACHINE (WORKER FIBER) --
@@ -487,6 +488,37 @@ local function remote_serialize(self)
}
end
+local function stream_serialize(self)
+ return {
+ host = self._conn.host,
+ port = self._conn.port,
+ opts = next(self._conn.opts) and self._conn.opts,
+ state = self._conn.state,
+ error = self._conn.error,
+ protocol = self._conn.protocol,
+ schema_version = self._conn.schema_version,
+ peer_uuid = self._conn.peer_uuid,
+ peer_version_id = self._conn.peer_version_id,
+ stream_id = self._stream_id
+ }
+end
+
+local function stream_spaces_serialize(self)
+ return self._stream._conn.space
+end
+
+local function stream_space_serialize(self)
+ return self._src
+end
+
+local function stream_indexes_serialize(self)
+ return self._space._src.index
+end
+
+local function stream_index_serialize(self)
+ return self._src
+end
+
local remote_methods = {}
local remote_mt = {
__index = remote_methods, __serialize = remote_serialize,
@@ -499,6 +531,86 @@ local console_mt = {
__metatable = false
}
+-- Create a stream space index, which is the same as the connection
+-- space index, but has a non-zero stream ID.
+local function stream_wrap_index(stream_id, src)
+ return setmetatable({
+ _stream_id = stream_id,
+ _src = src,
+ }, {
+ __index = src,
+ __serialize = stream_index_serialize
+ })
+end
+
+-- Metatable for stream space indexes. When a stream space is
+-- created, there are no indexes in it. When accessing a space
+-- index, we look for the corresponding index in the corresponding
+-- connection space. If it is found, we create the same index for the
+-- stream space but with the corresponding stream ID. We do not need
+-- to compare stream _schema_version and connection schema_version,
+-- because all access to an index is carried out through its space.
+-- So we update schema_version when we access the space.
+local stream_indexes_mt = {
+ __index = function(self, key)
+ local _space = self._space
+ local src = _space._src.index[key]
+ if not src then
+ return nil
+ end
+ local res = stream_wrap_index(_space._stream_id, src)
+ self[key] = res
+ return res
+ end,
+ __serialize = stream_indexes_serialize
+}
+
+-- Create a stream space, which is the same as the connection space,
+-- but has a non-zero stream ID.
+local function stream_wrap_space(stream, src)
+ local res = setmetatable({
+ _stream_id = stream._stream_id,
+ _src = src,
+ index = setmetatable({
+ _space = nil,
+ }, stream_indexes_mt)
+ }, {
+ __index = src,
+ __serialize = stream_space_serialize
+ })
+ res.index._space = res
+ return res
+end
+
+-- Metatable for stream spaces. When a stream is created, there
+-- are no spaces in it. When the user tries to access some space in
+-- the stream, we first compare _schema_version of the stream with
+-- schema_version of the connection and, if they are not equal, we
+-- clear the stream space cache and update its schema_version. Then
+-- we look for the corresponding space in the connection. If it is
+-- found, we create the same space for the stream but with the
+-- corresponding stream ID.
+local stream_spaces_mt = {
+ __index = function(self, key)
+ local stream = self._stream
+ if stream._schema_version ~= stream._conn.schema_version then
+ stream._schema_version = stream._conn.schema_version
+ self._stream_space_cache = {}
+ end
+ if self._stream_space_cache[key] then
+ return self._stream_space_cache[key]
+ end
+ local src = stream._conn.space[key]
+ if not src then
+ return nil
+ end
+ local res = stream_wrap_space(stream, src)
+ self._stream_space_cache[key] = res
+ return res
+ end,
+ __serialize = stream_spaces_serialize
+}
+
local space_metatable, index_metatable
local function new_sm(host, port, opts, connection, greeting)
@@ -578,6 +690,8 @@ local function new_sm(host, port, opts, connection, greeting)
if opts.wait_connected ~= false then
remote._transport.wait_state('active', tonumber(opts.wait_connected))
end
+ -- Last stream ID used for this connection
+ remote._last_stream_id = 0
return remote
end
@@ -635,6 +749,28 @@ local function check_eval_args(args)
end
end
+local function new_stream(stream)
+ check_remote_arg(stream, 'new_stream')
+ return stream._conn:new_stream()
+end
+
+function remote_methods:new_stream()
+ check_remote_arg(self, 'new_stream')
+ self._last_stream_id = self._last_stream_id + 1
+ local stream = setmetatable({
+ new_stream = new_stream,
+ _stream_id = self._last_stream_id,
+ space = setmetatable({
+ _stream_space_cache = {},
+ _stream = nil,
+ }, stream_spaces_mt),
+ _conn = self,
+ _schema_version = self.schema_version,
+ }, { __index = self, __serialize = stream_serialize })
+ stream.space._stream = stream
+ return stream
+end
+
function remote_methods:close()
check_remote_arg(self, 'close')
self._transport.stop()
@@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
return self._transport.wait_state('active', timeout)
end
-function remote_methods:_request(method, opts, format, ...)
+function remote_methods:_request(method, opts, format, stream_id, ...)
local transport = self._transport
local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
@@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
local res, err =
transport.perform_async_request(buffer, skip_header, method,
table.insert, {}, format,
- ...)
+ stream_id, ...)
if err then
box.error(err)
end
@@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
end
local res, err = transport.perform_request(timeout, buffer, skip_header,
method, on_push, on_push_ctx,
- format, ...)
+ format, stream_id, ...)
if err then
box.error(err)
end
@@ -718,7 +854,7 @@ end
function remote_methods:ping(opts)
check_remote_arg(self, 'ping')
- return (pcall(self._request, self, M_PING, opts))
+ return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
end
function remote_methods:reload_schema()
@@ -729,14 +865,16 @@ end
-- @deprecated since 1.7.4
function remote_methods:call_16(func_name, ...)
check_remote_arg(self, 'call')
- return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
+ return (self:_request(M_CALL_16, nil, nil, self._stream_id,
+ tostring(func_name), {...}))
end
function remote_methods:call(func_name, args, opts)
check_remote_arg(self, 'call')
check_call_args(args)
args = args or {}
- local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
+ local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
+ tostring(func_name), args)
if type(res) ~= 'table' or opts and opts.is_async then
return res
end
@@ -746,14 +884,15 @@ end
-- @deprecated since 1.7.4
function remote_methods:eval_16(code, ...)
check_remote_arg(self, 'eval')
- return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
+ return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
+ code, {...})))
end
function remote_methods:eval(code, args, opts)
check_remote_arg(self, 'eval')
check_eval_args(args)
args = args or {}
- local res = self:_request(M_EVAL, opts, nil, code, args)
+ local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
if type(res) ~= 'table' or opts and opts.is_async then
return res
end
@@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
if sql_opts ~= nil then
box.error(box.error.UNSUPPORTED, "execute", "options")
end
- return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
- sql_opts or {})
+ return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
+ query, parameters or {}, sql_opts or {})
end
function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
@@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
if sql_opts ~= nil then
box.error(box.error.UNSUPPORTED, "prepare", "options")
end
- return self:_request(M_PREPARE, netbox_opts, nil, query)
+ return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
end
function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
@@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
if sql_opts ~= nil then
box.error(box.error.UNSUPPORTED, "unprepare", "options")
end
- return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
- sql_opts or {})
+ return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
+ query, parameters or {}, sql_opts or {})
end
function remote_methods:wait_state(state, timeout)
@@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
+ res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
{line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
+ res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
line..'$EOF$\n')
end
if err then
@@ -951,14 +1090,14 @@ space_metatable = function(remote)
function methods:insert(tuple, opts)
check_space_arg(self, 'insert')
- return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
- tuple)
+ return remote:_request(M_INSERT, opts, self._format_cdata,
+ self._stream_id, self.id, tuple)
end
function methods:replace(tuple, opts)
check_space_arg(self, 'replace')
- return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
- tuple)
+ return remote:_request(M_REPLACE, opts, self._format_cdata,
+ self._stream_id, self.id, tuple)
end
function methods:select(key, opts)
@@ -978,7 +1117,8 @@ space_metatable = function(remote)
function methods:upsert(key, oplist, opts)
check_space_arg(self, 'upsert')
- return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
+ return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
+ self._stream_id, self.id,
key, oplist))
end
@@ -1009,8 +1149,8 @@ index_metatable = function(remote)
local offset = tonumber(opts and opts.offset) or 0
local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
return (remote:_request(M_SELECT, opts, self.space._format_cdata,
- self.space.id, self.id, iterator, offset,
- limit, key))
+ self._stream_id, self.space.id, self.id,
+ iterator, offset, limit, key))
end
function methods:get(key, opts)
@@ -1020,6 +1160,7 @@ index_metatable = function(remote)
end
return nothing_or_data(remote:_request(M_GET, opts,
self.space._format_cdata,
+ self._stream_id,
self.space.id, self.id,
box.index.EQ, 0, 2, key))
end
@@ -1031,6 +1172,7 @@ index_metatable = function(remote)
end
return nothing_or_data(remote:_request(M_MIN, opts,
self.space._format_cdata,
+ self._stream_id,
self.space.id, self.id,
box.index.GE, 0, 1, key))
end
@@ -1042,6 +1184,7 @@ index_metatable = function(remote)
end
return nothing_or_data(remote:_request(M_MAX, opts,
self.space._format_cdata,
+ self._stream_id,
self.space.id, self.id,
box.index.LE, 0, 1, key))
end
@@ -1053,22 +1196,24 @@ index_metatable = function(remote)
end
local code = string.format('box.space.%s.index.%s:count',
self.space.name, self.name)
- return remote:_request(M_COUNT, opts, nil, code, { key, opts })
+ return remote:_request(M_COUNT, opts, nil, self._stream_id,
+ code, { key, opts })
end
function methods:delete(key, opts)
check_index_arg(self, 'delete')
return nothing_or_data(remote:_request(M_DELETE, opts,
self.space._format_cdata,
- self.space.id, self.id, key))
+ self._stream_id, self.space.id,
+ self.id, key))
end
function methods:update(key, oplist, opts)
check_index_arg(self, 'update')
return nothing_or_data(remote:_request(M_UPDATE, opts,
self.space._format_cdata,
- self.space.id, self.id, key,
- oplist))
+ self._stream_id, self.space.id,
+ self.id, key, oplist))
end
return { __index = methods, __metatable = false }
diff --git a/test/box/access.result b/test/box/access.result
index 712cd68f8..6434da907 100644
--- a/test/box/access.result
+++ b/test/box/access.result
@@ -908,15 +908,15 @@ LISTEN = require('uri').parse(box.cfg.listen)
c = net.connect(LISTEN.host, LISTEN.service)
---
...
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
---
- error: Space '1' does not exist
...
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
---
- error: Space '65537' does not exist
...
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
---
- error: Space '4294967295' does not exist
...
diff --git a/test/box/access.test.lua b/test/box/access.test.lua
index 6060475d1..6abdb780d 100644
--- a/test/box/access.test.lua
+++ b/test/box/access.test.lua
@@ -351,9 +351,9 @@ box.schema.func.drop(name)
-- very large space id, no crash occurs.
LISTEN = require('uri').parse(box.cfg.listen)
c = net.connect(LISTEN.host, LISTEN.service)
-c:_request(net._method.select, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
-c:_request(net._method.select, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 1, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 65537, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
+c:_request(net._method.select, nil, nil, nil, 4294967295, box.index.EQ, 0, 0, 0xFFFFFFFF, {})
c:close()
session = box.session
diff --git a/test/box/net.box_console_connections_gh-2677.result b/test/box/net.box_console_connections_gh-2677.result
index f45aa0b56..7cea0a1da 100644
--- a/test/box/net.box_console_connections_gh-2677.result
+++ b/test/box/net.box_console_connections_gh-2677.result
@@ -74,7 +74,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
diff --git a/test/box/net.box_console_connections_gh-2677.test.lua b/test/box/net.box_console_connections_gh-2677.test.lua
index 40d099e70..6c4e6ea4f 100644
--- a/test/box/net.box_console_connections_gh-2677.test.lua
+++ b/test/box/net.box_console_connections_gh-2677.test.lua
@@ -30,7 +30,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
diff --git a/test/box/net.box_incorrect_iterator_gh-841.result b/test/box/net.box_incorrect_iterator_gh-841.result
index fbd2a7700..cd2a86787 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.result
+++ b/test/box/net.box_incorrect_iterator_gh-841.result
@@ -16,13 +16,13 @@ test_run:cmd("setopt delimiter ';'")
- true
...
function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
- local ret = cn:_request(remote._method.select, opts, nil, space_id,
+ local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
index_id, iterator, offset, limit, key)
return ret
end
function x_fatal(cn)
cn._transport.perform_request(nil, nil, false, remote._method.inject,
- nil, nil, nil, '\x80')
+ nil, nil, nil, nil, '\x80')
end
test_run:cmd("setopt delimiter ''");
---
diff --git a/test/box/net.box_incorrect_iterator_gh-841.test.lua b/test/box/net.box_incorrect_iterator_gh-841.test.lua
index 1d24f9f56..9c42175ef 100644
--- a/test/box/net.box_incorrect_iterator_gh-841.test.lua
+++ b/test/box/net.box_incorrect_iterator_gh-841.test.lua
@@ -5,13 +5,13 @@ test_run:cmd("push filter ".."'\\.lua.*:[0-9]+: ' to '.lua...\"]:<line>: '")
test_run:cmd("setopt delimiter ';'")
function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
- local ret = cn:_request(remote._method.select, opts, nil, space_id,
+ local ret = cn:_request(remote._method.select, opts, nil, nil, space_id,
index_id, iterator, offset, limit, key)
return ret
end
function x_fatal(cn)
cn._transport.perform_request(nil, nil, false, remote._method.inject,
- nil, nil, nil, '\x80')
+ nil, nil, nil, nil, '\x80')
end
test_run:cmd("setopt delimiter ''");
diff --git a/test/box/net.box_iproto_hangs_gh-3464.result b/test/box/net.box_iproto_hangs_gh-3464.result
index 3b5458c9a..cbf8181b3 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.result
+++ b/test/box/net.box_iproto_hangs_gh-3464.result
@@ -17,7 +17,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box_iproto_hangs_gh-3464.test.lua b/test/box/net.box_iproto_hangs_gh-3464.test.lua
index a7c41ae76..51a9ddece 100644
--- a/test/box/net.box_iproto_hangs_gh-3464.test.lua
+++ b/test/box/net.box_iproto_hangs_gh-3464.test.lua
@@ -8,6 +8,6 @@ net = require('net.box')
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, data)
+c._transport.perform_request(nil, nil, false, net._method.inject, nil, nil, nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil
diff --git a/test/box/net.box_long-poll_input_gh-3400.result b/test/box/net.box_long-poll_input_gh-3400.result
index a16110ee6..a98eea655 100644
--- a/test/box/net.box_long-poll_input_gh-3400.result
+++ b/test/box/net.box_long-poll_input_gh-3400.result
@@ -24,10 +24,10 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, false, \
- net._method.call_17, nil, nil, nil, 'long', {}) \
-c._transport.perform_request(nil, nil, false, net._method.inject, \
- nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, \
+ net._method.call_17, nil, nil, nil, nil, 'long', {}) \
+c._transport.perform_request(nil, nil, false, net._method.inject, \
+ nil, nil, nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
diff --git a/test/box/net.box_long-poll_input_gh-3400.test.lua b/test/box/net.box_long-poll_input_gh-3400.test.lua
index 891b59224..a6f302ee0 100644
--- a/test/box/net.box_long-poll_input_gh-3400.test.lua
+++ b/test/box/net.box_long-poll_input_gh-3400.test.lua
@@ -14,9 +14,9 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, false, \
- net._method.call_17, nil, nil, nil, 'long', {}) \
-c._transport.perform_request(nil, nil, false, net._method.inject, \
- nil, nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, \
+ net._method.call_17, nil, nil, nil, nil, 'long', {}) \
+c._transport.perform_request(nil, nil, false, net._method.inject, \
+ nil, nil, nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
diff --git a/test/box/stream.lua b/test/box/stream.lua
new file mode 100644
index 000000000..db6a29a8a
--- /dev/null
+++ b/test/box/stream.lua
@@ -0,0 +1,13 @@
+#!/usr/bin/env tarantool
+
+require('console').listen(os.getenv('ADMIN'))
+
+local memtx_use_mvcc_engine = (arg[2] and arg[2] == 'true' and true or false)
+
+box.cfg({
+ listen = os.getenv('LISTEN'),
+ iproto_threads = tonumber(arg[1]),
+ memtx_use_mvcc_engine = memtx_use_mvcc_engine
+})
+
+box.schema.user.grant('guest', 'read,write,execute,create,drop', 'universe', nil, {if_not_exists = true})
diff --git a/test/box/stream.result b/test/box/stream.result
new file mode 100644
index 000000000..03200ecf6
--- /dev/null
+++ b/test/box/stream.result
@@ -0,0 +1,485 @@
+-- test-run result file version 2
+-- This test checks streams implementation in iproto (gh-5860).
+net_box = require('net.box')
+ | ---
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+test_run = require('test_run').new()
+ | ---
+ | ...
+
+test_run:cmd("create server test with script='box/stream.lua'")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function get_current_connection_count()
+ local total_net_stat_table =
+ test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+ assert(total_net_stat_table)
+ local connection_stat_table = total_net_stat_table.CONNECTIONS
+ assert(connection_stat_table)
+ return connection_stat_table.current
+end;
+ | ---
+ | ...
+function wait_and_return_results(futures)
+ local results = {}
+ for name, future in pairs(futures) do
+ local err
+ results[name], err = future:wait_result()
+ if err then
+ results[name] = err
+ end
+ end
+ return results
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+ | ---
+ | - true
+ | ...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+ | ---
+ | ...
+conn_1 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_1 = conn_1:new_stream()
+ | ---
+ | ...
+conn_2 = net_box.connect(server_addr)
+ | ---
+ | ...
+stream_2 = conn_2:new_stream()
+ | ---
+ | ...
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+ | ---
+ | ...
+assert(not stream_1:ping())
+ | ---
+ | - true
+ | ...
+stream_2:close()
+ | ---
+ | ...
+assert(not conn_2:ping())
+ | ---
+ | - true
+ | ...
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(conn.schema_version ~= stream._schema_version)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version; if they are not equal, we clear the stream
+-- space cache, update its _schema_version and load the space from
+-- the connection into the stream space cache.
+assert(conn.schema_version == stream._schema_version)
+ | ---
+ | - true
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+collectgarbage()
+ | ---
+ | - 0
+ | ...
+assert(conn.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+assert(stream.space.test ~= nil)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+s:drop()
+ | ---
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+conn:reload_schema()
+ | ---
+ | ...
+assert(not conn.space.test)
+ | ---
+ | - true
+ | ...
+assert(not stream.space.test)
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+ | ---
+ | - true
+ | ...
+test_run:switch('test')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+s = box.schema.space.create('test', { engine = 'memtx' })
+ | ---
+ | ...
+_ = s:create_index('primary')
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function replace_with_yeild(item)
+ fiber.sleep(0.1)
+ return s:replace({item})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+assert(conn:ping())
+ | ---
+ | - true
+ | ...
+conn_space = conn.space.test
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+stream_space = stream.space.test
+ | ---
+ | ...
+
+-- Check that all requests in stream processed consistently
+futures = {}
+ | ---
+ | ...
+replace_count = 3
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+for i = 1, replace_count do
+ futures[string.format("replace_%d", i)] =
+ stream_space:replace({i}, {is_async = true})
+ futures[string.format("select_%d", i)] =
+ stream_space:select({}, {is_async = true})
+end;
+ | ---
+ | ...
+futures["replace_with_yeild_for_stream"] =
+ stream:call("replace_with_yeild",
+ { replace_count + 1 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_stream"] =
+ stream_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1]
+assert(results["select_1"])
+ | ---
+ | - - [1]
+ | ...
+-- [1] [2]
+assert(results["select_2"])
+ | ---
+ | - - [1]
+ | - [2]
+ | ...
+-- [1] [2] [3]
+assert(results["select_3"])
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | ...
+-- [1] [2] [3] [4]
+-- Even a yield in the replace function does not affect
+-- the order of request execution in the stream
+assert(results["select_with_yeild_for_stream"])
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | ...
+
+-- There is no request execution order for the connection
+futures = {}
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+futures["replace_with_yeild_for_connection"] =
+ conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+ | ---
+ | ...
+futures["select_with_yeild_for_connection"] =
+ conn_space:select({}, {is_async = true});
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+results = wait_and_return_results(futures)
+ | ---
+ | ...
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- the yield in the `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- [1] [2] [3] [4] [5]
+s:select()
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch('default')
+ | ---
+ | - true
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+ | ---
+ | ...
+stream = conn:new_stream()
+ | ---
+ | ...
+space = stream.space.test
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+replace_count = 20
+for i = 1, replace_count do
+ space:replace({i}, {is_async = true})
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+-- Give time to send
+fiber.sleep(0)
+ | ---
+ | ...
+conn:close()
+ | ---
+ | ...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+ | ---
+ | - true
+ | ...
+test_run:switch("test")
+ | ---
+ | - true
+ | ...
+-- select returns tuples from [1] to [20]
+-- because all messages are processed after
+-- the connection is closed
+s:select{}
+ | ---
+ | - - [1]
+ | - [2]
+ | - [3]
+ | - [4]
+ | - [5]
+ | - [6]
+ | - [7]
+ | - [8]
+ | - [9]
+ | - [10]
+ | - [11]
+ | - [12]
+ | - [13]
+ | - [14]
+ | - [15]
+ | - [16]
+ | - [17]
+ | - [18]
+ | - [19]
+ | - [20]
+ | ...
+s:drop()
+ | ---
+ | ...
+errinj = box.error.injection
+ | ---
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+ | ---
+ | - true
+ | ...
+test_run:switch("default")
+ | ---
+ | - true
+ | ...
+test_run:cmd("stop server test")
+ | ---
+ | - true
+ | ...
+
+test_run:cmd("cleanup server test")
+ | ---
+ | - true
+ | ...
+test_run:cmd("delete server test")
+ | ---
+ | - true
+ | ...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
new file mode 100644
index 000000000..72129a228
--- /dev/null
+++ b/test/box/stream.test.lua
@@ -0,0 +1,182 @@
+-- This test checks streams implementation in iproto (gh-5860).
+net_box = require('net.box')
+fiber = require('fiber')
+test_run = require('test_run').new()
+
+test_run:cmd("create server test with script='box/stream.lua'")
+
+test_run:cmd("setopt delimiter ';'")
+function get_current_connection_count()
+ local total_net_stat_table =
+ test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
+ assert(total_net_stat_table)
+ local connection_stat_table = total_net_stat_table.CONNECTIONS
+ assert(connection_stat_table)
+ return connection_stat_table.current
+end;
+function wait_and_return_results(futures)
+ local results = {}
+ for name, future in pairs(futures) do
+ local err
+ results[name], err = future:wait_result()
+ if err then
+ results[name] = err
+ end
+ end
+ return results
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Some simple checks for new object - stream
+test_run:cmd("start server test with args='1'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn_1 = net_box.connect(server_addr)
+stream_1 = conn_1:new_stream()
+conn_2 = net_box.connect(server_addr)
+stream_2 = conn_2:new_stream()
+-- Stream is a wrapper around connection, so if you close connection
+-- you close stream, and vice versa.
+conn_1:close()
+assert(not stream_1:ping())
+stream_2:close()
+assert(not conn_2:ping())
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+-- The new method `new_stream`, for the stream object, returns a new
+-- stream object, just as in the case of connection.
+_ = stream:new_stream()
+conn:close()
+
+-- Check that spaces in stream object updates, during reload_schema
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+test_run:switch("test")
+-- Create one space on server
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch("default")
+assert(not conn.space.test)
+assert(not stream.space.test)
+assert(conn.schema_version == stream._schema_version)
+conn:reload_schema()
+assert(conn.space.test ~= nil)
+assert(conn.schema_version ~= stream._schema_version)
+assert(stream.space.test ~= nil)
+-- When we touch stream.space, we compare stream._schema_version
+-- and conn.schema_version; if they are not equal, we clear the stream
+-- space cache, update its _schema_version and load the space from
+-- the connection into the stream space cache.
+assert(conn.schema_version == stream._schema_version)
+collectgarbage()
+collectgarbage()
+assert(conn.space.test ~= nil)
+assert(stream.space.test ~= nil)
+test_run:switch("test")
+s:drop()
+test_run:switch("default")
+conn:reload_schema()
+assert(not conn.space.test)
+assert(not stream.space.test)
+test_run:cmd("stop server test")
+
+-- All test works with iproto_thread count = 10
+
+test_run:cmd("start server test with args='10'")
+test_run:switch('test')
+fiber = require('fiber')
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:cmd("setopt delimiter ';'")
+function replace_with_yeild(item)
+ fiber.sleep(0.1)
+ return s:replace({item})
+end;
+test_run:cmd("setopt delimiter ''");
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+conn_space = conn.space.test
+stream = conn:new_stream()
+stream_space = stream.space.test
+
+-- Check that all requests in stream processed consistently
+futures = {}
+replace_count = 3
+test_run:cmd("setopt delimiter ';'")
+for i = 1, replace_count do
+ futures[string.format("replace_%d", i)] =
+ stream_space:replace({i}, {is_async = true})
+ futures[string.format("select_%d", i)] =
+ stream_space:select({}, {is_async = true})
+end;
+futures["replace_with_yeild_for_stream"] =
+ stream:call("replace_with_yeild",
+ { replace_count + 1 }, {is_async = true});
+futures["select_with_yeild_for_stream"] =
+ stream_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1]
+assert(results["select_1"])
+-- [1] [2]
+assert(results["select_2"])
+-- [1] [2] [3]
+assert(results["select_3"])
+-- [1] [2] [3] [4]
+-- Even a yield in the replace function does not affect
+-- the order of request execution in the stream
+assert(results["select_with_yeild_for_stream"])
+
+-- There is no request execution order for the connection
+futures = {}
+test_run:cmd("setopt delimiter ';'")
+futures["replace_with_yeild_for_connection"] =
+ conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
+futures["select_with_yeild_for_connection"] =
+ conn_space:select({}, {is_async = true});
+test_run:cmd("setopt delimiter ''");
+results = wait_and_return_results(futures)
+-- [1] [2] [3] [4]
+-- Select will be processed earlier because of
+-- the yield in the `replace_with_yeild` function
+assert(results["select_with_yeild_for_connection"])
+test_run:switch("test")
+-- [1] [2] [3] [4] [5]
+s:select()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Check that all request will be processed
+-- after connection close.
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+test_run:cmd("setopt delimiter ';'")
+replace_count = 20
+for i = 1, replace_count do
+ space:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+-- Give time to send
+fiber.sleep(0)
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:switch("test")
+-- select returns tuples from [1] to [20]
+-- because all messages are processed after
+-- the connection is closed
+s:select{}
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:cmd("stop server test")
+
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
diff --git a/test/box/suite.ini b/test/box/suite.ini
index b5d869fb3..94cf7811f 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -5,7 +5,7 @@ script = box.lua
disabled = rtree_errinj.test.lua tuple_bench.test.lua
long_run = huge_field_map_long.test.lua
config = engine.cfg
-release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua
+release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua gh-5645-several-iproto-threads.test.lua net.box_discard_console_request_gh-6249.test.lua stream.test.lua
lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua lua/txn_proxy.lua
use_unix_sockets = True
use_unix_sockets_iproto = True
--
2.20.1
* Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
@ 2021-08-11 11:52 ` Vladimir Davydov via Tarantool-patches
2021-08-11 12:09 ` Vladimir Davydov via Tarantool-patches
1 sibling, 0 replies; 14+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-11 11:52 UTC (permalink / raw)
To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy, mechanik20051988
On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote:
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 8f5671c15..3dffc245f 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -635,6 +749,28 @@ local function check_eval_args(args)
> end
> end
>
> +local function new_stream(stream)
Let's rename this function to stream_new_stream to emphasize that
it's a stream method.
> + check_remote_arg(stream, 'new_stream')
> + return stream._conn:new_stream()
> +end
> +
> +function remote_methods:new_stream()
> + check_remote_arg(self, 'new_stream')
> + self._last_stream_id = self._last_stream_id + 1
> + local stream = setmetatable({
> + new_stream = new_stream,
> + _stream_id = self._last_stream_id,
> + space = setmetatable({
> + _stream_space_cache = {},
> + _stream = nil,
> + }, stream_spaces_mt),
> + _conn = self,
> + _schema_version = self.schema_version,
> + }, { __index = self, __serialize = stream_serialize })
> + stream.space._stream = stream
> + return stream
> +end
> +
> function remote_methods:close()
> check_remote_arg(self, 'close')
> self._transport.stop()
> @@ -665,7 +801,7 @@ function remote_methods:wait_connected(timeout)
> return self._transport.wait_state('active', timeout)
> end
>
> -function remote_methods:_request(method, opts, format, ...)
> +function remote_methods:_request(method, opts, format, stream_id, ...)
> local transport = self._transport
> local on_push, on_push_ctx, buffer, skip_header, deadline
> -- Extract options, set defaults, check if the request is
> @@ -680,7 +816,7 @@ function remote_methods:_request(method, opts, format, ...)
> local res, err =
> transport.perform_async_request(buffer, skip_header, method,
> table.insert, {}, format,
> - ...)
> + stream_id, ...)
> if err then
> box.error(err)
> end
> @@ -702,7 +838,7 @@ function remote_methods:_request(method, opts, format, ...)
> end
> local res, err = transport.perform_request(timeout, buffer, skip_header,
> method, on_push, on_push_ctx,
> - format, ...)
> + format, stream_id, ...)
Please use self._stream_id here. Then you won't need to pass stream_id
from all the functions below.
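To illustrate the idea, here is a minimal sketch in plain Lua. It is not the
actual net.box code: conn_methods, new_conn and the table returned by
_request are hypothetical stand-ins, and nothing is sent over the network.
It assumes only what the patch already does, namely that the stream is
created with __index pointing at the connection. A method invoked on the
stream then receives the stream as self, so self._stream_id resolves to the
stream's id, while on a plain connection it is nil.

```lua
-- Hypothetical stand-in for remote_methods; not the real net.box code.
local conn_methods = {}

-- Instead of sending anything, report which stream id the request saw.
function conn_methods:_request(method, ...)
    return { method = method, stream_id = self._stream_id, args = {...} }
end

function conn_methods:ping()
    -- No stream_id argument: _request reads it from self.
    return self:_request('ping')
end

function conn_methods:new_stream()
    -- Works for both a connection and a stream: a stream keeps a
    -- reference to its connection in _conn, a connection has none.
    local conn = self._conn or self
    conn._last_stream_id = conn._last_stream_id + 1
    -- Missing keys (methods included) fall through to the connection.
    return setmetatable({
        _stream_id = conn._last_stream_id,
        _conn = conn,
    }, { __index = conn })
end

-- Hypothetical constructor for the stand-in connection object.
local function new_conn()
    return setmetatable({ _last_stream_id = 0 }, { __index = conn_methods })
end

local conn = new_conn()
local stream = conn:new_stream()
```

In this sketch conn:ping() sees no stream id and stream:ping() sees the
stream's own id, without any caller passing it explicitly. Space and index
methods call _request through the captured remote object rather than through
self, so they may need separate handling.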
> if err then
> box.error(err)
> end
> @@ -718,7 +854,7 @@ end
>
> function remote_methods:ping(opts)
> check_remote_arg(self, 'ping')
> - return (pcall(self._request, self, M_PING, opts))
> + return (pcall(self._request, self, M_PING, opts, nil, self._stream_id))
> end
>
> function remote_methods:reload_schema()
> @@ -729,14 +865,16 @@ end
> -- @deprecated since 1.7.4
> function remote_methods:call_16(func_name, ...)
> check_remote_arg(self, 'call')
> - return (self:_request(M_CALL_16, nil, nil, tostring(func_name), {...}))
> + return (self:_request(M_CALL_16, nil, nil, self._stream_id,
> + tostring(func_name), {...}))
> end
>
> function remote_methods:call(func_name, args, opts)
> check_remote_arg(self, 'call')
> check_call_args(args)
> args = args or {}
> - local res = self:_request(M_CALL_17, opts, nil, tostring(func_name), args)
> + local res = self:_request(M_CALL_17, opts, nil, self._stream_id,
> + tostring(func_name), args)
> if type(res) ~= 'table' or opts and opts.is_async then
> return res
> end
> @@ -746,14 +884,15 @@ end
> -- @deprecated since 1.7.4
> function remote_methods:eval_16(code, ...)
> check_remote_arg(self, 'eval')
> - return unpack((self:_request(M_EVAL, nil, nil, code, {...})))
> + return unpack((self:_request(M_EVAL, nil, nil, self._stream_id,
> + code, {...})))
> end
>
> function remote_methods:eval(code, args, opts)
> check_remote_arg(self, 'eval')
> check_eval_args(args)
> args = args or {}
> - local res = self:_request(M_EVAL, opts, nil, code, args)
> + local res = self:_request(M_EVAL, opts, nil, self._stream_id, code, args)
> if type(res) ~= 'table' or opts and opts.is_async then
> return res
> end
> @@ -765,8 +904,8 @@ function remote_methods:execute(query, parameters, sql_opts, netbox_opts)
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "execute", "options")
> end
> - return self:_request(M_EXECUTE, netbox_opts, nil, query, parameters or {},
> - sql_opts or {})
> + return self:_request(M_EXECUTE, netbox_opts, nil, self._stream_id,
> + query, parameters or {}, sql_opts or {})
> end
>
> function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- luacheck: no unused args
> @@ -777,7 +916,7 @@ function remote_methods:prepare(query, parameters, sql_opts, netbox_opts) -- lua
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "prepare", "options")
> end
> - return self:_request(M_PREPARE, netbox_opts, nil, query)
> + return self:_request(M_PREPARE, netbox_opts, nil, self._stream_id, query)
> end
>
> function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> @@ -788,8 +927,8 @@ function remote_methods:unprepare(query, parameters, sql_opts, netbox_opts)
> if sql_opts ~= nil then
> box.error(box.error.UNSUPPORTED, "unprepare", "options")
> end
> - return self:_request(M_UNPREPARE, netbox_opts, nil, query, parameters or {},
> - sql_opts or {})
> + return self:_request(M_UNPREPARE, netbox_opts, nil, self._stream_id,
> + query, parameters or {}, sql_opts or {})
> end
>
> function remote_methods:wait_state(state, timeout)
> @@ -927,11 +1066,11 @@ function console_methods:eval(line, timeout)
> end
> if self.protocol == 'Binary' then
> local loader = 'return require("console").eval(...)'
> - res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, loader,
> + res, err = pr(timeout, nil, false, M_EVAL, nil, nil, nil, nil, loader,
> {line})
> else
> assert(self.protocol == 'Lua console')
> - res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil,
> + res, err = pr(timeout, nil, false, M_INJECT, nil, nil, nil, nil,
> line..'$EOF$\n')
> end
> if err then
> @@ -951,14 +1090,14 @@ space_metatable = function(remote)
>
> function methods:insert(tuple, opts)
> check_space_arg(self, 'insert')
> - return remote:_request(M_INSERT, opts, self._format_cdata, self.id,
> - tuple)
> + return remote:_request(M_INSERT, opts, self._format_cdata,
> + self._stream_id, self.id, tuple)
> end
>
> function methods:replace(tuple, opts)
> check_space_arg(self, 'replace')
> - return remote:_request(M_REPLACE, opts, self._format_cdata, self.id,
> - tuple)
> + return remote:_request(M_REPLACE, opts, self._format_cdata,
> + self._stream_id, self.id, tuple)
> end
>
> function methods:select(key, opts)
> @@ -978,7 +1117,8 @@ space_metatable = function(remote)
>
> function methods:upsert(key, oplist, opts)
> check_space_arg(self, 'upsert')
> - return nothing_or_data(remote:_request(M_UPSERT, opts, nil, self.id,
> + return nothing_or_data(remote:_request(M_UPSERT, opts, nil,
> + self._stream_id, self.id,
> key, oplist))
> end
>
> @@ -1009,8 +1149,8 @@ index_metatable = function(remote)
> local offset = tonumber(opts and opts.offset) or 0
> local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
> return (remote:_request(M_SELECT, opts, self.space._format_cdata,
> - self.space.id, self.id, iterator, offset,
> - limit, key))
> + self._stream_id, self.space.id, self.id,
> + iterator, offset, limit, key))
> end
>
> function methods:get(key, opts)
> @@ -1020,6 +1160,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_GET, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.EQ, 0, 2, key))
> end
> @@ -1031,6 +1172,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_MIN, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.GE, 0, 1, key))
> end
> @@ -1042,6 +1184,7 @@ index_metatable = function(remote)
> end
> return nothing_or_data(remote:_request(M_MAX, opts,
> self.space._format_cdata,
> + self._stream_id,
> self.space.id, self.id,
> box.index.LE, 0, 1, key))
> end
> @@ -1053,22 +1196,24 @@ index_metatable = function(remote)
> end
> local code = string.format('box.space.%s.index.%s:count',
> self.space.name, self.name)
> - return remote:_request(M_COUNT, opts, nil, code, { key, opts })
> + return remote:_request(M_COUNT, opts, nil, self._stream_id,
> + code, { key, opts })
> end
>
> function methods:delete(key, opts)
> check_index_arg(self, 'delete')
> return nothing_or_data(remote:_request(M_DELETE, opts,
> self.space._format_cdata,
> - self.space.id, self.id, key))
> + self._stream_id, self.space.id,
> + self.id, key))
> end
>
> function methods:update(key, oplist, opts)
> check_index_arg(self, 'update')
> return nothing_or_data(remote:_request(M_UPDATE, opts,
> self.space._format_cdata,
> - self.space.id, self.id, key,
> - oplist))
> + self._stream_id, self.space.id,
> + self.id, key, oplist))
> end
>
> return { __index = methods, __metatable = false }
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
2021-08-11 11:52 ` Vladimir Davydov via Tarantool-patches
@ 2021-08-11 12:09 ` Vladimir Davydov via Tarantool-patches
1 sibling, 0 replies; 14+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-11 12:09 UTC (permalink / raw)
To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy, mechanik20051988
On Wed, Aug 11, 2021 at 11:56:56AM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
>
> Add stream support to `net.box`. In `net.box`, a stream
> is an object over a connection that has the same methods,
> but all of its requests are sent with a non-zero stream ID.
> Since there can be a lot of streams, we do not copy the
> spaces from the connection to the stream immediately when
> creating a stream, but do it only on first access to a space.
> Also, when the schema is updated, we update the spaces lazily:
> each stream has its own schema_version; on access to a stream
> space we compare the stream schema_version with the connection
> schema_version and, if they differ, clear the stream space
> cache and wrap the accessed space into the stream cache.
>
> Part of #5860
>
> @TarantoolBot document
> Title: stream support was added to net.box
> In `net.box`, a stream is an object over a connection that
> has the same methods, but all of its requests are sent
> with a non-zero stream ID. The stream ID is generated on the
> client automatically. A simple example of stream creation
> using net.box:
> ```lua
> stream = conn:new_stream()
> -- all connection methods are valid, but send requests
> -- with non zero stream_id.
> ```
> ---
> src/box/lua/net_box.c | 95 ++--
> src/box/lua/net_box.lua | 205 ++++++--
> test/box/access.result | 6 +-
> test/box/access.test.lua | 6 +-
> ...net.box_console_connections_gh-2677.result | 2 +-
> ...t.box_console_connections_gh-2677.test.lua | 2 +-
> .../net.box_incorrect_iterator_gh-841.result | 4 +-
> ...net.box_incorrect_iterator_gh-841.test.lua | 4 +-
> test/box/net.box_iproto_hangs_gh-3464.result | 2 +-
> .../box/net.box_iproto_hangs_gh-3464.test.lua | 2 +-
> .../net.box_long-poll_input_gh-3400.result | 8 +-
> .../net.box_long-poll_input_gh-3400.test.lua | 8 +-
> test/box/stream.lua | 13 +
> test/box/stream.result | 485 ++++++++++++++++++
> test/box/stream.test.lua | 182 +++++++
Please come up with a better test name. 'Stream' is way too generic.
Should be at least net.box.streams.
> test/box/suite.ini | 2 +-
> 16 files changed, 934 insertions(+), 92 deletions(-)
> create mode 100644 test/box/stream.lua
> create mode 100644 test/box/stream.result
> create mode 100644 test/box/stream.test.lua
^ permalink raw reply [flat|nested] 14+ messages in thread
* [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
` (5 preceding siblings ...)
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 6/8] net.box: add stream support to net.box mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 12:39 ` Vladimir Davydov via Tarantool-patches
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
7 siblings, 1 reply; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
Implement interactive transactions over iproto streams. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. If a request fails during a
transaction, it does not affect the other requests in the transaction.
If a disconnect occurs while a stream has an active transaction,
that transaction is rolled back unless it was committed before the
disconnect.
Part of #5860
@TarantoolBot document
Title: interactive transactions were implemented over iproto streams.
The main purpose of streams is transactions via iproto. Each stream
can start its own transaction, so streams allow multiplexing several
transactions over one connection. There are multiple ways to begin,
commit and roll back a transaction: using IPROTO_CALL and IPROTO_EVAL
with the corresponding function (box.begin, box.commit and box.rollback),
IPROTO_EXECUTE with the corresponding SQL statement ('START TRANSACTION',
'COMMIT', 'ROLLBACK'), or IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT
and IPROTO_TRANSACTION_ROLLBACK respectively. If a disconnect occurs while a
stream has an active transaction, that transaction is rolled back unless it
was committed before the disconnect.
Add new command codes to begin, commit and roll back transactions:
`IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
`IPROTO_TRANSACTION_ROLLBACK 16` respectively.
---
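For readers skimming the series, the acceptance rule this patch adds to
iproto_msg_decode() can be looked at in isolation. The block below is a
simplified standalone sketch, not code from the patch: the request codes
14, 15 and 16 come from the commit message, while `check_stream_request`
and the `CHECK_*` result codes are hypothetical names used only for
illustration. It deliberately leaves out the separate class of requests
that are rejected inside a stream.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Request codes as listed in the commit message. */
enum {
	IPROTO_TRANSACTION_BEGIN = 14,
	IPROTO_TRANSACTION_COMMIT = 15,
	IPROTO_TRANSACTION_ROLLBACK = 16,
};

/* Hypothetical result codes, for illustration only. */
enum check_result {
	CHECK_OK,
	CHECK_OUT_OF_STREAM,
};

/* True for requests that make sense only inside a stream. */
static bool
request_is_only_for_stream(uint8_t type)
{
	return type == IPROTO_TRANSACTION_BEGIN ||
	       type == IPROTO_TRANSACTION_COMMIT ||
	       type == IPROTO_TRANSACTION_ROLLBACK;
}

/*
 * Hypothetical helper: stream_id == 0 means the request was sent
 * outside of any stream, so transaction control is rejected.
 */
enum check_result
check_stream_request(uint8_t type, uint64_t stream_id)
{
	if (stream_id == 0 && request_is_only_for_stream(type))
		return CHECK_OUT_OF_STREAM;
	return CHECK_OK;
}
```

In the patch itself the rejected case sets ER_UNABLE_TO_PROCESS_OUT_OF_STREAM
through diag_set() instead of returning a code.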
src/box/call.c | 12 --
src/box/errcode.h | 1 +
src/box/iproto.cc | 243 +++++++++++++++++++++++++-
src/box/iproto_constants.c | 6 +
src/box/iproto_constants.h | 6 +
src/box/txn.c | 22 +++
src/box/txn.h | 19 ++
test/box-tap/feedback_daemon.test.lua | 2 +-
test/box/error.result | 1 +
test/box/misc.result | 5 +-
10 files changed, 300 insertions(+), 17 deletions(-)
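The stream lifetime rules added to iproto_msg_finish_processing_in_stream()
reduce to a small decision table. A minimal sketch under stated assumptions:
`stream_after_request`, its boolean parameters and the `STREAM_*` names are
hypothetical and do not appear in the patch. In the patch the three inputs
correspond to !stailq_empty(&stream->pending_requests), stream->txn != NULL
and !evio_has_fd(&con->input).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcomes, for illustration only. */
enum stream_action {
	STREAM_PROCESS_NEXT,	/* more requests are queued */
	STREAM_DELETE,		/* no requests, no transaction */
	STREAM_ROLLBACK,	/* connection closed, txn still open */
	STREAM_KEEP,		/* txn open, wait for the next request */
};

/* Decide what happens to a stream once a request has finished. */
enum stream_action
stream_after_request(bool has_pending, bool has_txn, bool conn_closed)
{
	if (has_pending)
		return STREAM_PROCESS_NEXT;
	if (!has_txn)
		return STREAM_DELETE;
	if (conn_closed)
		return STREAM_ROLLBACK;
	return STREAM_KEEP;
}
```

The STREAM_ROLLBACK case is where the patch pushes the pre-allocated
on_disconnect message to the tx thread.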
diff --git a/src/box/call.c b/src/box/call.c
index a6384efe2..0ce84b1ed 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -141,8 +141,6 @@ box_process_call(struct call_request *request, struct port *port)
const char *name = request->name;
assert(name != NULL);
uint32_t name_len = mp_decode_strl(&name);
- /* Transaction is not started. */
- assert(!in_txn());
int rc;
struct port args;
@@ -157,11 +155,6 @@ box_process_call(struct call_request *request, struct port *port)
}
if (rc != 0)
return -1;
- if (in_txn() != NULL) {
- diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- port_destroy(port);
- return -1;
- }
return 0;
}
@@ -179,10 +172,5 @@ box_process_eval(struct call_request *request, struct port *port)
uint32_t expr_len = mp_decode_strl(&expr);
if (box_lua_eval(expr, expr_len, &args, port) != 0)
return -1;
- if (in_txn() != 0) {
- diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
- port_destroy(port);
- return -1;
- }
return 0;
}
diff --git a/src/box/errcode.h b/src/box/errcode.h
index f8fda23c1..a6f096698 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -282,6 +282,7 @@ struct errcode_record {
/*227 */_(ER_SYNC_QUEUE_UNCLAIMED, "The synchronous transaction queue doesn't belong to any instance")\
/*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\
/*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \
+ /*230 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3b792130b..376abbff0 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -66,6 +66,7 @@
#include "tt_static.h"
#include "salad/stailq.h"
#include "assoc.h"
+#include "txn.h"
enum {
IPROTO_SALT_SIZE = 32,
@@ -79,6 +80,8 @@ enum {
struct iproto_connection;
struct iproto_stream {
+ /** Currently active stream transaction or NULL */
+ struct txn *txn;
/**
* Queue of pending requests (iproto messages) for this stream,
* processed sequentially. This field is accesable only from
@@ -89,6 +92,11 @@ struct iproto_stream {
uint64_t id;
/** This stream connection */
struct iproto_connection *connection;
+ /**
+ * Pre-allocated disconnect msg to gracefully rollback stream
+ * transaction and destroy stream object.
+ */
+ struct cmsg on_disconnect;
};
/**
@@ -135,6 +143,10 @@ struct iproto_thread {
/**
* Static routes for this iproto thread
*/
+ struct cmsg_hop begin_route[2];
+ struct cmsg_hop commit_route[2];
+ struct cmsg_hop rollback_route[2];
+ struct cmsg_hop rollback_on_disconnect_stream_route[2];
struct cmsg_hop destroy_route[2];
struct cmsg_hop disconnect_route[2];
struct cmsg_hop misc_route[2];
@@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
return NULL;
}
errinj_stream_count_add(1);
+ stream->txn = NULL;
stailq_create(&stream->pending_requests);
stream->id = stream_id;
stream->connection = connection;
return stream;
}
+static inline void
+iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream)
+{
+ struct iproto_connection *conn = stream->connection;
+ struct iproto_thread *iproto_thread = conn->iproto_thread;
+ struct cmsg_hop *route =
+ iproto_thread->rollback_on_disconnect_stream_route;
+ cmsg_init(&stream->on_disconnect, route);
+ cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
+}
+
/**
* Return true if we have not enough spare messages
* in the message pool.
@@ -670,6 +694,7 @@ static void
iproto_stream_delete(struct iproto_stream *stream)
{
assert(stailq_empty(&stream->pending_requests));
+ assert(stream->txn == NULL);
errinj_stream_count_add(-1);
mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
}
@@ -716,6 +741,7 @@ static inline bool
iproto_connection_is_idle(struct iproto_connection *con)
{
return con->long_poll_count == 0 &&
+ mh_size(con->streams) == 0 &&
ibuf_used(&con->ibuf[0]) == 0 &&
ibuf_used(&con->ibuf[1]) == 0;
}
@@ -805,6 +831,23 @@ iproto_connection_close(struct iproto_connection *con)
* is done only once.
*/
con->p_ibuf->wpos -= con->parse_size;
+ mh_int_t node;
+ mh_foreach(con->streams, node) {
+ struct iproto_stream *stream = (struct iproto_stream *)
+ mh_i64ptr_node(con->streams, node)->val;
+ /**
+ * If the stream request queue is empty, there is an
+ * active transaction which was not committed yet.
+ * We need to roll it back, so we push an
+ * on_disconnect message to the tx thread here.
+ * If the stream request queue is not empty, the
+ * stream is processing some request in the tx thread
+ * now. We destroy the stream in `net_send_msg` after
+ * processing all requests.
+ */
+ if (stailq_empty(&stream->pending_requests))
+ iproto_stream_push_on_disconnect_msg(stream);
+ }
cpipe_push(&con->iproto_thread->tx_pipe, &con->disconnect_msg);
assert(con->state == IPROTO_CONNECTION_ALIVE);
con->state = IPROTO_CONNECTION_CLOSED;
@@ -965,6 +1008,7 @@ iproto_msg_start_processing_in_stream(struct iproto_msg *msg)
*/
errinj_stream_msg_count_add(1);
stream = (struct iproto_stream *)mh_i64ptr_node(con->streams, pos)->val;
+ assert(stream != NULL);
msg->stream = stream;
/*
* If the request queue in the stream is not empty, it means
@@ -1407,6 +1451,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
uint64_t stream_id;
uint8_t type;
bool request_is_not_for_stream;
+ bool request_is_only_for_stream;
struct iproto_thread *iproto_thread = msg->connection->iproto_thread;
if (xrow_header_decode(&msg->header, pos, reqend, true))
@@ -1418,11 +1463,19 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
request_is_not_for_stream =
((type > IPROTO_TYPE_STAT_MAX &&
type != IPROTO_PING) || type == IPROTO_AUTH);
+ request_is_only_for_stream =
+ (type == IPROTO_TRANSACTION_BEGIN ||
+ type == IPROTO_TRANSACTION_COMMIT ||
+ type == IPROTO_TRANSACTION_ROLLBACK);
if (stream_id != 0 && request_is_not_for_stream) {
diag_set(ClientError, ER_UNABLE_TO_PROCESS_IN_STREAM,
iproto_type_name(type));
goto error;
+ } else if (stream_id == 0 && request_is_only_for_stream) {
+ diag_set(ClientError, ER_UNABLE_TO_PROCESS_OUT_OF_STREAM,
+ iproto_type_name(type));
+ goto error;
}
/*
@@ -1450,6 +1503,15 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
sizeof(*(iproto_thread->dml_route)));
cmsg_init(&msg->base, iproto_thread->dml_route[type]);
break;
+ case IPROTO_TRANSACTION_BEGIN:
+ cmsg_init(&msg->base, iproto_thread->begin_route);
+ break;
+ case IPROTO_TRANSACTION_COMMIT:
+ cmsg_init(&msg->base, iproto_thread->commit_route);
+ break;
+ case IPROTO_TRANSACTION_ROLLBACK:
+ cmsg_init(&msg->base, iproto_thread->rollback_route);
+ break;
case IPROTO_CALL_16:
case IPROTO_CALL:
case IPROTO_EVAL:
@@ -1523,6 +1585,38 @@ tx_fiber_init(struct session *session, uint64_t sync)
fiber_set_user(f, &session->credentials);
}
+static void
+tx_process_rollback_on_disconnect(struct cmsg *m)
+{
+ struct iproto_stream *stream =
+ container_of(m, struct iproto_stream,
+ on_disconnect);
+
+ if (stream->txn != NULL) {
+ tx_fiber_init(stream->connection->session, 0);
+ txn_attach(stream->txn);
+ if (box_txn_rollback() != 0)
+ panic("failed to rollback transaction on disconnect");
+ stream->txn = NULL;
+ }
+}
+
+static void
+net_finish_rollback_on_disconnect(struct cmsg *m)
+{
+ struct iproto_stream *stream =
+ container_of(m, struct iproto_stream,
+ on_disconnect);
+ struct iproto_connection *con = stream->connection;
+
+ struct mh_i64ptr_node_t node = { stream->id, NULL };
+ mh_i64ptr_remove(con->streams, &node, 0);
+ iproto_stream_delete(stream);
+ assert(!evio_has_fd(&con->input));
+ if (con->state == IPROTO_CONNECTION_PENDING_DESTROY)
+ iproto_connection_try_to_start_destroy(con);
+}
+
static void
tx_process_disconnect(struct cmsg *m)
{
@@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
}
}
+/**
+ * Since the processing of requests within a transaction
+ * for a stream can occur in different fibers, we store
+ * a pointer to the transaction in the stream structure.
+ * If the message belongs to a stream with an active
+ * transaction, attach that transaction to the current
+ * fiber.
+ */
+static inline void
+tx_prepare_transaction_for_request(struct iproto_msg *msg)
+{
+ if (msg->stream != NULL && msg->stream->txn != NULL) {
+ txn_attach(msg->stream->txn);
+ msg->stream->txn = NULL;
+ }
+ assert(!in_txn() || msg->stream != NULL);
+}
+
static inline struct iproto_msg *
tx_accept_msg(struct cmsg *m)
{
struct iproto_msg *msg = (struct iproto_msg *) m;
tx_accept_wpos(msg->connection, &msg->wpos);
tx_fiber_init(msg->connection->session, msg->header.sync);
+ tx_prepare_transaction_for_request(msg);
return msg;
}
+static inline void
+tx_end_msg(struct iproto_msg *msg)
+{
+ if (msg->stream != NULL)
+ msg->stream->txn = txn_detach();
+}
+
/**
* Write error message to the output buffer and advance
* write position. Doesn't throw.
@@ -1690,6 +1810,7 @@ tx_reply_iproto_error(struct cmsg *m)
iproto_reply_error(out, diag_last_error(&msg->diag),
msg->header.sync, ::schema_version);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
}
/** Inject a short delay on tx request processing for testing. */
@@ -1702,6 +1823,72 @@ tx_inject_delay(void)
});
}
+static void
+tx_process_begin(struct cmsg *m)
+{
+ struct iproto_msg *msg = tx_accept_msg(m);
+ struct obuf *out;
+
+ if (tx_check_schema(msg->header.schema_version))
+ goto error;
+
+ if (box_txn_begin() != 0)
+ goto error;
+
+ out = msg->connection->tx.p_obuf;
+ iproto_reply_ok(out, msg->header.sync, ::schema_version);
+ iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
+ return;
+error:
+ tx_reply_error(msg);
+ tx_end_msg(msg);
+}
+
+static void
+tx_process_commit(struct cmsg *m)
+{
+ struct iproto_msg *msg = tx_accept_msg(m);
+ struct obuf *out;
+
+ if (tx_check_schema(msg->header.schema_version))
+ goto error;
+
+ if (box_txn_commit() != 0)
+ goto error;
+
+ out = msg->connection->tx.p_obuf;
+ iproto_reply_ok(out, msg->header.sync, ::schema_version);
+ iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
+ return;
+error:
+ tx_reply_error(msg);
+ tx_end_msg(msg);
+}
+
+static void
+tx_process_rollback(struct cmsg *m)
+{
+ struct iproto_msg *msg = tx_accept_msg(m);
+ struct obuf *out;
+
+ if (tx_check_schema(msg->header.schema_version))
+ goto error;
+
+ if (box_txn_rollback() != 0)
+ goto error;
+
+ out = msg->connection->tx.p_obuf;
+ iproto_reply_ok(out, msg->header.sync, ::schema_version);
+ iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
+ return;
+error:
+ tx_reply_error(msg);
+ tx_end_msg(msg);
+}
+
static void
tx_process1(struct cmsg *m)
{
@@ -1723,9 +1910,11 @@ tx_process1(struct cmsg *m)
iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
tuple != 0);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static void
@@ -1766,9 +1955,11 @@ tx_process_select(struct cmsg *m)
iproto_reply_select(out, &svp, msg->header.sync,
::schema_version, count);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static int
@@ -1815,6 +2006,12 @@ tx_process_call(struct cmsg *m)
trigger_clear(&fiber_on_yield);
+ if (in_txn() != NULL && msg->header.stream_id == 0) {
+ diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
+ port_destroy(&port);
+ goto error;
+ }
+
if (rc != 0)
goto error;
@@ -1856,9 +2053,11 @@ tx_process_call(struct cmsg *m)
iproto_reply_select(out, &svp, msg->header.sync,
::schema_version, count);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static void
@@ -1867,6 +2066,7 @@ tx_process_misc(struct cmsg *m)
struct iproto_msg *msg = tx_accept_msg(m);
struct iproto_connection *con = msg->connection;
struct obuf *out = con->tx.p_obuf;
+ assert(!(msg->header.type != IPROTO_PING && in_txn()));
if (tx_check_schema(msg->header.schema_version))
goto error;
@@ -1899,9 +2099,11 @@ tx_process_misc(struct cmsg *m)
} catch (Exception *e) {
tx_reply_error(msg);
}
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static void
@@ -1995,9 +2197,11 @@ tx_process_sql(struct cmsg *m)
port_destroy(&port);
iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
iproto_wpos_create(&msg->wpos, out);
+ tx_end_msg(msg);
return;
error:
tx_reply_error(msg);
+ tx_end_msg(msg);
}
static void
@@ -2007,6 +2211,7 @@ tx_process_replication(struct cmsg *m)
struct iproto_connection *con = msg->connection;
struct ev_io io;
coio_create(&io, con->input.fd);
+ assert(!in_txn());
try {
switch (msg->header.type) {
case IPROTO_JOIN:
@@ -2064,9 +2269,24 @@ iproto_msg_finish_processing_in_stream(struct iproto_msg *msg)
errinj_stream_msg_count_add(-1);
if (stailq_empty(&stream->pending_requests)) {
- struct mh_i64ptr_node_t node = { stream->id, NULL };
- mh_i64ptr_remove(con->streams, &node, 0);
- iproto_stream_delete(stream);
+ /*
+ * If no more messages for the current stream
+ * and no transaction started, then delete it.
+ */
+ if (stream->txn == NULL) {
+ struct mh_i64ptr_node_t node = { stream->id, NULL };
+ mh_i64ptr_remove(con->streams, &node, 0);
+ iproto_stream_delete(stream);
+ } else if (!evio_has_fd(&con->input)) {
+ /*
+ * Here the connection was closed and there are
+ * no messages in the stream queue, but the stream
+ * has an active transaction. Send a disconnect
+ * message to roll back this transaction.
+ */
+ iproto_stream_push_on_disconnect_msg(stream);
+ }
} else {
/*
* If there are new messages for this stream
@@ -2407,6 +2627,23 @@ iproto_session_push(struct session *session, struct port *port)
static inline void
iproto_thread_init_routes(struct iproto_thread *iproto_thread)
{
+ iproto_thread->begin_route[0] =
+ { tx_process_begin, &iproto_thread->net_pipe };
+ iproto_thread->begin_route[1] =
+ { net_send_msg, NULL };
+ iproto_thread->commit_route[0] =
+ { tx_process_commit, &iproto_thread->net_pipe };
+ iproto_thread->commit_route[1] =
+ { net_send_msg, NULL };
+ iproto_thread->rollback_route[0] =
+ { tx_process_rollback, &iproto_thread->net_pipe };
+ iproto_thread->rollback_route[1] =
+ { net_send_msg, NULL };
+ iproto_thread->rollback_on_disconnect_stream_route[0] =
+ { tx_process_rollback_on_disconnect,
+ &iproto_thread->net_pipe };
+ iproto_thread->rollback_on_disconnect_stream_route[1] =
+ { net_finish_rollback_on_disconnect, NULL };
iproto_thread->destroy_route[0] =
{ tx_process_destroy, &iproto_thread->net_pipe };
iproto_thread->destroy_route[1] =
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index f2902946a..913a64de5 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -166,6 +166,9 @@ const char *iproto_type_strs[] =
"EXECUTE",
NULL, /* NOP */
"PREPARE",
+ "BEGIN",
+ "COMMIT",
+ "ROLLBACK",
};
#define bit(c) (1ULL<<IPROTO_##c)
@@ -184,6 +187,9 @@ const uint64_t iproto_body_key_map[IPROTO_TYPE_STAT_MAX] = {
0, /* EXECUTE */
0, /* NOP */
0, /* PREPARE */
+ 0, /* BEGIN */
+ 0, /* COMMIT */
+ 0, /* ROLLBACK */
};
#undef bit
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index b9498868c..3210588db 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -237,6 +237,12 @@ enum iproto_type {
IPROTO_NOP = 12,
/** Prepare SQL statement. */
IPROTO_PREPARE = 13,
+ /* Begin transaction */
+ IPROTO_TRANSACTION_BEGIN = 14,
+ /* Commit transaction */
+ IPROTO_TRANSACTION_COMMIT = 15,
+ /* Rollback transaction */
+ IPROTO_TRANSACTION_ROLLBACK = 16,
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,
diff --git a/src/box/txn.c b/src/box/txn.c
index b80e722a4..796ab4529 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -1255,3 +1255,25 @@ txn_on_yield(struct trigger *trigger, void *event)
txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
return 0;
}
+
+struct txn *
+txn_detach(void)
+{
+ struct txn *txn = in_txn();
+ if (txn == NULL)
+ return NULL;
+ if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
+ txn_on_yield(NULL, NULL);
+ trigger_clear(&txn->fiber_on_yield);
+ }
+ trigger_clear(&txn->fiber_on_stop);
+ fiber_set_txn(fiber(), NULL);
+ return txn;
+}
+
+void
+txn_attach(struct txn *txn)
+{
+ assert(txn != NULL);
+ fiber_set_txn(fiber(), txn);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 8741dc6a1..f11144567 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -457,6 +457,25 @@ fiber_set_txn(struct fiber *fiber, struct txn *txn)
fiber->storage.txn = txn;
}
+/**
+ * Detach transaction from fiber.
+ * By default, if a fiber is stopped, the transaction started
+ * in this fiber is rolled back. This function detaches the
+ * transaction from the fiber: a detached transaction is not
+ * rolled back when the fiber stops, but it is aborted if
+ * it does not support yield.
+ */
+struct txn *
+txn_detach(void);
+
+/**
+ * Attach transaction to fiber.
+ * Attach @a txn that has been detached previously and saved
+ * somewhere to a new fiber.
+ */
+void
+txn_attach(struct txn *txn);
+
/**
* Start a transaction explicitly.
* @pre no transaction is active
diff --git a/test/box-tap/feedback_daemon.test.lua b/test/box-tap/feedback_daemon.test.lua
index a2e041649..f700f3f72 100755
--- a/test/box-tap/feedback_daemon.test.lua
+++ b/test/box-tap/feedback_daemon.test.lua
@@ -251,7 +251,7 @@ box.space.features_sync:drop()
local function check_stats(stat)
local sub = test:test('feedback operation stats')
- sub:plan(18)
+ sub:plan(21)
local box_stat = box.stat()
local net_stat = box.stat.net()
for op, val in pairs(box_stat) do
diff --git a/test/box/error.result b/test/box/error.result
index f80fdfed5..bc804197a 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -448,6 +448,7 @@ t;
| 227: box.error.SYNC_QUEUE_UNCLAIMED
| 228: box.error.SYNC_QUEUE_FOREIGN
| 229: box.error.UNABLE_TO_PROCESS_IN_STREAM
+ | 230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM
| ...
test_run:cmd("setopt delimiter ''");
diff --git a/test/box/misc.result b/test/box/misc.result
index b62a64355..c86245914 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -136,11 +136,14 @@ end;
t;
---
- - DELETE
+ - COMMIT
- SELECT
+ - ROLLBACK
- INSERT
- EVAL
- - CALL
- ERROR
+ - CALL
+ - BEGIN
- PREPARE
- REPLACE
- UPSERT
--
2.20.1
^ permalink raw reply [flat|nested] 14+ messages in thread
* Re: [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
@ 2021-08-11 12:39 ` Vladimir Davydov via Tarantool-patches
0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-11 12:39 UTC (permalink / raw)
To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy, mechanik20051988
On Wed, Aug 11, 2021 at 11:56:57AM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
>
> Implement interactive transactions over iproto streams. Each stream
> can start its own transaction, so streams allow multiplexing several
> transactions over one connection. If a request fails during a
> transaction, it does not affect the other requests in the transaction.
> If a disconnect occurs while a stream has an active transaction,
> that transaction is rolled back unless it was committed before the
> disconnect.
>
> Part of #5860
>
> @TarantoolBot document
> Title: interactive transactions were implemented over iproto streams.
> The main purpose of streams is transactions via iproto. Each stream
> can start its own transaction, so streams allow multiplexing several
> transactions over one connection. There are multiple ways to begin,
> commit and roll back a transaction: using IPROTO_CALL and IPROTO_EVAL
> with the corresponding function (box.begin, box.commit and box.rollback),
> IPROTO_EXECUTE with the corresponding SQL statement ('START TRANSACTION',
> 'COMMIT', 'ROLLBACK'), or IPROTO_TRANSACTION_BEGIN, IPROTO_TRANSACTION_COMMIT
> and IPROTO_TRANSACTION_ROLLBACK respectively. If a disconnect occurs while a
> stream has an active transaction, that transaction is rolled back unless it
> was committed before the disconnect.
> Add new command codes to begin, commit and roll back transactions:
> `IPROTO_TRANSACTION_BEGIN 14`, `IPROTO_TRANSACTION_COMMIT 15` and
> `IPROTO_TRANSACTION_ROLLBACK 16` respectively.
The suggestion was to rename IPROTO_ROLLBACK along with IPROTO_CONFIRM
to something else (IPROTO_RAFT_ROLLBACK? IPROTO_SYNC_ROLLBACK? Please
discuss the name with TeamS) and use IPROTO_ROLLBACK for interactive
transactions.
> diff --git a/src/box/iproto.cc b/src/box/iproto.cc
> index 3b792130b..376abbff0 100644
> --- a/src/box/iproto.cc
> +++ b/src/box/iproto.cc
> @@ -66,6 +66,7 @@
> #include "tt_static.h"
> #include "salad/stailq.h"
> #include "assoc.h"
> +#include "txn.h"
>
> enum {
> IPROTO_SALT_SIZE = 32,
> @@ -79,6 +80,8 @@ enum {
> struct iproto_connection;
>
> struct iproto_stream {
> + /** Currently active stream transaction or NULL */
> + struct txn *txn;
> /**
> * Queue of pending requests (iproto messages) for this stream,
> * processed sequentially. This field is accesable only from
> @@ -89,6 +92,11 @@ struct iproto_stream {
> uint64_t id;
> /** This stream connection */
> struct iproto_connection *connection;
> + /**
> + * Pre-allocated disconnect msg to gracefully rollback stream
> + * transaction and destroy stream object.
> + */
> + struct cmsg on_disconnect;
> };
>
> /**
> @@ -135,6 +143,10 @@ struct iproto_thread {
> /**
> * Static routes for this iproto thread
> */
> + struct cmsg_hop begin_route[2];
> + struct cmsg_hop commit_route[2];
> + struct cmsg_hop rollback_route[2];
> + struct cmsg_hop rollback_on_disconnect_stream_route[2];
I think _stream_ is redundant here. Rollback only makes sense for
streams.
> struct cmsg_hop destroy_route[2];
> struct cmsg_hop disconnect_route[2];
> struct cmsg_hop misc_route[2];
> @@ -641,12 +653,24 @@ iproto_stream_new(struct iproto_connection *connection, uint64_t stream_id)
> return NULL;
> }
> errinj_stream_count_add(1);
> + stream->txn = NULL;
> stailq_create(&stream->pending_requests);
> stream->id = stream_id;
> stream->connection = connection;
> return stream;
> }
>
> +static inline void
> +iproto_stream_push_on_disconnect_msg(struct iproto_stream *stream)
iproto_rollback_on_disconnect?
> +{
> + struct iproto_connection *conn = stream->connection;
> + struct iproto_thread *iproto_thread = conn->iproto_thread;
> + struct cmsg_hop *route =
> + iproto_thread->rollback_on_disconnect_stream_route;
> + cmsg_init(&stream->on_disconnect, route);
> + cpipe_push(&iproto_thread->tx_pipe, &stream->on_disconnect);
> +}
> +
> /**
> * Return true if we have not enough spare messages
> * in the message pool.
> @@ -670,6 +694,7 @@ static void
> iproto_stream_delete(struct iproto_stream *stream)
> {
> assert(stailq_empty(&stream->pending_requests));
> + assert(stream->txn == NULL);
> errinj_stream_count_add(-1);
> mempool_free(&stream->connection->iproto_thread->iproto_stream_pool, stream);
> }
> @@ -716,6 +741,7 @@ static inline bool
> iproto_connection_is_idle(struct iproto_connection *con)
> {
> return con->long_poll_count == 0 &&
> + mh_size(con->streams) == 0 &&
Why do you need this? Stream-lined messages should already pin iproto
connection via ibuf. I tried to remove this and rerun the test you added
and got no failures.
> ibuf_used(&con->ibuf[0]) == 0 &&
> ibuf_used(&con->ibuf[1]) == 0;
> }
> @@ -1656,15 +1750,41 @@ tx_accept_wpos(struct iproto_connection *con, const struct iproto_wpos *wpos)
> }
> }
>
> +/**
> + * Since the processing of requests within a transaction
> + * for a stream can occur in different fibers, we store
> + * a pointer to the transaction in the stream structure.
> + * If the message belongs to a stream with an active
> + * transaction, attach that transaction to the current
> + * fiber.
> + */
> +static inline void
> +tx_prepare_transaction_for_request(struct iproto_msg *msg)
> +{
> + if (msg->stream != NULL && msg->stream->txn != NULL) {
> + txn_attach(msg->stream->txn);
> + msg->stream->txn = NULL;
> + }
> + assert(!in_txn() || msg->stream != NULL);
> +}
> +
> static inline struct iproto_msg *
> tx_accept_msg(struct cmsg *m)
> {
> struct iproto_msg *msg = (struct iproto_msg *) m;
> tx_accept_wpos(msg->connection, &msg->wpos);
> tx_fiber_init(msg->connection->session, msg->header.sync);
> + tx_prepare_transaction_for_request(msg);
> return msg;
> }
>
> +static inline void
> +tx_end_msg(struct iproto_msg *msg)
> +{
> + if (msg->stream != NULL)
assert(msg->stream->txn == NULL);
> + msg->stream->txn = txn_detach();
> +}
> +
> /**
> * Write error message to the output buffer and advance
> * write position. Doesn't throw.
> @@ -1815,6 +2006,12 @@ tx_process_call(struct cmsg *m)
>
> trigger_clear(&fiber_on_yield);
>
> + if (in_txn() != NULL && msg->header.stream_id == 0) {
> + diag_set(ClientError, ER_FUNCTION_TX_ACTIVE);
> + port_destroy(&port);
> + goto error;
> + }
> +
Please move this after the rc != 0 check below.
> if (rc != 0)
> goto error;
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index b80e722a4..796ab4529 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -1255,3 +1255,25 @@ txn_on_yield(struct trigger *trigger, void *event)
> txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD);
> return 0;
> }
> +
> +struct txn *
> +txn_detach(void)
> +{
> + struct txn *txn = in_txn();
> + if (txn == NULL)
> + return NULL;
> + if (!txn_has_flag(txn, TXN_CAN_YIELD)) {
> + txn_on_yield(NULL, NULL);
> + trigger_clear(&txn->fiber_on_yield);
> + }
> + trigger_clear(&txn->fiber_on_stop);
> + fiber_set_txn(fiber(), NULL);
> + return txn;
> +}
> +
> +void
> +txn_attach(struct txn *txn)
> +{
> + assert(txn != NULL);
assert(!in_txn());
> + fiber_set_txn(fiber(), txn);
> +}
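For illustration only, here is how the handoff could look with both asserts in place. This is a toy model, not the Tarantool code: the `toy_*` types and functions are hypothetical stand-ins for `struct txn`, the fiber, and `struct iproto_stream`, and the yield/stop trigger handling that the real `txn_detach()` performs is left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the real Tarantool structures. */
struct toy_txn { int id; };
struct toy_fiber { struct toy_txn *txn; };
struct toy_stream { struct toy_txn *txn; };

/* Take the transaction away from the fiber and hand it to the caller. */
static struct toy_txn *
toy_txn_detach(struct toy_fiber *f)
{
	struct toy_txn *txn = f->txn;
	f->txn = NULL;
	return txn;
}

/* Bind a parked transaction to a fiber that must not already own one. */
static void
toy_txn_attach(struct toy_fiber *f, struct toy_txn *txn)
{
	assert(txn != NULL);
	assert(f->txn == NULL);
	f->txn = txn;
}

/* Before a request: move the stream's parked transaction to the fiber. */
static void
toy_request_begin(struct toy_fiber *f, struct toy_stream *s)
{
	if (s != NULL && s->txn != NULL) {
		toy_txn_attach(f, s->txn);
		s->txn = NULL;
	}
}

/* After a request: park whatever transaction the fiber holds. */
static void
toy_request_end(struct toy_fiber *f, struct toy_stream *s)
{
	if (s != NULL) {
		assert(s->txn == NULL);
		s->txn = toy_txn_detach(f);
	}
}
```

With this shape the transaction always has exactly one owner, either the stream or the fiber processing its current request, so a double attach or a lost transaction trips an assert right away.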
^ permalink raw reply [flat|nested] 14+ messages in thread
* [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box
2021-08-11 8:56 [Tarantool-patches] [PATCH v3 0/8] implement iproto streams mechanik20051988 via Tarantool-patches
` (6 preceding siblings ...)
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 7/8] iproto: implement interactive transactions over iproto streams mechanik20051988 via Tarantool-patches
@ 2021-08-11 8:56 ` mechanik20051988 via Tarantool-patches
2021-08-11 12:47 ` Vladimir Davydov via Tarantool-patches
7 siblings, 1 reply; 14+ messages in thread
From: mechanik20051988 via Tarantool-patches @ 2021-08-11 8:56 UTC (permalink / raw)
To: tarantool-patches, vdavydov, v.shpilevoy; +Cc: mechanik20051988
From: mechanik20051988 <mechanik20.05.1988@gmail.com>
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which begin, commit and roll back a transaction
respectively.
Closes #5860
@TarantoolBot document
Title: add interactive transaction support in net.box
Implement `begin`, `commit` and `rollback` methods for the stream object
in `net.box`, which begin, commit and roll back a transaction
respectively. There are now multiple ways to begin, commit and roll back
a transaction from `net.box`: using the appropriate stream methods, using
the `call` or `eval` methods, or using the `execute` method with SQL
transaction syntax. Users can mix these methods, for example, start a
transaction using `stream:begin()` and commit it using
`stream:call('box.commit')` or `stream:execute('COMMIT')`.
Simple example of using interactive transactions via iproto from net.box:
```lua
stream = conn:new_stream()
space = stream.space.test
space_not_from_stream = conn.space.test
stream:begin()
space:replace({1})
-- Returns the previously inserted tuple, because this request
-- belongs to the transaction.
space:select({})
-- Empty select, because this request does not belong to
-- the transaction.
space_not_from_stream:select({})
stream:call('box.commit')
-- Now the transaction is committed, so all requests
-- return the tuple.
```
More examples of using streams can be found in
gh-5860-implement-streams-in-iproto.test.lua
---
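A note for connector authors, as a sketch rather than a reference: judging by `netbox_encode_txn()` in this patch, a begin, commit or rollback request consists of a header only, and the stream ID in that header is what ties it to a transaction. The C fragment below builds such a header as a MessagePack map, using the key values that appear in the test for this series (`IPROTO_REQUEST_TYPE = 0x00`, `IPROTO_SYNC = 0x01`, `IPROTO_STREAM_ID = 0x0a`). The helper names `put_uint` and `encode_header` are hypothetical and not part of Tarantool, the request type code is left to the caller, and the length prefix that precedes the header on the wire is omitted.

```c
#include <stddef.h>
#include <stdint.h>

/* Header keys, with the values used in the test for this series. */
enum {
	IPROTO_REQUEST_TYPE = 0x00,
	IPROTO_SYNC = 0x01,
	IPROTO_STREAM_ID = 0x0a,
};

/* Encode an unsigned integer in the shortest MessagePack form. */
static uint8_t *
put_uint(uint8_t *p, uint64_t v)
{
	int bytes;
	if (v <= 0x7f) {
		*p++ = (uint8_t)v;	/* positive fixint */
		return p;
	}
	if (v <= 0xff) {
		*p++ = 0xcc;		/* uint 8 */
		bytes = 1;
	} else if (v <= 0xffff) {
		*p++ = 0xcd;		/* uint 16 */
		bytes = 2;
	} else if (v <= 0xffffffffu) {
		*p++ = 0xce;		/* uint 32 */
		bytes = 4;
	} else {
		*p++ = 0xcf;		/* uint 64 */
		bytes = 8;
	}
	/* MessagePack integers are big-endian. */
	for (int i = bytes - 1; i >= 0; i--)
		*p++ = (uint8_t)(v >> (8 * i));
	return p;
}

/*
 * Write a three-pair header map into buf and return its length.
 * buf must have room for at least 24 bytes.
 */
static size_t
encode_header(uint8_t *buf, uint8_t type, uint64_t sync, uint64_t stream_id)
{
	uint8_t *p = buf;
	*p++ = 0x83;			/* fixmap with three pairs */
	p = put_uint(p, IPROTO_REQUEST_TYPE);
	p = put_uint(p, type);
	p = put_uint(p, IPROTO_SYNC);
	p = put_uint(p, sync);
	p = put_uint(p, IPROTO_STREAM_ID);
	p = put_uint(p, stream_id);
	return (size_t)(p - buf);
}
```

Calling `encode_header(buf, 7, 9, 1)` corresponds to the header built in the test's AUTH-in-stream check (type 7, sync 9, stream 1) and produces seven bytes.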
.../gh-5860-implement-streams-in-iproto.md | 26 +
src/box/lua/net_box.c | 49 +-
src/box/lua/net_box.lua | 35 +-
test/box/stream.result | 3558 +++++++++++++++--
test/box/stream.test.lua | 1202 ++++++
5 files changed, 4554 insertions(+), 316 deletions(-)
create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
diff --git a/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
new file mode 100644
index 000000000..8a8eec3e7
--- /dev/null
+++ b/changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
@@ -0,0 +1,26 @@
+## feature/core
+
+* Streams and interactive transactions over streams are implemented
+ in iproto. A stream is associated with its ID, which is unique within
+ one connection. All requests with the same nonzero stream ID belong to
+ the same stream. All requests in a stream are processed synchronously:
+ the execution of the next request does not start until the previous one
+ is completed. A request with a zero stream ID does not belong to any
+ stream and is processed in the old way.
+ In `net.box`, a stream is an object above a connection that has the same
+ methods but executes requests sequentially. The ID is generated
+ on the client side automatically. Users who write their own connector
+ and want to use streams must transmit stream_id over the iproto protocol.
+ The main purpose of streams is transactions via iproto. Each stream
+ can start its own transaction, so streams allow multiplexing several
+ transactions over one connection. There are multiple ways to begin,
+ commit and roll back a transaction: using the appropriate stream methods,
+ using `call` or `eval` methods, or using the `execute` method with SQL
+ transaction syntax. Users can mix these methods, for example, start a
+ transaction using `stream:begin()` and commit it using
+ `stream:call('box.commit')` or `stream:execute('COMMIT')`.
+ If any request fails during the transaction, it does not affect the other
+ requests in the transaction. If a disconnect occurs while a stream has an
+ active transaction, that transaction is rolled back unless it has
+ already been committed.
+
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 3bc49af23..70294ae27 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -75,7 +75,10 @@ enum netbox_method {
NETBOX_MIN = 14,
NETBOX_MAX = 15,
NETBOX_COUNT = 16,
- NETBOX_INJECT = 17,
+ NETBOX_BEGIN = 17,
+ NETBOX_COMMIT = 18,
+ NETBOX_ROLLBACK = 19,
+ NETBOX_INJECT = 20,
netbox_method_MAX
};
@@ -916,6 +919,44 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
netbox_encode_prepare(L, idx, stream, sync, stream_id);
}
+static inline void
+netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
+ struct mpstream *stream, uint64_t sync,
+ uint64_t stream_id)
+{
+ (void)L;
+ (void) idx;
+ assert(type == IPROTO_TRANSACTION_BEGIN ||
+ type == IPROTO_TRANSACTION_COMMIT ||
+ type == IPROTO_TRANSACTION_ROLLBACK);
+ size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
+ netbox_end_encode(stream, svp);
+}
+
+static void
+netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
+ uint64_t sync, uint64_t stream_id)
+{
+ return netbox_encode_txn(L, IPROTO_TRANSACTION_BEGIN, idx, stream,
+ sync, stream_id);
+}
+
+static void
+netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
+ uint64_t sync, uint64_t stream_id)
+{
+ return netbox_encode_txn(L, IPROTO_TRANSACTION_COMMIT, idx, stream,
+ sync, stream_id);
+}
+
+static void
+netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
+ uint64_t sync, uint64_t stream_id)
+{
+ return netbox_encode_txn(L, IPROTO_TRANSACTION_ROLLBACK, idx, stream,
+ sync, stream_id);
+}
+
static void
netbox_encode_inject(struct lua_State *L, int idx, struct mpstream *stream,
uint64_t sync, uint64_t stream_id)
@@ -959,6 +1000,9 @@ netbox_encode_method(struct lua_State *L, int idx, enum netbox_method method,
[NETBOX_MIN] = netbox_encode_select,
[NETBOX_MAX] = netbox_encode_select,
[NETBOX_COUNT] = netbox_encode_call,
+ [NETBOX_BEGIN] = netbox_encode_begin,
+ [NETBOX_COMMIT] = netbox_encode_commit,
+ [NETBOX_ROLLBACK] = netbox_encode_rollback,
[NETBOX_INJECT] = netbox_encode_inject,
};
struct mpstream stream;
@@ -1330,6 +1374,9 @@ netbox_decode_method(struct lua_State *L, enum netbox_method method,
[NETBOX_MIN] = netbox_decode_tuple,
[NETBOX_MAX] = netbox_decode_tuple,
[NETBOX_COUNT] = netbox_decode_value,
+ [NETBOX_BEGIN] = netbox_decode_nil,
+ [NETBOX_COMMIT] = netbox_decode_nil,
+ [NETBOX_ROLLBACK] = netbox_decode_nil,
[NETBOX_INJECT] = netbox_decode_table,
};
method_decoder[method](L, data, data_end, format);
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 3dffc245f..745a8c0f5 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -51,8 +51,11 @@ local M_GET = 13
local M_MIN = 14
local M_MAX = 15
local M_COUNT = 16
+local M_BEGIN = 17
+local M_COMMIT = 18
+local M_ROLLBACK = 19
-- Injects raw data into connection. Used by console and tests.
-local M_INJECT = 17
+local M_INJECT = 20
-- utility tables
local is_final_state = {closed = 1, error = 1}
@@ -754,11 +757,38 @@ local function new_stream(stream)
return stream._conn:new_stream()
end
+local function begin(stream, opts)
+ check_remote_arg(stream, 'begin')
+ local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
+ if opts and opts.is_async then
+ return res
+ end
+end
+
+local function commit(stream, opts)
+ check_remote_arg(stream, 'commit')
+ local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
+ if opts and opts.is_async then
+ return res
+ end
+end
+
+local function rollback(stream, opts)
+ check_remote_arg(stream, 'rollback')
+ local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
+ if opts and opts.is_async then
+ return res
+ end
+end
+
function remote_methods:new_stream()
check_remote_arg(self, 'new_stream')
self._last_stream_id = self._last_stream_id + 1
local stream = setmetatable({
new_stream = new_stream,
+ begin = begin,
+ commit = commit,
+ rollback = rollback,
_stream_id = self._last_stream_id,
space = setmetatable({
_stream_space_cache = {},
@@ -1243,6 +1273,9 @@ local this_module = {
min = M_MIN,
max = M_MAX,
count = M_COUNT,
+ begin = M_BEGIN,
+ commit = M_COMMIT,
+ rollback = M_ROLLBACK,
inject = M_INJECT,
}
}
diff --git a/test/box/stream.result b/test/box/stream.result
index 03200ecf6..95fd1ca51 100644
--- a/test/box/stream.result
+++ b/test/box/stream.result
@@ -1,24 +1,27 @@
--- test-run result file version 2
-- This test checks streams iplementation in iproto (gh-5860).
net_box = require('net.box')
- | ---
- | ...
+---
+...
+json = require('json')
+---
+...
fiber = require('fiber')
- | ---
- | ...
+---
+...
+msgpack = require('msgpack')
+---
+...
test_run = require('test_run').new()
- | ---
- | ...
-
+---
+...
test_run:cmd("create server test with script='box/stream.lua'")
- | ---
- | - true
- | ...
-
+---
+- true
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
function get_current_connection_count()
local total_net_stat_table =
test_run:cmd(string.format("eval test 'return box.stat.net()'"))[1]
@@ -27,8 +30,8 @@ function get_current_connection_count()
assert(connection_stat_table)
return connection_stat_table.current
end;
- | ---
- | ...
+---
+...
function wait_and_return_results(futures)
local results = {}
for name, future in pairs(futures) do
@@ -40,446 +43,3373 @@ function wait_and_return_results(futures)
end
return results
end;
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
-
+---
+- true
+...
-- Some simple checks for new object - stream
test_run:cmd("start server test with args='1'")
- | ---
- | - true
- | ...
+---
+- true
+...
server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
- | ---
- | ...
+---
+...
conn_1 = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream_1 = conn_1:new_stream()
- | ---
- | ...
+---
+...
conn_2 = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream_2 = conn_2:new_stream()
- | ---
- | ...
+---
+...
-- Stream is a wrapper around connection, so if you close connection
-- you close stream, and vice versa.
conn_1:close()
- | ---
- | ...
+---
+...
assert(not stream_1:ping())
- | ---
- | - true
- | ...
+---
+- true
+...
stream_2:close()
- | ---
- | ...
+---
+...
assert(not conn_2:ping())
- | ---
- | - true
- | ...
+---
+- true
+...
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
-- The new method `new_stream`, for the stream object, returns a new
-- stream object, just as in the case of connection.
_ = stream:new_stream()
- | ---
- | ...
+---
+...
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+---
+...
+conn_2 = net_box.connect(server_addr)
+---
+...
+stream_1_1 = conn_1:new_stream()
+---
+...
+stream_1_2 = conn_1:new_stream()
+---
+...
+stream_2 = conn_2:new_stream()
+---
+...
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+---
+...
+stream_1_1:rollback()
+---
+...
+stream_1_1:begin()
+---
+...
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+---
+...
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+---
+...
+box.commit()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+stream_1_1:commit()
+---
+...
+stream_1_2:commit()
+---
+...
+stream_2:commit()
+---
+...
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+---
+- error: Unable to process BEGIN request out of stream
+...
+conn:_request(net_box._method.commit, nil, nil, nil)
+---
+- error: Unable to process COMMIT request out of stream
+...
+conn:_request(net_box._method.rollback, nil, nil, nil)
+---
+- error: Unable to process ROLLBACK request out of stream
+...
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+---
+...
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+---
+...
+IPROTO_REQUEST_TYPE = 0x00
+---
+...
+IPROTO_SYNC = 0x01
+---
+...
+IPROTO_AUTH = 7
+---
+...
+IPROTO_STREAM_ID = 0x0a
+---
+...
+next_request_id = 9
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+header = msgpack.encode({
+ [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+ [IPROTO_SYNC] = next_request_id,
+ [IPROTO_STREAM_ID] = 1,
+});
+---
+...
+body = msgpack.encode({nil});
+---
+...
+size = msgpack.encode(header:len() + body:len());
+---
+...
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+ nil, nil, nil, nil,
+ size .. header .. body);
+---
+- null
+- Unable to process AUTH request in stream
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
conn:close()
- | ---
- | ...
-
+---
+...
-- Check that spaces in stream object updates, during reload_schema
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- true
+...
-- Create one space on server
s = box.schema.space.create('test', { engine = 'memtx' })
- | ---
- | ...
+---
+...
_ = s:create_index('primary')
- | ---
- | ...
+---
+...
test_run:switch("default")
- | ---
- | - true
- | ...
+---
+- true
+...
assert(not conn.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(not stream.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(conn.schema_version == stream._schema_version)
- | ---
- | - true
- | ...
+---
+- true
+...
conn:reload_schema()
- | ---
- | ...
+---
+...
assert(conn.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(conn.schema_version ~= stream._schema_version)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(stream.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
-- When we touch stream.space, we compare stream._schema_version
-- and conn.schema_version if they are not equal, we clear stream
-- space cache, update it's _schema_version and load space from
-- connection to stream space cache.
assert(conn.schema_version == stream._schema_version)
- | ---
- | - true
- | ...
+---
+- true
+...
collectgarbage()
- | ---
- | - 0
- | ...
+---
+- 0
+...
collectgarbage()
- | ---
- | - 0
- | ...
+---
+- 0
+...
assert(conn.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(stream.space.test ~= nil)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- true
+...
s:drop()
- | ---
- | ...
+---
+...
test_run:switch("default")
- | ---
- | - true
- | ...
+---
+- true
+...
conn:reload_schema()
- | ---
- | ...
+---
+...
assert(not conn.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(not stream.space.test)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:cmd("stop server test")
- | ---
- | - true
- | ...
-
+---
+- true
+...
-- All test works with iproto_thread count = 10
-
test_run:cmd("start server test with args='10'")
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch('test')
- | ---
- | - true
- | ...
+---
+- true
+...
fiber = require('fiber')
- | ---
- | ...
+---
+...
s = box.schema.space.create('test', { engine = 'memtx' })
- | ---
- | ...
+---
+...
_ = s:create_index('primary')
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
function replace_with_yeild(item)
fiber.sleep(0.1)
return s:replace({item})
end;
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch('default')
- | ---
- | - true
- | ...
-
+---
+- true
+...
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
assert(conn:ping())
- | ---
- | - true
- | ...
+---
+- true
+...
conn_space = conn.space.test
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
stream_space = stream.space.test
- | ---
- | ...
-
+---
+...
-- Check that all requests in stream processed consistently
futures = {}
- | ---
- | ...
+---
+...
replace_count = 3
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
for i = 1, replace_count do
futures[string.format("replace_%d", i)] =
stream_space:replace({i}, {is_async = true})
futures[string.format("select_%d", i)] =
stream_space:select({}, {is_async = true})
end;
- | ---
- | ...
+---
+...
futures["replace_with_yeild_for_stream"] =
stream:call("replace_with_yeild",
{ replace_count + 1 }, {is_async = true});
- | ---
- | ...
+---
+...
futures["select_with_yeild_for_stream"] =
stream_space:select({}, {is_async = true});
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
results = wait_and_return_results(futures)
- | ---
- | ...
+---
+...
-- [1]
assert(results["select_1"])
- | ---
- | - - [1]
- | ...
+---
+- - [1]
+...
-- [1] [2]
assert(results["select_2"])
- | ---
- | - - [1]
- | - [2]
- | ...
+---
+- - [1]
+ - [2]
+...
-- [1] [2] [3]
assert(results["select_3"])
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+...
-- [1] [2] [3] [4]
-- Even yeild in replace function does not affect
-- the order of requests execution in stream
assert(results["select_with_yeild_for_stream"])
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | ...
-
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+...
-- There is no request execution order for the connection
futures = {}
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
futures["replace_with_yeild_for_connection"] =
conn:call("replace_with_yeild", { replace_count + 2 }, {is_async = true});
- | ---
- | ...
+---
+...
futures["select_with_yeild_for_connection"] =
conn_space:select({}, {is_async = true});
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
results = wait_and_return_results(futures)
- | ---
- | ...
+---
+...
-- [1] [2] [3] [4]
-- Select will be processed earlier because of
-- yeild in `replace_with_yeild` function
assert(results["select_with_yeild_for_connection"])
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | ...
-test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+...
+test_run:switch("test")
+---
+- true
+...
-- [1] [2] [3] [4] [5]
s:select()
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | - [5]
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+ - [5]
+...
errinj = box.error.injection
- | ---
- | ...
+---
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch('default')
- | ---
- | - true
- | ...
+---
+- true
+...
conn:close()
- | ---
- | ...
+---
+...
test_run:wait_cond(function () return get_current_connection_count() == 0 end)
- | ---
- | - true
- | ...
-
+---
+- true
+...
-- Check that all request will be processed
-- after connection close.
conn = net_box.connect(server_addr)
- | ---
- | ...
+---
+...
stream = conn:new_stream()
- | ---
- | ...
+---
+...
space = stream.space.test
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ';'")
- | ---
- | - true
- | ...
+---
+- true
+...
replace_count = 20
for i = 1, replace_count do
space:replace({i}, {is_async = true})
end;
- | ---
- | ...
+---
+...
test_run:cmd("setopt delimiter ''");
- | ---
- | - true
- | ...
+---
+- true
+...
-- Give time to send
fiber.sleep(0)
- | ---
- | ...
+---
+...
conn:close()
- | ---
- | ...
+---
+...
test_run:wait_cond(function () return get_current_connection_count() == 0 end)
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:switch("test")
- | ---
- | - true
- | ...
+---
+- true
+...
-- select return tuples from [1] to [20]
-- because all messages processed after
-- connection closed
s:select{}
- | ---
- | - - [1]
- | - [2]
- | - [3]
- | - [4]
- | - [5]
- | - [6]
- | - [7]
- | - [8]
- | - [9]
- | - [10]
- | - [11]
- | - [12]
- | - [13]
- | - [14]
- | - [15]
- | - [16]
- | - [17]
- | - [18]
- | - [19]
- | - [20]
- | ...
+---
+- - [1]
+ - [2]
+ - [3]
+ - [4]
+ - [5]
+ - [6]
+ - [7]
+ - [8]
+ - [9]
+ - [10]
+ - [11]
+ - [12]
+ - [13]
+ - [14]
+ - [15]
+ - [16]
+ - [17]
+ - [18]
+ - [19]
+ - [20]
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+---
+- []
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+---
+...
+stream:ping()
+---
+- true
+...
+stream:commit()
+---
+...
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+---
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+stream:call('s:replace', {{1}})
+---
+- [1]
+...
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+---
+- []
+...
+stream:call('s:select', {})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+---
+- error: Transaction has been aborted by a fiber yield
+...
+-- Select is empty, transaction was aborted
+space:select{}
+---
+- []
+...
+test_run:switch('test')
+---
+- true
+...
s:drop()
- | ---
- | ...
+---
+...
+-- Check that there are no streams and messages, which
+-- was not deleted
errinj = box.error.injection
- | ---
- | ...
+---
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
- | ---
- | - true
- | ...
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+stream:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+---
+...
+space_2_no_stream = conn.space.test_2
+---
+...
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+stream_2:begin()
+---
+...
+space_2:replace({1})
+---
+- [1]
+...
+test_run:switch('test')
+---
+- true
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+---
+- []
+...
+space_2_no_stream:select{}
+---
+- []
+...
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+---
+- - [1]
+...
+space_2:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty, transaction was not commited
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
test_run:switch("default")
- | ---
- | - true
- | ...
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+---
+- - [1]
+...
+space_2:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+---
+- - [1]
+...
+s2:select()
+---
+- - [1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
test_run:cmd("stop server test")
- | ---
- | - true
- | ...
+---
+- true
+...
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Simple read/write conflict.
+space_1_1:select({1})
+---
+- []
+...
+space_1_2:select({1})
+---
+- []
+...
+space_1_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_1_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails because of the conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must get [1, 1]
+space_1_1:select({})
+---
+- - [1, 1]
+...
+space_1_2:select({})
+---
+- - [1, 1]
+...
+-- Same test for vinyl space
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_2_1:select({1})
+---
+- []
+...
+space_2_2:select({1})
+---
+- []
+...
+space_2_1:replace({1, 1})
+---
+- [1, 1]
+...
+space_2_2:replace({1, 2})
+---
+- [1, 2]
+...
+stream_1:commit()
+---
+...
+-- This transaction fails because of the conflict
+stream_2:commit()
+---
+- error: Transaction has been aborted by conflict
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- Here we must get [1, 1]
+space_2_1:select({})
+---
+- - [1, 1]
+...
+space_2_2:select({})
+---
+- - [1, 1]
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both selects return tuple [1, 1]: the transaction was committed
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Test rollback for memtx space
+space_1:replace({1})
+---
+- [1]
+...
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space_1:select({})
+---
+- - [1]
+...
+stream_1:rollback()
+---
+...
+-- Select is empty: the transaction was rolled back
+space_1:select({})
+---
+- []
+...
+-- Test rollback for vinyl space
+space_2:replace({1})
+---
+- [1]
+...
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space_2:select({})
+---
+- - [1]
+...
+stream_2:rollback()
+---
+...
+-- Select is empty: the transaction was rolled back
+space_2:select({})
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after rollback
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+-- This simple test is necessary because I had a bug
+-- with the stream halting after rollback
+stream_1:begin()
+---
+...
+stream_1:commit()
+---
+...
+stream_2:begin()
+---
+...
+stream_2:commit()
+---
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both selects are empty, because the transactions were rolled back
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select returns the two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+ - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select returns the two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+ - [2]
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty selects: the transactions were rolled back
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+-- Check that there are no streams or messages that
+-- were not deleted after connection close
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were rolled back and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both selects are empty, because the transactions were rolled back
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+-- Select returns the two previously inserted tuples
+space_1:select({})
+---
+- - [1]
+ - [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+-- Select returns the two previously inserted tuples
+space_2:select({})
+---
+- - [1]
+ - [2]
+...
+-- We send a large number of asynchronous requests.
+-- Their results are not important to us; what matters is
+-- that they are in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+for i = 1, 1000 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Selects are empty: the transactions were rolled back
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Same test, but now we check that if `commit` was received by the
+-- server before the connection closed, it is processed successfully.
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make the test flaky.
+for i = 1, 100 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = stream_1:commit({is_async = true})
+---
+...
+_ = stream_2:commit({is_async = true})
+---
+...
+fiber.sleep(0)
+---
+...
+conn:close()
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Check that there are no streams or messages that
+-- were not deleted after connection close
+errinj = box.error.injection
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+---
+- true
+...
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+---
+- true
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Select returns tuples from [1] to [100]:
+-- the transaction was committed
+rc1 = s1:select()
+---
+...
+rc2 = s2:select()
+---
+...
+assert(#rc1)
+---
+- 100
+...
+assert(#rc2)
+---
+- 100
+...
+s1:truncate()
+---
+...
+s2:truncate()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Reconnect
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+-- We can begin new transactions with the same stream_id, because
+-- the previous ones were finished and destroyed.
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+-- Two empty selects
+space_1:select({})
+---
+- []
+...
+space_2:select({})
+---
+- []
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Both select are empty, because transaction rollback
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. While working on the
+-- patch, I saw that all requests in a stream come with
+-- header->is_commit == true, so if we are in a transaction
+-- in a stream we should set this value to false; otherwise,
+-- during recovery, `wal_stream_apply_dml_row` fails because
+-- of an LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+stream_1:begin()
+---
+...
+stream_2:begin()
+---
+...
+space_1:replace({1})
+---
+- [1]
+...
+space_1:replace({2})
+---
+- [2]
+...
+space_2:replace({1})
+---
+- [1]
+...
+space_2:replace({2})
+---
+- [2]
+...
+stream_1:commit()
+---
+...
+stream_2:commit()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+s1:select{}
+---
+- - [1]
+ - [2]
+...
+-- Here we get two tuples, commit was successful
+s2:select{}
+---
+- - [1]
+ - [2]
+...
+-- Check that there are no streams or messages that
+-- were not deleted after commit
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("start server test with args='1, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+---
+- - [1]
+ - [2]
+...
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+---
+- - [1]
+ - [2]
+...
+box.space.test_1:drop()
+---
+...
+box.space.test_2:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Same transaction checks for async mode
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream_1 = conn:new_stream()
+---
+...
+space_1 = stream_1.space.test_1
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_2 = stream_2.space.test_2
+---
+...
+memtx_futures = {}
+---
+...
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+---
+...
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+---
+...
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+---
+...
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+---
+...
+vinyl_futures = {}
+---
+...
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+---
+...
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+---
+...
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+---
+...
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty: the transaction was not committed
+s1:select()
+---
+- []
+...
+s2:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+---
+...
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+---
+...
+memtx_results = wait_and_return_results(memtx_futures)
+---
+...
+vinyl_results = wait_and_return_results(vinyl_futures)
+---
+...
+-- If begin was successful, it returns nil
+assert(not memtx_results["begin"])
+---
+- true
+...
+assert(not vinyl_results["begin"])
+---
+- true
+...
+-- [1]
+assert(memtx_results["replace"])
+---
+- [1]
+...
+assert(vinyl_results["replace"])
+---
+- [1]
+...
+-- [2]
+assert(memtx_results["insert"])
+---
+- [2]
+...
+assert(vinyl_results["insert"])
+---
+- [2]
+...
+-- [1] [2]
+assert(memtx_results["select"])
+---
+- - [1]
+ - [2]
+...
+assert(vinyl_results["select"])
+---
+- - [1]
+ - [2]
+...
+-- If commit was successful, it returns nil
+assert(not memtx_results["commit"])
+---
+- true
+...
+assert(not vinyl_results["commit"])
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select returns the tuples that were previously inserted,
+-- because the transaction was successful
+s1:select()
+---
+- - [1]
+ - [2]
+...
+s2:select()
+---
+- - [1]
+ - [2]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Check conflict resolution in stream transactions (async mode).
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+---
+...
+_ = s1:create_index('primary')
+---
+...
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+---
+...
+_ = s2:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream_1 = conn:new_stream()
+---
+...
+stream_2 = conn:new_stream()
+---
+...
+space_1_1 = stream_1.space.test_1
+---
+...
+space_1_2 = stream_2.space.test_1
+---
+...
+space_2_1 = stream_1.space.test_2
+---
+...
+space_2_2 = stream_2.space.test_2
+---
+...
+futures_1 = {}
+---
+...
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+---
+...
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+---
+...
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+---
+...
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+---
+...
+results_1 = wait_and_return_results(futures_1)
+---
+...
+-- Successful begin returns nil
+assert(not results_1["begin_1"])
+---
+- true
+...
+assert(not results_1["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_1["select_1_1"][1])
+---
+- true
+...
+assert(not results_1["select_1_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_1["replace_1_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_1["replace_1_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_1["replace_1_2"][2])
+---
+- 2
+...
+-- Successful commit returns nil
+assert(not results_1["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_1["select_1_2_A"][1]
+futures_2 = {}
+---
+...
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+---
+...
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+---
+...
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+---
+...
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+---
+...
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+---
+...
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+---
+...
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+---
+...
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+---
+...
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+---
+...
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+---
+...
+results_2 = wait_and_return_results(futures_2)
+---
+...
+-- Successful begin returns nil
+assert(not results_2["begin_1"])
+---
+- true
+...
+assert(not results_2["begin_2"])
+---
+- true
+...
+-- []
+assert(not results_2["select_2_1"][1])
+---
+- true
+...
+assert(not results_2["select_2_2"][1])
+---
+- true
+...
+-- [1]
+assert(results_2["replace_2_1"][1])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_1"][2])
+---
+- 1
+...
+-- [1]
+assert(results_2["replace_2_2"][1])
+---
+- 1
+...
+-- [2]
+assert(results_2["replace_2_2"][2])
+---
+- 2
+...
+-- Successful commit returns nil
+assert(not results_2["commit_1"])
+---
+- true
+...
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+---
+- Transaction has been aborted by conflict
+...
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+---
+- [1, 1]
+...
+-- commit_1 could have ended before commit_2, so
+-- here we can get either an empty select or [1, 1]
+-- for results_2["select_2_2_A"][1]
+test_run:switch('test')
+---
+- true
+...
+-- Both selects return tuple [1, 1]: the transaction was committed
+s1:select()
+---
+- - [1, 1]
+...
+s2:select()
+---
+- - [1, 1]
+...
+s1:drop()
+---
+...
+s2:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+function ping() return "pong" end
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+space_no_stream = conn.space.test
+---
+...
+-- successful begin using stream:call
+stream:call('box.begin')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:call('ping')
+---
+- pong
+...
+stream:eval('ping()')
+---
+...
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+stream:eval('box.begin()')
+---
+- error: 'Operation is not permitted when there is an active transaction '
+...
+-- successful commit using stream:call
+stream:call('box.commit')
+---
+...
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+---
+...
+space:replace({1})
+---
+- [1]
+...
+-- Empty select: the transaction was not committed and
+-- is not visible to requests that do not belong to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space:select({})
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty: the transaction was not committed
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Successful commit using stream:execute
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- Select returns the tuple that was previously inserted,
+-- because the transaction was successful
+space_no_stream:select{}
+---
+- - [1]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select returns the tuple, because the transaction was successful
+s:select()
+---
+- - [1]
+...
+s:delete{1}
+---
+- [1]
+...
+test_run:switch('default')
+---
+- true
+...
+-- Check rollback using stream:call
+stream:begin()
+---
+...
+space:replace({2})
+---
+- [2]
+...
+-- Empty select: the transaction was not committed and
+-- is not visible to requests that do not belong to the
+-- transaction.
+space_no_stream:select{}
+---
+- []
+...
+-- Select returns the tuple that was previously inserted,
+-- because this select belongs to the transaction.
+space:select({})
+---
+- - [2]
+...
+test_run:switch("test")
+---
+- true
+...
+-- Select is empty: the transaction was not committed
+s:select()
+---
+- []
+...
+test_run:switch('default')
+---
+- true
+...
+-- Successful rollback using stream:call
+stream:call('box.rollback')
+---
+...
+-- Empty selects: the transaction was rolled back
+space:select({})
+---
+- []
+...
+space_no_stream:select{}
+---
+- []
+...
+test_run:switch("test")
+---
+- true
+...
+-- Empty select: the transaction was rolled back
+s:select()
+---
+- []
+...
+s:drop()
+---
+...
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Simple test which demonstrates that a stream is immediately
+-- destroyed when it has no messages in processing and
+-- no active transaction.
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create('test', { engine = 'memtx' })
+---
+...
+_ = s:create_index('primary')
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+for i = 1, 10 do space:replace{i} end
+---
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages were processed, so the stream object was immediately
+-- deleted, because no transaction was started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+s:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+-- Transaction tests for SQL iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check SQL transactions in streams
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+function execute_sql_string(stream, sql_string)
+ if stream then
+ stream:execute(sql_string)
+ else
+ box.execute(sql_string)
+ end
+end$
+---
+...
+function execute_sql_string_and_return_result(stream, sql_string)
+ if stream then
+ return pcall(stream.execute, stream, sql_string)
+ else
+ return box.execute(sql_string)
+ end
+end$
+---
+...
+function monster_ddl(stream)
+ local _, err1, err2, err3, err4, err5, err6
+ local stream_or_box = stream or box
+ execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER);]])
+ execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER UNIQUE,
+ CONSTRAINT ck1
+ CHECK(b < 100));]])
+
+ execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+ execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+ execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+ KEY, a INTEGER);]])
+
+ execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+ execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+ CHECK(b > 0);]])
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+ RENAME TO t1;]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+ ck2 CHECK(a > 0);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+ INTEGER PRIMARY KEY);]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+
+ execute_sql_string(stream, [[CREATE TABLE
+ trigger_catcher(id INTEGER PRIMARY
+ KEY AUTOINCREMENT);]])
+
+ execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+ execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+ t1 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+ t2 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+ ON t1(a, b);]])
+
+ execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+ _, err5 =
+ execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+ _, err6 =
+ execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+ t_does_not_exist;]])
+
+ execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+ return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+ err5, err6}
+end$
+---
+...
+function monster_ddl_cmp_res(res1, res2)
+ if json.encode(res1) == json.encode(res2) then
+ return true
+ end
+ return res1, res2
+end$
+---
+...
+function monster_ddl_is_clean(stream)
+ local stream_or_box = stream or box
+ assert(stream_or_box.space.T1 == nil)
+ assert(stream_or_box.space.T2 == nil)
+ assert(stream_or_box.space._trigger:count() == 0)
+ assert(stream_or_box.space._fk_constraint:count() == 0)
+ assert(stream_or_box.space._ck_constraint:count() == 0)
+ assert(stream_or_box.space.T_RENAMED == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+---
+...
+function monster_ddl_check(stream)
+ local _, err1, err2, err3, err4, res
+ local stream_or_box = stream or box
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES (1, 1, 101)]])
+ execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES(2, 2, 1)]])
+ _, err3 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, 20, 1)]])
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, -1, 1)]])
+ execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+ if not stream then
+ assert(stream_or_box.space.T_RENAMED ~= nil)
+ assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+ res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ else
+ _, res =
+ execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ end
+ return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+ err3, err4, res}
+end$
+---
+...
+function monster_ddl_clear(stream)
+ execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:cmd("start server test with args='10, true'")
+---
+- true
+...
+test_run:switch('test')
+---
+- true
+...
+test_run:cmd("setopt delimiter '$'")
+---
+- true
+...
+function monster_ddl_is_clean()
+ if not (box.space.T1 == nil) or
+ not (box.space.T2 == nil) or
+ not (box.space._trigger:count() == 0) or
+ not (box.space._fk_constraint:count() == 0) or
+ not (box.space._ck_constraint:count() == 0) or
+ not (box.space.T_RENAMED == nil) or
+ not (box.space.T_TO_RENAME == nil) then
+ return false
+ end
+ return true
+end$
+---
+...
+test_run:cmd("setopt delimiter ''")$
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+---
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+-- No txn.
+true_ddl_res = monster_ddl()
+---
+...
+true_ddl_res
+---
+- - 'Finished ok, errors in the middle: '
+ - Space 'T1' already exists
+ - Space 'T1' already exists
+ - Space 'T3' does not exist
+ - Index 'T1A' already exists in space 'T1'
+ - 'Failed to execute SQL statement: can not truncate space ''T2'' because other
+ objects depend on it'
+ - Space 'T_DOES_NOT_EXIST' does not exist
+...
+true_check_res = monster_ddl_check()
+---
+...
+true_check_res
+---
+- - 'Finished ok, errors and trigger catcher content: '
+ - 'Check constraint failed ''CK1'': b < 100'
+ - Duplicate key exists in unique index "unique_unnamed_T2_2" in space "T2" with
+ old tuple - [1, 1, 1] and new tuple - [2, 2, 1]
+ - 'Failed to execute SQL statement: FOREIGN KEY constraint failed'
+ - 'Check constraint failed ''CK2'': a > 0'
+ - metadata:
+ - name: ID
+ type: integer
+ rows:
+ - [1]
+...
+monster_ddl_clear()
+---
+...
+monster_ddl_is_clean()
+---
+...
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+---
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+---
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+ddl_res = monster_ddl(stream)
+---
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+---
+- true
+...
+check_res = monster_ddl_check(stream)
+---
+...
+monster_ddl_cmp_res(check_res, true_check_res)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+monster_ddl_clear(stream)
+---
+...
+stream:call('monster_ddl_is_clean')
+---
+- true
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+test_run:switch("test")
+---
+- true
+...
+-- All messages was processed, so stream object was immediately
+-- deleted, because no active transaction started.
+errinj = box.error.injection
+---
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+---
+- true
+...
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+---
+- true
+...
+test_run:switch('default')
+---
+- true
+...
+conn:close()
+---
+...
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+---
+- true
+...
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+---
+...
+assert(conn:ping())
+---
+- true
+...
+stream = conn:new_stream()
+---
+...
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+---
+- row_count: 1
+...
+-- reload schema
+stream:ping()
+---
+- true
+...
+space = stream.space.TEST
+---
+...
+assert(space ~= nil)
+---
+- true
+...
+stream:execute('START TRANSACTION')
+---
+- row_count: 0
+...
+space:replace{1, 2, '3'}
+---
+- [1, 2, '3']
+...
+space:select()
+---
+- - [1, 2, '3']
+...
+-- select is empty, because transaction was not commited
+conn.space.TEST:select()
+---
+- []
+...
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+---
+...
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+---
+- true
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows:
+ - [1, 2, '3']
+...
+-- empty select, transaction was not commited
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows: []
+...
+stream:execute('COMMIT')
+---
+- row_count: 0
+...
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows:
+ - [1, 2, '3']
+...
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+---
+- metadata:
+ - name: ID
+ type: integer
+ - name: A
+ type: number
+ - name: B
+ type: string
+ rows:
+ - [1, 2, '3']
+...
+stream:unprepare(stream_pr.stmt_id)
+---
+- null
+...
+conn:close()
+---
+...
+test_run:switch('test')
+---
+- true
+...
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+---
+- - [1, 2, '3']
+...
+box.space.TEST:drop()
+---
+...
+test_run:switch('default')
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
test_run:cmd("cleanup server test")
- | ---
- | - true
- | ...
+---
+- true
+...
test_run:cmd("delete server test")
- | ---
- | - true
- | ...
+---
+- true
+...
diff --git a/test/box/stream.test.lua b/test/box/stream.test.lua
index 72129a228..f99c16c0d 100644
--- a/test/box/stream.test.lua
+++ b/test/box/stream.test.lua
@@ -1,6 +1,8 @@
-- This test checks streams iplementation in iproto (gh-5860).
net_box = require('net.box')
+json = require('json')
fiber = require('fiber')
+msgpack = require('msgpack')
test_run = require('test_run').new()
test_run:cmd("create server test with script='box/stream.lua'")
@@ -45,6 +47,63 @@ stream = conn:new_stream()
-- The new method `new_stream`, for the stream object, returns a new
-- stream object, just as in the case of connection.
_ = stream:new_stream()
+
+-- Simple checks for transactions
+conn_1 = net_box.connect(server_addr)
+conn_2 = net_box.connect(server_addr)
+stream_1_1 = conn_1:new_stream()
+stream_1_2 = conn_1:new_stream()
+stream_2 = conn_2:new_stream()
+-- It's ok to commit or rollback without any active transaction
+stream_1_1:commit()
+stream_1_1:rollback()
+
+stream_1_1:begin()
+-- Error unable to start second transaction in one stream
+stream_1_1:begin()
+-- It's ok to start transaction in separate stream in one connection
+stream_1_2:begin()
+-- It's ok to start transaction in separate stream in other connection
+stream_2:begin()
+test_run:switch("test")
+-- It's ok to start local transaction separately with active stream
+-- transactions
+box.begin()
+box.commit()
+test_run:switch("default")
+stream_1_1:commit()
+stream_1_2:commit()
+stream_2:commit()
+
+-- Check unsupported requests
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+-- Begin, commit and rollback supported only for streams
+conn:_request(net_box._method.begin, nil, nil, nil)
+conn:_request(net_box._method.commit, nil, nil, nil)
+conn:_request(net_box._method.rollback, nil, nil, nil)
+-- Not all requests supported by stream.
+stream = conn:new_stream()
+-- Start transaction to allocate stream object on the
+-- server side
+stream:begin()
+IPROTO_REQUEST_TYPE = 0x00
+IPROTO_SYNC = 0x01
+IPROTO_AUTH = 7
+IPROTO_STREAM_ID = 0x0a
+next_request_id = 9
+test_run:cmd("setopt delimiter ';'")
+header = msgpack.encode({
+ [IPROTO_REQUEST_TYPE] = IPROTO_AUTH,
+ [IPROTO_SYNC] = next_request_id,
+ [IPROTO_STREAM_ID] = 1,
+});
+body = msgpack.encode({nil});
+size = msgpack.encode(header:len() + body:len());
+conn._transport.perform_request(nil, nil, false, net_box._method.inject,
+ nil, nil, nil, nil,
+ size .. header .. body);
+test_run:cmd("setopt delimiter ''");
conn:close()
-- Check that spaces in stream object updates, during reload_schema
@@ -178,5 +237,1148 @@ assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
test_run:switch("default")
test_run:cmd("stop server test")
+-- Second argument (false is a value for memtx_use_mvcc_engine option)
+-- Server start without active transaction manager, so all transaction
+-- fails because of yeild!
+test_run:cmd("start server test with args='10, false'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = false
+stream:begin()
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 1)
+test_run:switch('default')
+space:replace({1})
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space:select{}
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:commit()
+-- Select is empty, transaction was aborted
+space:select{}
+-- Check that after failed transaction commit we able to start next
+-- transaction (it's strange check, but it's necessary because it was
+-- bug with it)
+stream:begin()
+stream:ping()
+stream:commit()
+-- Same checks for `call` end `eval` functions.
+stream:call('box.begin')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:eval('box.commit()')
+-- Select is empty, transaction was aborted
+space:select{}
+
+-- Same checks for `execute` function which can also
+-- begin and commit transaction.
+stream:execute('START TRANSACTION')
+stream:call('s:replace', {{1}})
+-- Select is empty, because memtx_use_mvcc_engine is false
+space:select({})
+stream:call('s:select', {})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s:select()
+test_run:switch('default')
+-- Commit fails, transaction yeild with memtx_use_mvcc_engine = false
+stream:execute('COMMIT')
+-- Select is empty, transaction was aborted
+space:select{}
+
+test_run:switch('test')
+s:drop()
+-- Check that there are no streams and messages, which
+-- was not deleted
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+stream:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Next we check transactions only for memtx with
+-- memtx_use_mvcc_engine = true and for vinyl, because
+-- if memtx_use_mvcc_engine = false all transactions fails,
+-- as we can see before!
+
+-- Second argument (true is a value for memtx_use_mvcc_engine option)
+-- Same test case as previous but server start with active transaction
+-- manager. Also check vinyl, because it's behaviour is same.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s1:create_index('primary')
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- Spaces getting from connection, not from stream has no stream_id
+-- and not belongs to stream
+space_1_no_stream = conn.space.test_1
+space_2_no_stream = conn.space.test_2
+-- Check syncronious stream txn requests for memtx
+-- with memtx_use_mvcc_engine = true and to vinyl:
+-- behaviour is same!
+stream_1:begin()
+space_1:replace({1})
+stream_2:begin()
+space_2:replace({1})
+test_run:switch('test')
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 2)
+test_run:switch('default')
+-- Empty select, transaction was not commited and
+-- is not visible from requests not belonging to the
+-- transaction.
+space_1_no_stream:select{}
+space_2_no_stream:select{}
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+space_2:select({})
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s1:select()
+s2:select()
+test_run:switch('default')
+-- Commit was successful, transaction can yeild with
+-- memtx_use_mvcc_engine = true. Vinyl transactions
+-- can yeild also.
+stream_1:commit()
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+space_1:select{}
+space_2:select{}
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Simple read/write conflict.
+space_1_1:select({1})
+space_1_2:select({1})
+space_1_1:replace({1, 1})
+space_1_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_1_1:select({})
+space_1_2:select({})
+
+-- Same test for vinyl sapce
+stream_1:begin()
+stream_2:begin()
+space_2_1:select({1})
+space_2_2:select({1})
+space_2_1:replace({1, 1})
+space_2_2:replace({1, 2})
+stream_1:commit()
+-- This transaction fails, because of conflict
+stream_2:commit()
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after commit
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+-- Here we must accept [1, 1]
+space_2_1:select({})
+space_2_2:select({})
+
+test_run:switch('test')
+-- Both select return tuple [1, 1], transaction commited
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback as a command for memtx and vinyl spaces
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+-- Test rollback for memtx space
+space_1:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_1:select({})
+stream_1:rollback()
+-- Select is empty, transaction rollback
+space_1:select({})
+
+-- Test rollback for vinyl space
+space_2:replace({1})
+-- Select return tuple, which was previously inserted,
+-- because this select belongs to transaction.
+space_2:select({})
+stream_2:rollback()
+-- Select is empty, transaction rollback
+space_2:select({})
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after rollback
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+
+-- This is simple test is necessary because i have a bug
+-- with halting stream after rollback
+stream_1:begin()
+stream_1:commit()
+stream_2:begin()
+stream_2:commit()
+conn:close()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+conn:close()
+
+test_run:switch("test")
+-- Empty selects, transaction was rollback
+s1:select()
+s2:select()
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check rollback on disconnect with big count of async requests
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+
+space_1:replace({1})
+space_1:replace({2})
+-- Select return two previously inserted tuples
+space_1:select({})
+
+space_2:replace({1})
+space_2:replace({2})
+-- Select return two previously inserted tuples
+space_2:select({})
+-- We send a large number of asynchronous requests,
+-- their result is not important to us, it is important
+-- that they will be in the stream queue at the time of
+-- the disconnect.
+test_run:cmd("setopt delimiter ';'")
+for i = 1, 1000 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select was empty, transaction rollbacked
+s1:select()
+s2:select()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Same test, but now we check that if `commit` was received
+-- by server before connection closed, we processed it successful.
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+stream_1:begin()
+stream_2:begin()
+test_run:cmd("setopt delimiter ';'")
+-- Here, for a large number of messages, we cannot guarantee their processing,
+-- since if the net_msg_max limit is reached, we will stop processing incoming
+-- requests, and after close, we will discard all raw data. '100' is the number
+-- of messages that we can process without reaching net_msg_max. We will not try
+-- any more, so as not to make a test flaky.
+for i = 1, 100 do
+ space_1:replace({i}, {is_async = true})
+ space_2:replace({i}, {is_async = true})
+end;
+test_run:cmd("setopt delimiter ''");
+_ = stream_1:commit({is_async = true})
+_ = stream_2:commit({is_async = true})
+fiber.sleep(0)
+conn:close()
+
+test_run:switch("test")
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+test_run:cmd("setopt delimiter ';'")
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0
+end);
+test_run:wait_cond(function ()
+ return errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0
+end);
+test_run:cmd("setopt delimiter ''");
+-- Select return tuples from [1] to [100],
+-- transaction was commit
+rc1 = s1:select()
+rc2 = s2:select()
+assert(#rc1)
+assert(#rc2)
+s1:truncate()
+s2:truncate()
+test_run:switch("default")
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+-- Reconnect
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+-- We can begin new transactions with same stream_id, because
+-- previous one was rollbacked and destroyed.
+stream_1:begin()
+stream_2:begin()
+-- Two empty selects
+space_1:select({})
+space_2:select({})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Both select are empty, because transaction rollback
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check that all requests between `begin` and `commit`
+-- have correct lsn and tsn values. During my work on the
+-- patch, i see that all requests in stream comes with
+-- header->is_commit == true, so if we are in transaction
+-- in stream we should set this value to false, otherwise
+-- during recovering `wal_stream_apply_dml_row` fails, because
+-- of LSN/TSN mismatch. Here is a special test case for it.
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'memtx' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1 = stream_1.space.test_1
+space_2 = stream_2.space.test_2
+
+stream_1:begin()
+stream_2:begin()
+space_1:replace({1})
+space_1:replace({2})
+space_2:replace({1})
+space_2:replace({2})
+stream_1:commit()
+stream_2:commit()
+
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+s1:select{}
+-- Here we get two tuples, commit was successful
+s2:select{}
+-- Check that there are no streams and messages, which
+-- was not deleted after connection close
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+test_run:cmd("start server test with args='1, true'")
+test_run:switch('test')
+-- Here we get two tuples, commit was successful
+box.space.test_1:select{}
+-- Here we get two tuples, commit was successful
+box.space.test_2:select{}
+box.space.test_1:drop()
+box.space.test_2:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
+-- Same transactions checks for async mode
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream_1 = conn:new_stream()
+space_1 = stream_1.space.test_1
+stream_2 = conn:new_stream()
+space_2 = stream_2.space.test_2
+
+memtx_futures = {}
+memtx_futures["begin"] = stream_1:begin({is_async = true})
+memtx_futures["replace"] = space_1:replace({1}, {is_async = true})
+memtx_futures["insert"] = space_1:insert({2}, {is_async = true})
+memtx_futures["select"] = space_1:select({}, {is_async = true})
+
+vinyl_futures = {}
+vinyl_futures["begin"] = stream_2:begin({is_async = true})
+vinyl_futures["replace"] = space_2:replace({1}, {is_async = true})
+vinyl_futures["insert"] = space_2:insert({2}, {is_async = true})
+vinyl_futures["select"] = space_2:select({}, {is_async = true})
+
+test_run:switch("test")
+-- Select is empty, transaction was not commited
+s1:select()
+s2:select()
+test_run:switch('default')
+memtx_futures["commit"] = stream_1:commit({is_async = true})
+vinyl_futures["commit"] = stream_2:commit({is_async = true})
+
+memtx_results = wait_and_return_results(memtx_futures)
+vinyl_results = wait_and_return_results(vinyl_futures)
+-- If begin was successful it return nil
+assert(not memtx_results["begin"])
+assert(not vinyl_results["begin"])
+-- [1]
+assert(memtx_results["replace"])
+assert(vinyl_results["replace"])
+-- [2]
+assert(memtx_results["insert"])
+assert(vinyl_results["insert"])
+-- [1] [2]
+assert(memtx_results["select"])
+assert(vinyl_results["select"])
+-- If commit was successful it return nil
+assert(not memtx_results["commit"])
+assert(not vinyl_results["commit"])
+
+test_run:switch("test")
+-- Select return tuple, which was previously inserted,
+-- because transaction was successful
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Check conflict resolution in stream transactions,
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+
+test_run:switch("test")
+s1 = box.schema.space.create('test_1', { engine = 'memtx' })
+_ = s1:create_index('primary')
+s2 = box.schema.space.create('test_2', { engine = 'vinyl' })
+_ = s2:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+stream_1 = conn:new_stream()
+stream_2 = conn:new_stream()
+space_1_1 = stream_1.space.test_1
+space_1_2 = stream_2.space.test_1
+space_2_1 = stream_1.space.test_2
+space_2_2 = stream_2.space.test_2
+
+futures_1 = {}
+-- Simple read/write conflict.
+futures_1["begin_1"] = stream_1:begin({is_async = true})
+futures_1["begin_2"] = stream_2:begin({is_async = true})
+futures_1["select_1_1"] = space_1_1:select({1}, {is_async = true})
+futures_1["select_1_2"] = space_1_2:select({1}, {is_async = true})
+futures_1["replace_1_1"] = space_1_1:replace({1, 1}, {is_async = true})
+futures_1["replace_1_2"] = space_1_2:replace({1, 2}, {is_async = true})
+futures_1["commit_1"] = stream_1:commit({is_async = true})
+futures_1["commit_2"] = stream_2:commit({is_async = true})
+futures_1["select_1_1_A"] = space_1_1:select({}, {is_async = true})
+futures_1["select_1_2_A"] = space_1_2:select({}, {is_async = true})
+
+results_1 = wait_and_return_results(futures_1)
+-- Successful begin returns nil
+assert(not results_1["begin_1"])
+assert(not results_1["begin_2"])
+-- []
+assert(not results_1["select_1_1"][1])
+assert(not results_1["select_1_2"][1])
+-- [1]
+assert(results_1["replace_1_1"][1])
+-- [1]
+assert(results_1["replace_1_1"][2])
+-- [1]
+assert(results_1["replace_1_2"][1])
+-- [2]
+assert(results_1["replace_1_2"][2])
+-- Successful commit returns nil
+assert(not results_1["commit_1"])
+-- Error because of transaction conflict
+assert(results_1["commit_2"])
+-- [1, 1]
+assert(results_1["select_1_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_1["select_1_2_A"][1]
+
+futures_2 = {}
+-- Simple read/write conflict.
+futures_2["begin_1"] = stream_1:begin({is_async = true})
+futures_2["begin_2"] = stream_2:begin({is_async = true})
+futures_2["select_2_1"] = space_2_1:select({1}, {is_async = true})
+futures_2["select_2_2"] = space_2_2:select({1}, {is_async = true})
+futures_2["replace_2_1"] = space_2_1:replace({1, 1}, {is_async = true})
+futures_2["replace_2_2"] = space_2_2:replace({1, 2}, {is_async = true})
+futures_2["commit_1"] = stream_1:commit({is_async = true})
+futures_2["commit_2"] = stream_2:commit({is_async = true})
+futures_2["select_2_1_A"] = space_2_1:select({}, {is_async = true})
+futures_2["select_2_2_A"] = space_2_2:select({}, {is_async = true})
+
+results_2 = wait_and_return_results(futures_2)
+-- Successful begin returns nil
+assert(not results_2["begin_1"])
+assert(not results_2["begin_2"])
+-- []
+assert(not results_2["select_2_1"][1])
+assert(not results_2["select_2_2"][1])
+-- [1]
+assert(results_2["replace_2_1"][1])
+-- [1]
+assert(results_2["replace_2_1"][2])
+-- [1]
+assert(results_2["replace_2_2"][1])
+-- [2]
+assert(results_2["replace_2_2"][2])
+-- Successful commit returns nil
+assert(not results_2["commit_1"])
+-- Error because of transaction conflict
+assert(results_2["commit_2"])
+-- [1, 1]
+assert(results_2["select_2_1_A"][1])
+-- commit_1 could have ended before commit_2, so
+-- here we can get both empty select and [1, 1]
+-- for results_2["select_2_2_A"][1]
+
+test_run:switch('test')
+-- Both selects return tuple [1, 1], the transaction was committed
+s1:select()
+s2:select()
+s1:drop()
+s2:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Checks for iproto call/eval/execute in stream
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+function ping() return "pong" end
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+space_no_stream = conn.space.test
+
+-- successful begin using stream:call
+stream:call('box.begin')
+-- error: Operation is not permitted when there is an active transaction
+stream:eval('box.begin()')
+-- error: Operation is not permitted when there is an active transaction
+stream:begin()
+-- error: Operation is not permitted when there is an active transaction
+stream:execute('START TRANSACTION')
+stream:call('ping')
+stream:eval('ping()')
+-- error: Operation is not permitted when there is an active transaction
+stream:call('box.begin')
+stream:eval('box.begin()')
+-- successful commit using stream:call
+stream:call('box.commit')
+
+-- successful begin using stream:eval
+stream:eval('box.begin()')
+space:replace({1})
+-- Empty select, the transaction is not committed yet and
+-- is not visible to requests that do not belong to the
+-- transaction.
+space_no_stream:select{}
+-- Select returns the previously inserted tuple,
+-- because this select belongs to the transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, the transaction is not committed yet
+s:select()
+test_run:switch('default')
+-- Successful commit using stream:execute
+stream:execute('COMMIT')
+-- Select returns the previously inserted tuple,
+-- because the transaction was successful
+space_no_stream:select{}
+test_run:switch("test")
+-- Select returns the tuple, because the transaction was successful
+s:select()
+s:delete{1}
+test_run:switch('default')
+-- Check rollback using stream:call
+stream:begin()
+space:replace({2})
+-- Empty select, the transaction is not committed yet and
+-- is not visible to requests that do not belong to the
+-- transaction.
+space_no_stream:select{}
+-- Select returns the previously inserted tuple,
+-- because this select belongs to the transaction.
+space:select({})
+test_run:switch("test")
+-- Select is empty, the transaction is not committed yet
+s:select()
+test_run:switch('default')
+-- Successful rollback using stream:call
+stream:call('box.rollback')
+-- Empty selects, the transaction was rolled back
+space:select({})
+space_no_stream:select{}
+test_run:switch("test")
+-- Empty select, the transaction was rolled back
+s:select()
+s:drop()
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Simple test which demonstrates that a stream is destroyed
+-- immediately when it has no messages being processed and
+-- no active transaction.
+
+test_run:cmd("start server test with args='10, true'")
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+test_run:switch("test")
+s = box.schema.space.create('test', { engine = 'memtx' })
+_ = s:create_index('primary')
+test_run:switch('default')
+
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+space = stream.space.test
+for i = 1, 10 do space:replace{i} end
+test_run:switch("test")
+-- All messages were processed, so the stream object was deleted
+-- immediately, because no transaction is active.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+s:drop()
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+test_run:cmd("stop server test")
+
+-- Transaction tests for sql iproto requests.
+-- All these functions are copied from sql/ddl.test.lua,
+-- except that they check sql transactions in streams
+test_run:cmd("setopt delimiter '$'")
+function execute_sql_string(stream, sql_string)
+ if stream then
+ stream:execute(sql_string)
+ else
+ box.execute(sql_string)
+ end
+end$
+function execute_sql_string_and_return_result(stream, sql_string)
+ if stream then
+ return pcall(stream.execute, stream, sql_string)
+ else
+ return box.execute(sql_string)
+ end
+end$
+function monster_ddl(stream)
+ local _, err1, err2, err3, err4, err5, err6
+ local stream_or_box = stream or box
+ execute_sql_string(stream, [[CREATE TABLE t1(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER);]])
+ execute_sql_string(stream, [[CREATE TABLE t2(id INTEGER PRIMARY KEY,
+ a INTEGER,
+ b INTEGER UNIQUE,
+ CONSTRAINT ck1
+ CHECK(b < 100));]])
+
+ execute_sql_string(stream, 'CREATE INDEX t1a ON t1(a);')
+ execute_sql_string(stream, 'CREATE INDEX t2a ON t2(a);')
+
+ execute_sql_string(stream, [[CREATE TABLE t_to_rename(id INTEGER PRIMARY
+ KEY, a INTEGER);]])
+
+ execute_sql_string(stream, 'DROP INDEX t2a ON t2;')
+
+ execute_sql_string(stream, 'CREATE INDEX t_to_rename_a ON t_to_rename(a);')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT ck1
+ CHECK(b > 0);]])
+
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[ALTER TABLE t_to_rename
+ RENAME TO t1;]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT
+ ck2 CHECK(a > 0);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT ck1;')
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[CREATE TABLE t1(id
+ INTEGER PRIMARY KEY);]])
+
+ execute_sql_string(stream, [[ALTER TABLE t1 ADD CONSTRAINT fk1 FOREIGN KEY
+ (a) REFERENCES t2(b);]])
+
+ execute_sql_string(stream, [[CREATE TABLE
+ trigger_catcher(id INTEGER PRIMARY
+ KEY AUTOINCREMENT);]])
+
+ execute_sql_string(stream, 'ALTER TABLE t_to_rename RENAME TO t_renamed;')
+
+ execute_sql_string(stream, 'DROP INDEX t_to_rename_a ON t_renamed;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t1t AFTER INSERT ON
+ t1 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err3 = execute_sql_string_and_return_result(stream, 'DROP TABLE t3;')
+
+ execute_sql_string(stream, [[CREATE TRIGGER t2t AFTER INSERT ON
+ t2 FOR EACH ROW
+ BEGIN
+ INSERT INTO trigger_catcher VALUES(1);
+ END; ]])
+
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[CREATE INDEX t1a
+ ON t1(a, b);]])
+
+ execute_sql_string(stream, 'TRUNCATE TABLE t1;')
+ _, err5 =
+ execute_sql_string_and_return_result(stream, 'TRUNCATE TABLE t2;')
+ _, err6 =
+ execute_sql_string_and_return_result(stream, [[TRUNCATE TABLE
+ t_does_not_exist;]])
+
+ execute_sql_string(stream, 'DROP TRIGGER t2t;')
+
+ return {'Finished ok, errors in the middle: ', err1, err2, err3, err4,
+ err5, err6}
+end$
+function monster_ddl_cmp_res(res1, res2)
+ if json.encode(res1) == json.encode(res2) then
+ return true
+ end
+ return res1, res2
+end$
+function monster_ddl_is_clean(stream)
+ local stream_or_box = stream or box
+ assert(stream_or_box.space.T1 == nil)
+ assert(stream_or_box.space.T2 == nil)
+ assert(stream_or_box.space._trigger:count() == 0)
+ assert(stream_or_box.space._fk_constraint:count() == 0)
+ assert(stream_or_box.space._ck_constraint:count() == 0)
+ assert(stream_or_box.space.T_RENAMED == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+end$
+function monster_ddl_check(stream)
+ local _, err1, err2, err3, err4, res
+ local stream_or_box = stream or box
+ _, err1 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES (1, 1, 101)]])
+ execute_sql_string(stream, 'INSERT INTO t2 VALUES (1, 1, 1)')
+ _, err2 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t2
+ VALUES(2, 2, 1)]])
+ _, err3 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, 20, 1)]])
+ _, err4 =
+ execute_sql_string_and_return_result(stream, [[INSERT INTO t1
+ VALUES(1, -1, 1)]])
+ execute_sql_string(stream, 'INSERT INTO t1 VALUES (1, 1, 1)')
+ if not stream then
+ assert(stream_or_box.space.T_RENAMED ~= nil)
+ assert(stream_or_box.space.T_RENAMED.index.T_TO_RENAME_A == nil)
+ assert(stream_or_box.space.T_TO_RENAME == nil)
+ res = execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ else
+ _, res =
+ execute_sql_string_and_return_result(stream, [[SELECT * FROM
+ trigger_catcher]])
+ end
+ return {'Finished ok, errors and trigger catcher content: ', err1, err2,
+ err3, err4, res}
+end$
+function monster_ddl_clear(stream)
+ execute_sql_string(stream, 'DROP TRIGGER IF EXISTS t1t;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS trigger_catcher;')
+ execute_sql_string(stream, 'ALTER TABLE t1 DROP CONSTRAINT fk1;')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t2')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t1')
+ execute_sql_string(stream, 'DROP TABLE IF EXISTS t_renamed')
+end$
+test_run:cmd("setopt delimiter ''")$
+
+test_run:cmd("start server test with args='10, true'")
+test_run:switch('test')
+test_run:cmd("setopt delimiter '$'")
+function monster_ddl_is_clean()
+ if not (box.space.T1 == nil) or
+ not (box.space.T2 == nil) or
+ not (box.space._trigger:count() == 0) or
+ not (box.space._fk_constraint:count() == 0) or
+ not (box.space._ck_constraint:count() == 0) or
+ not (box.space.T_RENAMED == nil) or
+ not (box.space.T_TO_RENAME == nil) then
+ return false
+ end
+ return true
+end$
+test_run:cmd("setopt delimiter ''")$
+test_run:switch('default')
+
+server_addr = test_run:cmd("eval test 'return box.cfg.listen'")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+
+-- No txn.
+true_ddl_res = monster_ddl()
+true_ddl_res
+
+true_check_res = monster_ddl_check()
+true_check_res
+
+monster_ddl_clear()
+monster_ddl_is_clean()
+
+-- Both DDL and cleanup in one txn in stream.
+ddl_res = nil
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+-- DDL in txn, cleanup is not.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+
+-- DDL is not in txn, cleanup is.
+ddl_res = monster_ddl(stream)
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+-- DDL and cleanup in separate txns.
+stream:execute('START TRANSACTION')
+ddl_res = monster_ddl(stream)
+stream:execute('COMMIT')
+monster_ddl_cmp_res(ddl_res, true_ddl_res)
+
+check_res = monster_ddl_check(stream)
+monster_ddl_cmp_res(check_res, true_check_res)
+
+stream:execute('START TRANSACTION')
+monster_ddl_clear(stream)
+stream:call('monster_ddl_is_clean')
+stream:execute('COMMIT')
+
+test_run:switch("test")
+-- All messages were processed, so the stream object was deleted
+-- immediately, because no transaction is active.
+errinj = box.error.injection
+assert(errinj.get('ERRINJ_IPROTO_STREAM_COUNT') == 0)
+assert(errinj.get('ERRINJ_IPROTO_STREAM_MSG_COUNT') == 0)
+test_run:switch('default')
+conn:close()
+test_run:wait_cond(function () return get_current_connection_count() == 0 end)
+
+
+-- Check for prepare and unprepare functions
+conn = net_box.connect(server_addr)
+assert(conn:ping())
+stream = conn:new_stream()
+
+stream:execute('CREATE TABLE test (id INT PRIMARY KEY, a NUMBER, b TEXT)')
+-- reload schema
+stream:ping()
+space = stream.space.TEST
+assert(space ~= nil)
+stream:execute('START TRANSACTION')
+space:replace{1, 2, '3'}
+space:select()
+-- select is empty, because the transaction is not committed yet
+conn.space.TEST:select()
+stream_pr = stream:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+conn_pr = conn:prepare("SELECT * FROM test WHERE id = ? AND a = ?;")
+assert(stream_pr.stmt_id == conn_pr.stmt_id)
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- empty select, the transaction is not committed yet
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:execute('COMMIT')
+-- [ 1, 2, '3' ]
+stream:execute(stream_pr.stmt_id, {1, 2})
+-- [ 1, 2, '3' ]
+conn:execute(conn_pr.stmt_id, {1, 2})
+stream:unprepare(stream_pr.stmt_id)
+conn:close()
+test_run:switch('test')
+-- [ 1, 2, '3' ]
+box.space.TEST:select()
+box.space.TEST:drop()
+test_run:switch('default')
+test_run:cmd("stop server test")
+
test_run:cmd("cleanup server test")
test_run:cmd("delete server test")
--
2.20.1
* Re: [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box
2021-08-11 8:56 ` [Tarantool-patches] [PATCH v3 8/8] net.box: add interactive transaction support in net.box mechanik20051988 via Tarantool-patches
@ 2021-08-11 12:47 ` Vladimir Davydov via Tarantool-patches
0 siblings, 0 replies; 14+ messages in thread
From: Vladimir Davydov via Tarantool-patches @ 2021-08-11 12:47 UTC (permalink / raw)
To: mechanik20051988; +Cc: tarantool-patches, v.shpilevoy, mechanik20051988
On Wed, Aug 11, 2021 at 11:56:58AM +0300, mechanik20051988 wrote:
> From: mechanik20051988 <mechanik20.05.1988@gmail.com>
>
> Implement `begin`, `commit` and `rollback` methods for the stream
> object in `net.box`, which begin, commit and roll back a transaction,
> respectively.
>
> Closes #5860
>
> @TarantoolBot document
> Title: add interactive transaction support in net.box
> Implement `begin`, `commit` and `rollback` methods for the stream
> object in `net.box`, which begin, commit and roll back a transaction,
> respectively. There are now several ways to begin, commit and roll back
> a transaction from `net.box`: the dedicated stream methods, the `call`
> or `eval` methods, or the `execute` method with SQL transaction syntax.
> These can be mixed: for example, start a transaction with
> `stream:begin()` and commit it with `stream:call('box.commit')`
> or `stream:execute('COMMIT')`.
> Simple example of using interactive transactions via iproto from net.box:
> ```lua
> stream = conn:new_stream()
> space = stream.space.test
> space_not_from_stream = conn.space.test
>
> stream:begin()
> space:replace({1})
> -- returns the previously inserted tuple, because the request
> -- belongs to the transaction.
> space:select({})
> -- empty select, because this select doesn't belong to
> -- the transaction
> space_not_from_stream:select({})
> stream:call('box.commit')
> -- now the transaction is committed, so all requests
> -- return the tuple.
> ```
> More examples of using streams can be found in
> gh-5860-implement-streams-in-iproto.test.lua
> ---
> .../gh-5860-implement-streams-in-iproto.md | 26 +
> src/box/lua/net_box.c | 49 +-
> src/box/lua/net_box.lua | 35 +-
> test/box/stream.result | 3558 +++++++++++++++--
> test/box/stream.test.lua | 1202 ++++++
Please add a separate test file for this (net.box.transactions or
something like this).
> 5 files changed, 4554 insertions(+), 316 deletions(-)
> create mode 100644 changelogs/unreleased/gh-5860-implement-streams-in-iproto.md
>
> diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
> index 3dffc245f..745a8c0f5 100644
> --- a/src/box/lua/net_box.lua
> +++ b/src/box/lua/net_box.lua
> @@ -51,8 +51,11 @@ local M_GET = 13
> local M_MIN = 14
> local M_MAX = 15
> local M_COUNT = 16
> +local M_BEGIN = 17
> +local M_COMMIT = 18
> +local M_ROLLBACK = 19
> -- Injects raw data into connection. Used by console and tests.
> -local M_INJECT = 17
> +local M_INJECT = 20
>
> -- utility tables
> local is_final_state = {closed = 1, error = 1}
> @@ -754,11 +757,38 @@ local function new_stream(stream)
> return stream._conn:new_stream()
> end
>
> +local function begin(stream, opts)
stream_begin
> + check_remote_arg(stream, 'begin')
> + local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
> + if opts and opts.is_async then
> + return res
> + end
> +end
> +
> +local function commit(stream, opts)
stream_commit
> + check_remote_arg(stream, 'commit')
> + local res = stream:_request(M_COMMIT, opts, nil, stream._stream_id)
> + if opts and opts.is_async then
> + return res
> + end
> +end
> +
> +local function rollback(stream, opts)
stream_rollback
> + check_remote_arg(stream, 'rollback')
> + local res = stream:_request(M_ROLLBACK, opts, nil, stream._stream_id)
> + if opts and opts.is_async then
> + return res
> + end
> +end
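A rough sketch of how the three functions quoted above could look with the
stream_ prefix, with their identical body factored out. This is only an
illustration, not something from the patch: `stream_txn_request` and the fake
stream at the bottom are made-up names, and the `M_*` constants and
`check_remote_arg` are stubbed here only so the snippet runs on its own (in
net_box.lua they already exist).

```lua
-- Stand-ins so the sketch is self-contained; the M_* values are the
-- ones from the quoted hunk, check_remote_arg is a simplified stub.
local M_BEGIN, M_COMMIT, M_ROLLBACK = 17, 18, 19

local function check_remote_arg(remote, method)
    if type(remote) ~= 'table' then
        error(('Use remote:%s(...) instead of remote.%s(...)')
              :format(method, method))
    end
end

-- Hypothetical helper, not part of the patch: the shared body of the
-- three transaction methods.
local function stream_txn_request(stream, method, name, opts)
    check_remote_arg(stream, name)
    local res = stream:_request(method, opts, nil, stream._stream_id)
    -- Only an async call hands the future back; a synchronous call
    -- returns nothing, same as the quoted functions.
    if opts and opts.is_async then
        return res
    end
end

local function stream_begin(stream, opts)
    return stream_txn_request(stream, M_BEGIN, 'begin', opts)
end

local function stream_commit(stream, opts)
    return stream_txn_request(stream, M_COMMIT, 'commit', opts)
end

local function stream_rollback(stream, opts)
    return stream_txn_request(stream, M_ROLLBACK, 'rollback', opts)
end

-- Fake stream used only to exercise the sketch: it records what
-- _request was called with and returns a placeholder "future".
local calls = {}
local fake_stream = {
    _stream_id = 1,
    _request = function(self, method, opts, format, stream_id)
        calls[#calls + 1] = {method = method, stream_id = stream_id}
        return 'future'
    end,
}

stream_begin(fake_stream)                       -- sync: returns nothing
stream_commit(fake_stream, {is_async = true})   -- async: returns the future
stream_rollback(fake_stream)
```

The stream table built in `remote_methods:new_stream()` would then take
`begin = stream_begin`, `commit = stream_commit` and
`rollback = stream_rollback`, so the public method names stay the same.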
> +
> function remote_methods:new_stream()
> check_remote_arg(self, 'new_stream')
> self._last_stream_id = self._last_stream_id + 1
> local stream = setmetatable({
> new_stream = new_stream,
> + begin = begin,
> + commit = commit,
> + rollback = rollback,
> _stream_id = self._last_stream_id,
> space = setmetatable({
> _stream_space_cache = {},