* [Tarantool-patches] [PATCH 01/15] Update license file (2020)
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 02/15] Check data_offset overflow in struct tuple Aleksandr Lyapunov
` (14 subsequent siblings)
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC
To: tarantool-patches
---
LICENSE | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/LICENSE b/LICENSE
index d797c05..734ba6c 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-Copyright 2010-2016 Tarantool AUTHORS: please see AUTHORS file.
+Copyright 2010-2020 Tarantool AUTHORS: please see AUTHORS file.
Redistribution and use in source and binary forms, with or
without modification, are permitted provided that the following
--
2.7.4
* [Tarantool-patches] [PATCH 02/15] Check data_offset overflow in struct tuple
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 01/15] Update license file (2020) Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:03 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 03/15] tx: introduce dirty tuples Aleksandr Lyapunov
` (13 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC
To: tarantool-patches
The data_offset member of struct tuple is currently uint16_t, while
it is calculated from field_map_size, which is uint32_t. This
mismatch can lead to overflows and crashes.
Fixes #5084
---
src/box/errcode.h | 1 +
src/box/memtx_engine.c | 19 ++++++++-----
src/box/tuple.c | 11 ++++++--
src/box/vy_stmt.c | 8 ++++++
test/box/error.result | 1 +
test/box/huge_field_map.result | 49 +++++++++++++++++++++++++++++++++
test/box/huge_field_map.test.lua | 22 +++++++++++++++
test/box/huge_field_map_long.result | 51 +++++++++++++++++++++++++++++++++++
test/box/huge_field_map_long.test.lua | 28 +++++++++++++++++++
test/box/suite.ini | 1 +
10 files changed, 183 insertions(+), 8 deletions(-)
create mode 100644 test/box/huge_field_map.result
create mode 100644 test/box/huge_field_map.test.lua
create mode 100644 test/box/huge_field_map_long.result
create mode 100644 test/box/huge_field_map_long.test.lua
diff --git a/src/box/errcode.h b/src/box/errcode.h
index d1e4d02..938d411 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -266,6 +266,7 @@ struct errcode_record {
/*211 */_(ER_WRONG_QUERY_ID, "Prepared statement with id %u does not exist") \
/*212 */_(ER_SEQUENCE_NOT_STARTED, "Sequence '%s' is not started") \
/*213 */_(ER_NO_SUCH_SESSION_SETTING, "Session setting %s doesn't exist") \
+ /*214 */_(ER_TUPLE_METADATA_IS_TOO_BIG, "Can't create tuple: metadata size %u is too big") \
/*
* !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index 6ce8cac..b5b6b14 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1125,6 +1125,18 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
if (tuple_field_map_create(format, data, true, &builder) != 0)
goto end;
uint32_t field_map_size = field_map_build_size(&builder);
+ /*
+ * Data offset is calculated from the begin of the struct
+ * tuple base, not from memtx_tuple, because the struct
+ * tuple is not the first field of the memtx_tuple.
+ */
+ uint32_t data_offset = sizeof(struct tuple) + field_map_size;
+ if (data_offset > UINT16_MAX) {
+ /** tuple->data_offset is 16 bits */
+ diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG,
+ data_offset);
+ goto end;
+ }
size_t tuple_len = end - data;
size_t total = sizeof(struct memtx_tuple) + field_map_size + tuple_len;
@@ -1157,12 +1169,7 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
tuple->bsize = tuple_len;
tuple->format_id = tuple_format_id(format);
tuple_format_ref(format);
- /*
- * Data offset is calculated from the begin of the struct
- * tuple base, not from memtx_tuple, because the struct
- * tuple is not the first field of the memtx_tuple.
- */
- tuple->data_offset = sizeof(struct tuple) + field_map_size;
+ tuple->data_offset = data_offset;
char *raw = (char *) tuple + tuple->data_offset;
field_map_build(&builder, raw - field_map_size);
memcpy(raw, data, tuple_len);
diff --git a/src/box/tuple.c b/src/box/tuple.c
index 1f52a8c..e48ee08 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -83,6 +83,13 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
if (tuple_field_map_create(format, data, true, &builder) != 0)
goto end;
uint32_t field_map_size = field_map_build_size(&builder);
+ uint32_t data_offset = sizeof(struct tuple) + field_map_size;
+ if (data_offset > UINT16_MAX) {
+ /** tuple->data_offset is 16 bits */
+ diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG,
+ data_offset);
+ goto end;
+ }
size_t data_len = end - data;
size_t total = sizeof(struct tuple) + field_map_size + data_len;
@@ -97,8 +104,8 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
tuple->bsize = data_len;
tuple->format_id = tuple_format_id(format);
tuple_format_ref(format);
- tuple->data_offset = sizeof(struct tuple) + field_map_size;
- char *raw = (char *) tuple + tuple->data_offset;
+ tuple->data_offset = data_offset;
+ char *raw = (char *) tuple + data_offset;
field_map_build(&builder, raw - field_map_size);
memcpy(raw, data, data_len);
say_debug("%s(%zu) = %p", __func__, data_len, tuple);
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 392f3da..f59c418 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -159,6 +159,14 @@ static struct tuple *
vy_stmt_alloc(struct tuple_format *format, uint32_t data_offset, uint32_t bsize)
{
assert(data_offset >= sizeof(struct vy_stmt) + format->field_map_size);
+
+ if (data_offset > UINT16_MAX) {
+ /** tuple->data_offset is 16 bits */
+ diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG,
+ data_offset);
+ return NULL;
+ }
+
struct vy_stmt_env *env = format->engine;
uint32_t total_size = data_offset + bsize;
if (unlikely(total_size > env->max_tuple_size)) {
diff --git a/test/box/error.result b/test/box/error.result
index 2196fa5..a166824 100644
--- a/test/box/error.result
+++ b/test/box/error.result
@@ -432,6 +432,7 @@ t;
| 211: box.error.WRONG_QUERY_ID
| 212: box.error.SEQUENCE_NOT_STARTED
| 213: box.error.NO_SUCH_SESSION_SETTING
+ | 214: box.error.TUPLE_METADATA_IS_TOO_BIG
| ...
test_run:cmd("setopt delimiter ''");
diff --git a/test/box/huge_field_map.result b/test/box/huge_field_map.result
new file mode 100644
index 0000000..11b4da3
--- /dev/null
+++ b/test/box/huge_field_map.result
@@ -0,0 +1,49 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+s = box.schema.space.create('test', {engine = 'memtx'})
+ | ---
+ | ...
+i1 = s:create_index('pk')
+ | ---
+ | ...
+i2 = s:create_index('mk', {parts={{'[2][*]', 'uint'}}})
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function test()
+ local t = {1, {}}
+ for i = 1,65536 do
+ table.insert(t[2], i)
+ if (i % 4096 == 0) then
+ s:replace(t)
+ end
+ end
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+pcall(test) -- must fail but not crash
+ | ---
+ | - false
+ | - 'Can''t create tuple: metadata size 65558 is too big'
+ | ...
+
+test = nil
+ | ---
+ | ...
+s:drop()
+ | ---
+ | ...
diff --git a/test/box/huge_field_map.test.lua b/test/box/huge_field_map.test.lua
new file mode 100644
index 0000000..9042751
--- /dev/null
+++ b/test/box/huge_field_map.test.lua
@@ -0,0 +1,22 @@
+env = require('test_run')
+test_run = env.new()
+
+s = box.schema.space.create('test', {engine = 'memtx'})
+i1 = s:create_index('pk')
+i2 = s:create_index('mk', {parts={{'[2][*]', 'uint'}}})
+test_run:cmd("setopt delimiter ';'")
+function test()
+ local t = {1, {}}
+ for i = 1,65536 do
+ table.insert(t[2], i)
+ if (i % 4096 == 0) then
+ s:replace(t)
+ end
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+pcall(test) -- must fail but not crash
+
+test = nil
+s:drop()
\ No newline at end of file
diff --git a/test/box/huge_field_map_long.result b/test/box/huge_field_map_long.result
new file mode 100644
index 0000000..d7971ae
--- /dev/null
+++ b/test/box/huge_field_map_long.result
@@ -0,0 +1,51 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+s = box.schema.space.create('test', {engine = 'memtx'})
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ';'")
+ | ---
+ | - true
+ | ...
+function test()
+ local t = {}
+ local k = {}
+ for i = 1,128 do
+ local parts = {}
+ for j = 0,127 do
+ table.insert(parts, {i * 128 - j, 'uint'})
+ table.insert(t, 1)
+ end
+ if i == 1 then k = table.deepcopy(t) end
+ s:create_index('test'..i, {parts = parts})
+ if i % 16 == 0 then
+ s:replace(t)
+ s:delete(k)
+ end
+ end
+end;
+ | ---
+ | ...
+test_run:cmd("setopt delimiter ''");
+ | ---
+ | - true
+ | ...
+
+pcall(test) -- must fail but not crash
+ | ---
+ | - false
+ | - 'Can''t create tuple: metadata size 65542 is too big'
+ | ...
+
+test = nil
+ | ---
+ | ...
+s:drop()
+ | ---
+ | ...
diff --git a/test/box/huge_field_map_long.test.lua b/test/box/huge_field_map_long.test.lua
new file mode 100644
index 0000000..6415615
--- /dev/null
+++ b/test/box/huge_field_map_long.test.lua
@@ -0,0 +1,28 @@
+env = require('test_run')
+test_run = env.new()
+
+s = box.schema.space.create('test', {engine = 'memtx'})
+test_run:cmd("setopt delimiter ';'")
+function test()
+ local t = {}
+ local k = {}
+ for i = 1,128 do
+ local parts = {}
+ for j = 0,127 do
+ table.insert(parts, {i * 128 - j, 'uint'})
+ table.insert(t, 1)
+ end
+ if i == 1 then k = table.deepcopy(t) end
+ s:create_index('test'..i, {parts = parts})
+ if i % 16 == 0 then
+ s:replace(t)
+ s:delete(k)
+ end
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+pcall(test) -- must fail but not crash
+
+test = nil
+s:drop()
\ No newline at end of file
diff --git a/test/box/suite.ini b/test/box/suite.ini
index de8f5a7..801a91e 100644
--- a/test/box/suite.ini
+++ b/test/box/suite.ini
@@ -3,6 +3,7 @@ core = tarantool
description = Database tests
script = box.lua
disabled = rtree_errinj.test.lua tuple_bench.test.lua
+long_run = huge_field_map_long.test.lua
config = engine.cfg
release_disabled = errinj.test.lua errinj_index.test.lua rtree_errinj.test.lua upsert_errinj.test.lua iproto_stress.test.lua gh-4648-func-load-unload.test.lua
lua_libs = lua/fifo.lua lua/utils.lua lua/bitset.lua lua/index_random_test.lua lua/push.lua lua/identifier.lua
--
2.7.4
* Re: [Tarantool-patches] [PATCH 02/15] Check data_offset overflow in struct tuple
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 02/15] Check data_offset overflow in struct tuple Aleksandr Lyapunov
@ 2020-07-05 17:03 ` Vladislav Shpilevoy
2020-07-06 13:39 ` Aleksandr Lyapunov
0 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-05 17:03 UTC
To: Aleksandr Lyapunov, tarantool-patches
Thanks for the patch!
See 3 comments below.
> diff --git a/test/box/huge_field_map.result b/test/box/huge_field_map.result
> new file mode 100644
> index 0000000..11b4da3
> --- /dev/null
> +++ b/test/box/huge_field_map.result
> @@ -0,0 +1,49 @@
> +-- test-run result file version 2
> +env = require('test_run')
> + | ---
> + | ...
> +test_run = env.new()
> + | ---
> + | ...
> +
> +s = box.schema.space.create('test', {engine = 'memtx'})
> + | ---
> + | ...
> +i1 = s:create_index('pk')
> + | ---
> + | ...
> +i2 = s:create_index('mk', {parts={{'[2][*]', 'uint'}}})
1. Why do you need a multikey index? This test lacks a comment.
> + | ---
> + | ...
> +test_run:cmd("setopt delimiter ';'")
> + | ---
> + | - true
> + | ...
> diff --git a/test/box/huge_field_map_long.result b/test/box/huge_field_map_long.result
> new file mode 100644
> index 0000000..d7971ae
> --- /dev/null
> +++ b/test/box/huge_field_map_long.result
> @@ -0,0 +1,51 @@
> +-- test-run result file version 2
> +env = require('test_run')
> + | ---
> + | ...
> +test_run = env.new()
> + | ---
> + | ...
> +
> +s = box.schema.space.create('test', {engine = 'memtx'})
2. I saw you also fixed vinyl and runtime tuples. I suppose
you need to test them as well.
> + | ---
> + | ...
> +test_run:cmd("setopt delimiter ';'")
> + | ---
> + | - true
> + | ...
> +function test()
> + local t = {}
> + local k = {}
> + for i = 1,128 do
> + local parts = {}
> + for j = 0,127 do
> + table.insert(parts, {i * 128 - j, 'uint'})
> + table.insert(t, 1)
> + end
> + if i == 1 then k = table.deepcopy(t) end
> + s:create_index('test'..i, {parts = parts})
> + if i % 16 == 0 then
> + s:replace(t)
> + s:delete(k)
> + end
> + end
> +end;
> + | ---
> + | ...
> +test_run:cmd("setopt delimiter ''");
> + | ---
> + | - true
> + | ...
> +
> +pcall(test) -- must fail but not crash
> + | ---
> + | - false
> + | - 'Can''t create tuple: metadata size 65542 is too big'
> + | ...
> +
> +test = nil
> + | ---
> + | ...
> +s:drop()
> + | ---
> + | ...
> diff --git a/test/box/huge_field_map_long.test.lua b/test/box/huge_field_map_long.test.lua
> new file mode 100644
3. Why do you need a 'long' test? The first test seems to be good enough.
* Re: [Tarantool-patches] [PATCH 02/15] Check data_offset overflow in struct tuple
2020-07-05 17:03 ` Vladislav Shpilevoy
@ 2020-07-06 13:39 ` Aleksandr Lyapunov
0 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-06 13:39 UTC
To: Vladislav Shpilevoy, tarantool-patches
Thanks for the review.
On 05.07.2020 20:03, Vladislav Shpilevoy wrote:
> +i1 = s:create_index('pk')
>> + | ---
>> + | ...
>> +i2 = s:create_index('mk', {parts={{'[2][*]', 'uint'}}})
> 1. Why do you need a multikey index? This test lacks a comment.
A multikey index creates a variable-length offset table in a tuple.
That is the simplest way to overflow it.
Sure, I'll add a comment.
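A rough model of the multikey test's arithmetic, assuming each element of the `[2][*]` array adds one `uint32_t` offset to the tuple's field map on top of a constant overhead. The 22-byte constant is not taken from the Tarantool sources; it is only inferred from the test results (65558 = 22 + 4 * 16384, and 32790 = 22 + 4 * 8192 once patch 03 narrows the limit to INT16_MAX). The helper names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

/* Inferred from the .result files, not from the sources. */
enum { MK_OVERHEAD = 22 };

/* Modelled data_offset for a multikey array of `elements` items. */
static uint32_t
mk_data_offset(uint32_t elements)
{
	return MK_OVERHEAD + elements * (uint32_t)sizeof(uint32_t);
}

/*
 * First array length, checked every `step` elements (the test calls
 * s:replace() every 4096 insertions), whose offset exceeds `limit`.
 */
static uint32_t
mk_first_failing_length(uint32_t step, uint32_t limit)
{
	assert(step > 0);
	uint32_t n = step;
	while (mk_data_offset(n) <= limit)
		n += step;
	return n;
}
```

Under this model the first failing replace happens at 16384 elements with a 16-bit limit and at 8192 elements with a 15-bit one, which matches the error messages in both versions of `huge_field_map.result`.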
>> diff --git a/test/box/huge_field_map_long.test.lua b/test/box/huge_field_map_long.test.lua
>> new file mode 100644
> 3. Why do you need a 'long' test? The first test seems to be good enough.
It's just another way to overflow the offset table. For example, 1.10
does not have multikey indexes, so only the long test will crash it,
and perhaps we'll merge this patch into 1.10.
There's also a chance that we'll fix
https://github.com/tarantool/tarantool/issues/5085
and then the multikey test will not work.
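The long test reaches the limit without multikey indexes by making the tuple wider instead. A minimal sketch of that arithmetic, assuming that index number i covers fields (i-1)*128+1 .. i*128, that every indexed field except the first gets one `uint32_t` offset slot (the first field has `TUPLE_OFFSET_SLOT_NIL`, per the assert in `tuple_format_create()`), and that the remaining constant is 10 bytes. The 10-byte constant is an inference that happens to match the .result files (65542 = 10 + 4 * 16383, 32774 = 10 + 4 * 8191), not a value checked against the sources; the helper names are hypothetical.

```c
#include <assert.h>
#include <stdint.h>

enum {
	PARTS_PER_INDEX = 128, /* j = 0..127 in the test */
	LONG_HEADER = 10,      /* inferred from the .result files */
};

/* Field map size after `indexes` indexes have been created. */
static uint32_t
long_field_map_size(uint32_t indexes)
{
	assert(indexes > 0);
	/* All indexed fields except the first one need a slot. */
	uint32_t slots = indexes * PARTS_PER_INDEX - 1;
	return slots * (uint32_t)sizeof(uint32_t);
}

static uint32_t
long_data_offset(uint32_t indexes)
{
	return LONG_HEADER + long_field_map_size(indexes);
}

/* First index count, checked every 16 indexes like the test, that overflows. */
static uint32_t
long_first_failing_index_count(uint32_t limit)
{
	uint32_t i = 16;
	while (long_data_offset(i) <= limit)
		i += 16;
	return i;
}
```

The model also suggests why index creation itself succeeds: the field map alone (65532 bytes at 128 indexes, 32764 bytes at 64) still fits under the respective limit checked in `tuple_format_create()`, so the error is raised only when the tuple is created.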
* [Tarantool-patches] [PATCH 03/15] tx: introduce dirty tuples
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 01/15] Update license file (2020) Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 02/15] Check data_offset overflow in struct tuple Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:04 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 04/15] vinyl: rename tx_manager -> vy_tx_manager Aleksandr Lyapunov
` (12 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC
To: tarantool-patches
---
src/box/memtx_engine.c | 4 ++--
src/box/tuple.c | 4 ++--
src/box/tuple.h | 8 +++++++-
src/box/tuple_format.c | 4 ++--
src/box/vy_stmt.c | 4 ++--
test/box/huge_field_map.result | 2 +-
test/box/huge_field_map_long.result | 2 +-
7 files changed, 17 insertions(+), 11 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index b5b6b14..af80d9e 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -1131,8 +1131,8 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end)
* tuple is not the first field of the memtx_tuple.
*/
uint32_t data_offset = sizeof(struct tuple) + field_map_size;
- if (data_offset > UINT16_MAX) {
- /** tuple->data_offset is 16 bits */
+ if (data_offset > INT16_MAX) {
+ /** tuple->data_offset is 15 bits */
diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG,
data_offset);
goto end;
diff --git a/src/box/tuple.c b/src/box/tuple.c
index e48ee08..c599b30 100644
--- a/src/box/tuple.c
+++ b/src/box/tuple.c
@@ -84,8 +84,8 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end
goto end;
uint32_t field_map_size = field_map_build_size(&builder);
uint32_t data_offset = sizeof(struct tuple) + field_map_size;
- if (data_offset > UINT16_MAX) {
- /** tuple->data_offset is 16 bits */
+ if (data_offset > INT16_MAX) {
+ /** tuple->data_offset is 15 bits */
diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG,
data_offset);
goto end;
diff --git a/src/box/tuple.h b/src/box/tuple.h
index 9a88772..b647161 100644
--- a/src/box/tuple.h
+++ b/src/box/tuple.h
@@ -319,7 +319,13 @@ struct PACKED tuple
/**
* Offset to the MessagePack from the begin of the tuple.
*/
- uint16_t data_offset;
+ uint16_t data_offset : 15;
+ /**
+ * The tuple (if it's found in index for example) could be invisible
+ * for current transactions. The flag means that the tuple must
+ * be clarified by transaction engine.
+ */
+ bool is_dirty : 1;
/**
* Engine specific fields and offsets array concatenated
* with MessagePack fields array.
diff --git a/src/box/tuple_format.c b/src/box/tuple_format.c
index 68ec2a7..6ebc855 100644
--- a/src/box/tuple_format.c
+++ b/src/box/tuple_format.c
@@ -481,8 +481,8 @@ tuple_format_create(struct tuple_format *format, struct key_def * const *keys,
assert(tuple_format_field(format, 0)->offset_slot ==
TUPLE_OFFSET_SLOT_NIL);
size_t field_map_size = -current_slot * sizeof(uint32_t);
- if (field_map_size > UINT16_MAX) {
- /** tuple->data_offset is 16 bits */
+ if (field_map_size > INT16_MAX) {
+ /** tuple->data_offset is 15 bits */
diag_set(ClientError, ER_INDEX_FIELD_COUNT_LIMIT,
-current_slot);
return -1;
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index f59c418..837be8c 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -160,8 +160,8 @@ vy_stmt_alloc(struct tuple_format *format, uint32_t data_offset, uint32_t bsize)
{
assert(data_offset >= sizeof(struct vy_stmt) + format->field_map_size);
- if (data_offset > UINT16_MAX) {
- /** tuple->data_offset is 16 bits */
+ if (data_offset > INT16_MAX) {
+ /** tuple->data_offset is 15 bits */
diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG,
data_offset);
return NULL;
diff --git a/test/box/huge_field_map.result b/test/box/huge_field_map.result
index 11b4da3..45022cc 100644
--- a/test/box/huge_field_map.result
+++ b/test/box/huge_field_map.result
@@ -38,7 +38,7 @@ test_run:cmd("setopt delimiter ''");
pcall(test) -- must fail but not crash
| ---
| - false
- | - 'Can''t create tuple: metadata size 65558 is too big'
+ | - 'Can''t create tuple: metadata size 32790 is too big'
| ...
test = nil
diff --git a/test/box/huge_field_map_long.result b/test/box/huge_field_map_long.result
index d7971ae..cb47900 100644
--- a/test/box/huge_field_map_long.result
+++ b/test/box/huge_field_map_long.result
@@ -40,7 +40,7 @@ test_run:cmd("setopt delimiter ''");
pcall(test) -- must fail but not crash
| ---
| - false
- | - 'Can''t create tuple: metadata size 65542 is too big'
+ | - 'Can''t create tuple: metadata size 32774 is too big'
| ...
test = nil
--
2.7.4
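Patch 03 takes the top bit of `data_offset` for the new `is_dirty` flag, which is why each `UINT16_MAX` bound in the series becomes `INT16_MAX`. A minimal sketch with a hypothetical two-member struct (not the real, PACKED `struct tuple`) checks at compile time that the largest 15-bit value equals `INT16_MAX`, and shows that an unchecked oversized offset would be truncated modulo 2^15 while the flag bit survives. The updated .result values 32790 and 32774 are both just above that limit.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical mirror of the two members touched by the patch.
 * uint16_t bit-fields are implementation-defined in ISO C but are
 * accepted by GCC and Clang.
 */
struct demo_tuple_bits {
	uint16_t data_offset : 15;
	bool is_dirty : 1;
};

/* Largest value a 15-bit unsigned bit-field can hold: 2^15 - 1. */
#define DATA_OFFSET_MAX ((1u << 15) - 1)

/* The bound used by the patch is exactly this limit. */
static_assert(DATA_OFFSET_MAX == INT16_MAX,
	      "15-bit data_offset limit must equal INT16_MAX");

/*
 * Store without a bounds check: an oversized offset is reduced
 * modulo 2^15, while the neighbouring is_dirty bit is left intact.
 */
static uint16_t
store_offset_unchecked(struct demo_tuple_bits *t, uint32_t off)
{
	t->data_offset = (uint16_t)off;
	return t->data_offset;
}
```

For instance, storing 32790 this way leaves `data_offset == 22`, which is why the range check has to happen before the assignment.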
* [Tarantool-patches] [PATCH 04/15] vinyl: rename tx_manager -> vy_tx_manager
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (2 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 03/15] tx: introduce dirty tuples Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:04 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 05/15] tx: save txn in txn_stmt Aleksandr Lyapunov
` (11 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC
To: tarantool-patches
Unlike other vinyl objects, which are named with the "vy_" prefix,
its transaction manager (tx_manager) has no such prefix. It should
have one in order to avoid conflicts with the global tx manager.
Needed for #4897
---
src/box/vinyl.c | 30 +++++++++++++++---------------
src/box/vy_tx.c | 49 +++++++++++++++++++++++++------------------------
src/box/vy_tx.h | 33 +++++++++++++++++----------------
3 files changed, 57 insertions(+), 55 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index fd9b7e6..6e32331 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -98,7 +98,7 @@ struct vy_env {
/** Recovery status */
enum vy_status status;
/** TX manager */
- struct tx_manager *xm;
+ struct vy_tx_manager *xm;
/** Upsert squash queue */
struct vy_squash_queue *squash_queue;
/** Memory pool for index iterator. */
@@ -267,7 +267,7 @@ vy_info_append_regulator(struct vy_env *env, struct info_handler *h)
static void
vy_info_append_tx(struct vy_env *env, struct info_handler *h)
{
- struct tx_manager *xm = env->xm;
+ struct vy_tx_manager *xm = env->xm;
info_table_begin(h, "tx");
@@ -292,7 +292,7 @@ static void
vy_info_append_memory(struct vy_env *env, struct info_handler *h)
{
info_table_begin(h, "memory");
- info_append_int(h, "tx", tx_manager_mem_used(env->xm));
+ info_append_int(h, "tx", vy_tx_manager_mem_used(env->xm));
info_append_int(h, "level0", lsregion_used(&env->mem_env.allocator));
info_append_int(h, "tuple_cache", env->cache_env.mem_used);
info_append_int(h, "page_index", env->lsm_env.page_index_size);
@@ -509,7 +509,7 @@ vinyl_engine_memory_stat(struct engine *engine, struct engine_memory_stat *stat)
stat->index += env->lsm_env.bloom_size;
stat->index += env->lsm_env.page_index_size;
stat->cache += env->cache_env.mem_used;
- stat->tx += tx_manager_mem_used(env->xm);
+ stat->tx += vy_tx_manager_mem_used(env->xm);
}
static void
@@ -517,7 +517,7 @@ vinyl_engine_reset_stat(struct engine *engine)
{
struct vy_env *env = vy_env(engine);
- struct tx_manager *xm = env->xm;
+ struct vy_tx_manager *xm = env->xm;
memset(&xm->stat, 0, sizeof(xm->stat));
vy_scheduler_reset_stat(&env->scheduler);
@@ -996,7 +996,7 @@ vinyl_space_invalidate(struct space *space)
* request bail out early, without dereferencing the space.
*/
bool unused;
- tx_manager_abort_writers_for_ddl(env->xm, space, &unused);
+ vy_tx_manager_abort_writers_for_ddl(env->xm, space, &unused);
}
/** Argument passed to vy_check_format_on_replace(). */
@@ -1064,7 +1064,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
* be checked with on_replace trigger so we abort them.
*/
bool need_wal_sync;
- tx_manager_abort_writers_for_ddl(env->xm, space, &need_wal_sync);
+ vy_tx_manager_abort_writers_for_ddl(env->xm, space, &need_wal_sync);
if (!need_wal_sync && vy_lsm_is_empty(pk))
return 0; /* space is empty, nothing to do */
@@ -2486,7 +2486,7 @@ static void
vinyl_engine_switch_to_ro(struct engine *engine)
{
struct vy_env *env = vy_env(engine);
- tx_manager_abort_writers_for_ro(env->xm);
+ vy_tx_manager_abort_writers_for_ro(env->xm);
}
/* }}} Public API of transaction control */
@@ -2573,7 +2573,7 @@ vy_env_new(const char *path, size_t memory,
goto error_path;
}
- e->xm = tx_manager_new();
+ e->xm = vy_tx_manager_new();
if (e->xm == NULL)
goto error_xm;
e->squash_queue = vy_squash_queue_new();
@@ -2609,7 +2609,7 @@ error_lsm_env:
vy_scheduler_destroy(&e->scheduler);
vy_squash_queue_delete(e->squash_queue);
error_squash_queue:
- tx_manager_delete(e->xm);
+ vy_tx_manager_delete(e->xm);
error_xm:
free(e->path);
error_path:
@@ -2623,7 +2623,7 @@ vy_env_delete(struct vy_env *e)
vy_regulator_destroy(&e->regulator);
vy_scheduler_destroy(&e->scheduler);
vy_squash_queue_delete(e->squash_queue);
- tx_manager_delete(e->xm);
+ vy_tx_manager_delete(e->xm);
free(e->path);
mempool_destroy(&e->iterator_pool);
vy_run_env_destroy(&e->run_env);
@@ -2882,7 +2882,7 @@ vinyl_engine_end_recovery(struct engine *engine)
/*
* During recovery we skip statements that have
* been dumped to disk - see vy_is_committed() -
- * so it may turn out that tx_manager::lsn stays
+ * so it may turn out that vy_tx_manager::lsn stays
* behind the instance vclock while we need it
* to be up-to-date once recovery is complete,
* because we use it while building an index to
@@ -3706,7 +3706,7 @@ vinyl_snapshot_iterator_free(struct snapshot_iterator *base)
struct vy_lsm *lsm = it->iterator.lsm;
struct vy_env *env = vy_env(lsm->base.engine);
vy_read_iterator_close(&it->iterator);
- tx_manager_destroy_read_view(env->xm, it->rv);
+ vy_tx_manager_destroy_read_view(env->xm, it->rv);
vy_lsm_unref(lsm);
free(it);
}
@@ -3726,7 +3726,7 @@ vinyl_index_create_snapshot_iterator(struct index *base)
it->base.next = vinyl_snapshot_iterator_next;
it->base.free = vinyl_snapshot_iterator_free;
- it->rv = tx_manager_read_view(env->xm);
+ it->rv = vy_tx_manager_read_view(env->xm);
if (it->rv == NULL) {
free(it);
return NULL;
@@ -4149,7 +4149,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
* be checked with on_replace trigger so we abort them.
*/
bool need_wal_sync;
- tx_manager_abort_writers_for_ddl(env->xm, src_space, &need_wal_sync);
+ vy_tx_manager_abort_writers_for_ddl(env->xm, src_space, &need_wal_sync);
if (!need_wal_sync && vy_lsm_is_empty(pk))
return 0; /* space is empty, nothing to do */
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 846c632..17c79d6 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -98,13 +98,13 @@ vy_global_read_view_create(struct vy_read_view *rv, int64_t lsn)
rv->refs = 0;
}
-struct tx_manager *
-tx_manager_new(void)
+struct vy_tx_manager *
+vy_tx_manager_new(void)
{
- struct tx_manager *xm = calloc(1, sizeof(*xm));
+ struct vy_tx_manager *xm = calloc(1, sizeof(*xm));
if (xm == NULL) {
diag_set(OutOfMemory, sizeof(*xm),
- "malloc", "struct tx_manager");
+ "malloc", "struct vy_tx_manager");
return NULL;
}
@@ -128,7 +128,7 @@ tx_manager_new(void)
}
void
-tx_manager_delete(struct tx_manager *xm)
+vy_tx_manager_delete(struct vy_tx_manager *xm)
{
mempool_destroy(&xm->read_view_mempool);
mempool_destroy(&xm->read_interval_mempool);
@@ -138,7 +138,7 @@ tx_manager_delete(struct tx_manager *xm)
}
size_t
-tx_manager_mem_used(struct tx_manager *xm)
+vy_tx_manager_mem_used(struct vy_tx_manager *xm)
{
struct mempool_stats mstats;
size_t ret = 0;
@@ -157,7 +157,7 @@ tx_manager_mem_used(struct tx_manager *xm)
}
struct vy_read_view *
-tx_manager_read_view(struct tx_manager *xm)
+vy_tx_manager_read_view(struct vy_tx_manager *xm)
{
struct vy_read_view *rv;
/*
@@ -195,7 +195,8 @@ tx_manager_read_view(struct tx_manager *xm)
}
void
-tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
+vy_tx_manager_destroy_read_view(struct vy_tx_manager *xm,
+ struct vy_read_view *rv)
{
if (rv == xm->p_global_read_view)
return;
@@ -209,7 +210,7 @@ tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv)
static struct txv *
txv_new(struct vy_tx *tx, struct vy_lsm *lsm, struct vy_entry entry)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
struct txv *v = mempool_alloc(&xm->txv_mempool);
if (v == NULL) {
diag_set(OutOfMemory, sizeof(*v), "mempool", "struct txv");
@@ -234,7 +235,7 @@ txv_new(struct vy_tx *tx, struct vy_lsm *lsm, struct vy_entry entry)
static void
txv_delete(struct txv *v)
{
- struct tx_manager *xm = v->tx->xm;
+ struct vy_tx_manager *xm = v->tx->xm;
xm->write_set_size -= tuple_size(v->entry.stmt);
vy_stmt_counter_unacct_tuple(&v->lsm->stat.txw.count, v->entry.stmt);
tuple_unref(v->entry.stmt);
@@ -248,7 +249,7 @@ txv_delete(struct txv *v)
static void
vy_read_interval_acct(struct vy_read_interval *interval)
{
- struct tx_manager *xm = interval->tx->xm;
+ struct vy_tx_manager *xm = interval->tx->xm;
xm->read_set_size += tuple_size(interval->left.stmt);
if (interval->left.stmt != interval->right.stmt)
xm->read_set_size += tuple_size(interval->right.stmt);
@@ -260,7 +261,7 @@ vy_read_interval_acct(struct vy_read_interval *interval)
static void
vy_read_interval_unacct(struct vy_read_interval *interval)
{
- struct tx_manager *xm = interval->tx->xm;
+ struct vy_tx_manager *xm = interval->tx->xm;
xm->read_set_size -= tuple_size(interval->left.stmt);
if (interval->left.stmt != interval->right.stmt)
xm->read_set_size -= tuple_size(interval->right.stmt);
@@ -271,7 +272,7 @@ vy_read_interval_new(struct vy_tx *tx, struct vy_lsm *lsm,
struct vy_entry left, bool left_belongs,
struct vy_entry right, bool right_belongs)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
struct vy_read_interval *interval;
interval = mempool_alloc(&xm->read_interval_mempool);
if (interval == NULL) {
@@ -296,7 +297,7 @@ vy_read_interval_new(struct vy_tx *tx, struct vy_lsm *lsm,
static void
vy_read_interval_delete(struct vy_read_interval *interval)
{
- struct tx_manager *xm = interval->tx->xm;
+ struct vy_tx_manager *xm = interval->tx->xm;
vy_read_interval_unacct(interval);
vy_lsm_unref(interval->lsm);
tuple_unref(interval->left.stmt);
@@ -316,7 +317,7 @@ vy_tx_read_set_free_cb(vy_tx_read_set_t *read_set,
}
void
-vy_tx_create(struct tx_manager *xm, struct vy_tx *tx)
+vy_tx_create(struct vy_tx_manager *xm, struct vy_tx *tx)
{
tx->last_stmt_space = NULL;
stailq_create(&tx->log);
@@ -339,7 +340,7 @@ vy_tx_destroy(struct vy_tx *tx)
trigger_run(&tx->on_destroy, NULL);
trigger_destroy(&tx->on_destroy);
- tx_manager_destroy_read_view(tx->xm, tx->read_view);
+ vy_tx_manager_destroy_read_view(tx->xm, tx->read_view);
struct txv *v, *tmp;
stailq_foreach_entry_safe(v, tmp, &tx->log, next_in_log)
@@ -392,7 +393,7 @@ vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v)
/* already in (earlier) read view */
if (vy_tx_is_in_read_view(abort))
continue;
- struct vy_read_view *rv = tx_manager_read_view(tx->xm);
+ struct vy_read_view *rv = vy_tx_manager_read_view(tx->xm);
if (rv == NULL)
return -1;
abort->read_view = rv;
@@ -422,7 +423,7 @@ vy_tx_abort_readers(struct vy_tx *tx, struct txv *v)
}
struct vy_tx *
-vy_tx_begin(struct tx_manager *xm)
+vy_tx_begin(struct vy_tx_manager *xm)
{
struct vy_tx *tx = mempool_alloc(&xm->tx_mempool);
if (unlikely(tx == NULL)) {
@@ -662,7 +663,7 @@ vy_tx_handle_deferred_delete(struct vy_tx *tx, struct txv *v)
int
vy_tx_prepare(struct vy_tx *tx)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
if (tx->state == VINYL_TX_ABORT) {
/* Conflict is already accounted - see vy_tx_abort(). */
@@ -793,7 +794,7 @@ void
vy_tx_commit(struct vy_tx *tx, int64_t lsn)
{
assert(tx->state == VINYL_TX_COMMIT);
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
xm->stat.commit++;
@@ -833,7 +834,7 @@ vy_tx_rollback_after_prepare(struct vy_tx *tx)
{
assert(tx->state == VINYL_TX_COMMIT);
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
/*
* There are two reasons of rollback_after_prepare:
@@ -878,7 +879,7 @@ vy_tx_rollback_after_prepare(struct vy_tx *tx)
void
vy_tx_rollback(struct vy_tx *tx)
{
- struct tx_manager *xm = tx->xm;
+ struct vy_tx_manager *xm = tx->xm;
xm->stat.rollback++;
@@ -1140,7 +1141,7 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
}
void
-tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
+vy_tx_manager_abort_writers_for_ddl(struct vy_tx_manager *xm, struct space *space,
bool *need_wal_sync)
{
*need_wal_sync = false;
@@ -1166,7 +1167,7 @@ tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
}
void
-tx_manager_abort_writers_for_ro(struct tx_manager *xm)
+vy_tx_manager_abort_writers_for_ro(struct vy_tx_manager *xm)
{
struct vy_tx *tx;
rlist_foreach_entry(tx, &xm->writers, in_writers) {
diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h
index 3144c92..4fac5f6 100644
--- a/src/box/vy_tx.h
+++ b/src/box/vy_tx.h
@@ -54,7 +54,7 @@ extern "C" {
struct space;
struct tuple;
-struct tx_manager;
+struct vy_tx_manager;
struct vy_mem;
struct vy_tx;
struct vy_history;
@@ -140,16 +140,16 @@ write_set_search_key(write_set_t *tree, struct vy_lsm *lsm,
/** Transaction object. */
struct vy_tx {
- /** Link in tx_manager::writers. */
+ /** Link in vy_tx_manager::writers. */
struct rlist in_writers;
/** Transaction manager. */
- struct tx_manager *xm;
+ struct vy_tx_manager *xm;
/**
* Pointer to the space affected by the last prepared statement.
* We need it so that we can abort a transaction on DDL even
* if it hasn't inserted anything into the write set yet (e.g.
* yielded on unique check) and therefore would otherwise be
- * ignored by tx_manager_abort_writers_for_ddl().
+ * ignored by vy_tx_manager_abort_writers_for_ddl().
*/
struct space *last_stmt_space;
/**
@@ -209,7 +209,7 @@ vy_tx_read_view(struct vy_tx *tx)
}
/** Transaction manager object. */
-struct tx_manager {
+struct vy_tx_manager {
/**
* The last committed log sequence number known to
* vinyl. Updated in vy_commit().
@@ -278,24 +278,25 @@ struct tx_manager {
};
/** Allocate a tx manager object. */
-struct tx_manager *
-tx_manager_new(void);
+struct vy_tx_manager *
+vy_tx_manager_new(void);
/** Delete a tx manager object. */
void
-tx_manager_delete(struct tx_manager *xm);
+vy_tx_manager_delete(struct vy_tx_manager *xm);
/** Return total amount of memory used by active transactions. */
size_t
-tx_manager_mem_used(struct tx_manager *xm);
+vy_tx_manager_mem_used(struct vy_tx_manager *xm);
/** Create or reuse an instance of a read view. */
struct vy_read_view *
-tx_manager_read_view(struct tx_manager *xm);
+vy_tx_manager_read_view(struct vy_tx_manager *xm);
/** Dereference and possibly destroy a read view. */
void
-tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
+vy_tx_manager_destroy_read_view(struct vy_tx_manager *xm,
+ struct vy_read_view *rv);
/**
* Abort all rw transactions that affect the given space
@@ -307,19 +308,19 @@ tx_manager_destroy_read_view(struct tx_manager *xm, struct vy_read_view *rv);
* to call wal_sync() to flush them.
*/
void
-tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
- bool *need_wal_sync);
+vy_tx_manager_abort_writers_for_ddl(struct vy_tx_manager *xm,
+ struct space *space, bool *need_wal_sync);
/**
* Abort all local rw transactions that haven't reached WAL yet.
* Called before switching to read-only mode.
*/
void
-tx_manager_abort_writers_for_ro(struct tx_manager *xm);
+vy_tx_manager_abort_writers_for_ro(struct vy_tx_manager *xm);
/** Initialize a tx object. */
void
-vy_tx_create(struct tx_manager *xm, struct vy_tx *tx);
+vy_tx_create(struct vy_tx_manager *xm, struct vy_tx *tx);
/** Destroy a tx object. */
void
@@ -327,7 +328,7 @@ vy_tx_destroy(struct vy_tx *tx);
/** Begin a new transaction. */
struct vy_tx *
-vy_tx_begin(struct tx_manager *xm);
+vy_tx_begin(struct vy_tx_manager *xm);
/** Prepare a transaction to be committed. */
int
--
2.7.4
* Re: [Tarantool-patches] [PATCH 04/15] vinyl: rename tx_manager -> vy_tx_manager
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 04/15] vinyl: rename tx_manager -> vy_tx_manager Aleksandr Lyapunov
@ 2020-07-05 17:04 ` Vladislav Shpilevoy
0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-05 17:04 UTC (permalink / raw)
To: Aleksandr Lyapunov, tarantool-patches
Thanks for the patch!
The old name is still used in one place:
src/box/vy_scheduler.h
151: /** List of read views, see tx_manager::read_views. */
> diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
> index 846c632..17c79d6 100644
> --- a/src/box/vy_tx.c
> +++ b/src/box/vy_tx.c
> @@ -1140,7 +1141,7 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
> }
>
> void
> -tx_manager_abort_writers_for_ddl(struct tx_manager *xm, struct space *space,
> +vy_tx_manager_abort_writers_for_ddl(struct vy_tx_manager *xm, struct space *space,
> bool *need_wal_sync)
The indentation is broken here.
* [Tarantool-patches] [PATCH 05/15] tx: save txn in txn_stmt
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (3 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 04/15] vinyl: rename tx_manager -> vy_tx_manager Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:04 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 06/15] tx: add TX status Aleksandr Lyapunov
` (10 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 1 +
src/box/txn.h | 2 ++
2 files changed, 3 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index 1235201..71e732c 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -100,6 +100,7 @@ txn_stmt_new(struct region *region)
}
/* Initialize members explicitly to save time on memset() */
+ stmt->txn = in_txn();
stmt->space = NULL;
stmt->old_tuple = NULL;
stmt->new_tuple = NULL;
diff --git a/src/box/txn.h b/src/box/txn.h
index 3f6d79d..5b264f0 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -85,6 +85,8 @@ struct txn_stmt {
/** A linked list of all statements. */
struct stailq_entry next;
+ /** Owner of that statement. */
+ struct txn *txn;
/** Undo info. */
struct space *space;
struct tuple *old_tuple;
--
2.7.4
* Re: [Tarantool-patches] [PATCH 05/15] tx: save txn in txn_stmt
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 05/15] tx: save txn in txn_stmt Aleksandr Lyapunov
@ 2020-07-05 17:04 ` Vladislav Shpilevoy
0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-05 17:04 UTC (permalink / raw)
To: Aleksandr Lyapunov, tarantool-patches
Thanks for the patch!
Why?
On 03/07/2020 08:33, Aleksandr Lyapunov wrote:
> ---
> src/box/txn.c | 1 +
> src/box/txn.h | 2 ++
> 2 files changed, 3 insertions(+)
>
> diff --git a/src/box/txn.c b/src/box/txn.c
> index 1235201..71e732c 100644
> --- a/src/box/txn.c
> +++ b/src/box/txn.c
> @@ -100,6 +100,7 @@ txn_stmt_new(struct region *region)
> }
>
> /* Initialize members explicitly to save time on memset() */
> + stmt->txn = in_txn();
> stmt->space = NULL;
> stmt->old_tuple = NULL;
> stmt->new_tuple = NULL;
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 3f6d79d..5b264f0 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -85,6 +85,8 @@ struct txn_stmt {
>
> /** A linked list of all statements. */
> struct stailq_entry next;
> + /** Owner of that statement. */
> + struct txn *txn;
> /** Undo info. */
> struct space *space;
> struct tuple *old_tuple;
>
* [Tarantool-patches] [PATCH 06/15] tx: add TX status
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (4 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 05/15] tx: save txn in txn_stmt Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:04 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 07/15] tx: save preserve old tuple flag in txn_stmt Aleksandr Lyapunov
` (9 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 5 +++++
src/box/txn.h | 34 ++++++++++++++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index 71e732c..cf60d54 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -222,6 +222,7 @@ txn_begin(void)
txn->flags = 0;
txn->in_sub_stmt = 0;
txn->id = ++tsn;
+ txn->status = TXN_INPROGRESS;
txn->signature = -1;
txn->engine = NULL;
txn->engine_tx = NULL;
@@ -425,6 +426,7 @@ txn_complete(struct txn *txn)
* IPROTO_NOP statements only.
*/
if (txn->signature < 0) {
+ txn->status = TXN_ABORTED;
/* Undo the transaction. */
struct txn_stmt *stmt;
stailq_reverse(&txn->stmts);
@@ -435,6 +437,7 @@ txn_complete(struct txn *txn)
if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
txn_run_rollback_triggers(txn, &txn->on_rollback);
} else {
+ txn->status = TXN_COMMITTED;
/* Commit the transaction. */
if (txn->engine != NULL)
engine_commit(txn->engine, txn);
@@ -555,6 +558,7 @@ txn_prepare(struct txn *txn)
trigger_clear(&txn->fiber_on_yield);
txn->start_tm = ev_monotonic_now(loop());
+ txn->status = TXN_PREPARED;
return 0;
}
@@ -683,6 +687,7 @@ void
txn_rollback(struct txn *txn)
{
assert(txn == in_txn());
+ txn->status = TXN_ABORTED;
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
diff --git a/src/box/txn.h b/src/box/txn.h
index 5b264f0..1394bfb 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -77,6 +77,38 @@ enum {
};
/**
+ * Status of a transaction.
+ */
+enum txn_status {
+ /**
+ * Initial state of TX. The only state of a TX that is allowed to
+ * perform read or write actions.
+ */
+ TXN_INPROGRESS,
+ /**
+ * The TX has passed conflict checks and is ready to be committed.
+ */
+ TXN_PREPARED,
+ /**
+ * The TX was aborted by other TX due to conflict.
+ */
+ TXN_CONFLICTED,
+ /**
+ * The TX was read-only, had a conflict and was sent to a read view.
+ * It stays read-only and no longer participates in conflict resolution.
+ */
+ TXN_IN_READ_VIEW,
+ /**
+ * The TX was committed.
+ */
+ TXN_COMMITTED,
+ /**
+ * The TX was aborted.
+ */
+ TXN_ABORTED,
+};
+
+/**
* A single statement of a multi-statement
* transaction: undo and redo info.
*/
@@ -173,6 +205,8 @@ struct txn {
* Valid IDs start from 1.
*/
int64_t id;
+ /** Status of the TX */
+ enum txn_status status;
/** List of statements in a transaction. */
struct stailq stmts;
/** Number of new rows without an assigned LSN. */
--
2.7.4
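The status lifecycle introduced by this patch can be summarized as a transition check. This is a sketch, not code from the series: the enum is copied from the patch, while `txn_status_allows_dml()` and `txn_status_transition_is_valid()` are hypothetical helpers. The PREPARED, COMMITTED and ABORTED edges follow the assignments in txn_prepare(), txn_complete() and txn_rollback() above; CONFLICTED is set by the conflict tracker later in the series; the IN_READ_VIEW edges are assumptions, since no code shown here sets that status.

```c
#include <assert.h>
#include <stdbool.h>

/* Copied from enum txn_status in the patch to keep this self-contained. */
enum txn_status {
	TXN_INPROGRESS,
	TXN_PREPARED,
	TXN_CONFLICTED,
	TXN_IN_READ_VIEW,
	TXN_COMMITTED,
	TXN_ABORTED,
};

/* Per the enum comment, only an in-progress TX may read or write. */
static bool
txn_status_allows_dml(enum txn_status s)
{
	return s == TXN_INPROGRESS;
}

/* Hypothetical helper: the transitions implied by the series. */
static bool
txn_status_transition_is_valid(enum txn_status from, enum txn_status to)
{
	switch (from) {
	case TXN_INPROGRESS:
		return to == TXN_PREPARED || to == TXN_CONFLICTED ||
		       to == TXN_IN_READ_VIEW || to == TXN_ABORTED;
	case TXN_CONFLICTED:
		/* txn_begin_stmt() and txn_prepare() refuse to continue. */
		return to == TXN_ABORTED;
	case TXN_IN_READ_VIEW:
		/* Assumption: a read-only TX in a read view may still finish. */
		return to == TXN_PREPARED || to == TXN_ABORTED;
	case TXN_PREPARED:
		/* txn_complete() picks one depending on the WAL signature. */
		return to == TXN_COMMITTED || to == TXN_ABORTED;
	case TXN_COMMITTED:
	case TXN_ABORTED:
		return false; /* terminal states */
	}
	return false;
}
```

Keeping the check in one function makes it cheap to `assert()` on every assignment to `txn->status` in debug builds.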
* [Tarantool-patches] [PATCH 07/15] tx: save preserve old tuple flag in txn_stmt
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (5 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 06/15] tx: add TX status Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:05 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 08/15] tx: introduce tx manager Aleksandr Lyapunov
` (8 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/memtx_space.c | 17 +++++++++++++++++
src/box/txn.c | 3 +++
src/box/txn.h | 9 +++++++++
3 files changed, 29 insertions(+)
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 8755920..5820c40 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -316,6 +316,10 @@ memtx_space_execute_replace(struct space *space, struct txn *txn,
if (stmt->new_tuple == NULL)
return -1;
tuple_ref(stmt->new_tuple);
+
+ if (mode == DUP_INSERT)
+ stmt->preserve_old_tuple = true;
+
if (memtx_space->replace(space, NULL, stmt->new_tuple,
mode, &stmt->old_tuple) != 0)
return -1;
@@ -342,6 +346,13 @@ memtx_space_execute_delete(struct space *space, struct txn *txn,
struct tuple *old_tuple;
if (index_get(pk, key, part_count, &old_tuple) != 0)
return -1;
+
+ /*
+ * We have to delete exactly old_tuple just because we return it as
+ * a result.
+ */
+ stmt->preserve_old_tuple = true;
+
if (old_tuple != NULL &&
memtx_space->replace(space, old_tuple, NULL,
DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0)
@@ -390,6 +401,9 @@ memtx_space_execute_update(struct space *space, struct txn *txn,
if (stmt->new_tuple == NULL)
return -1;
tuple_ref(stmt->new_tuple);
+
+ stmt->preserve_old_tuple = true;
+
if (memtx_space->replace(space, old_tuple, stmt->new_tuple,
DUP_REPLACE, &stmt->old_tuple) != 0)
return -1;
@@ -496,6 +510,9 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn,
stmt->new_tuple = NULL;
}
}
+
+ stmt->preserve_old_tuple = true;
+
/*
* It's OK to use DUP_REPLACE_OR_INSERT: we don't risk
* inserting a new tuple if the old one exists, since
diff --git a/src/box/txn.c b/src/box/txn.c
index cf60d54..d1182aa 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -107,6 +107,7 @@ txn_stmt_new(struct region *region)
stmt->engine_savepoint = NULL;
stmt->row = NULL;
stmt->has_triggers = false;
+ stmt->preserve_old_tuple = false;
return stmt;
}
@@ -349,6 +350,8 @@ txn_commit_stmt(struct txn *txn, struct request *request)
*/
if (stmt->space != NULL && !rlist_empty(&stmt->space->on_replace) &&
stmt->space->run_triggers && (stmt->old_tuple || stmt->new_tuple)) {
+ /* Triggers see old_tuple and that tuple must remain the same */
+ stmt->preserve_old_tuple = true;
int rc = 0;
if(!space_is_temporary(stmt->space)) {
rc = trigger_run(&stmt->space->on_replace, txn);
diff --git a/src/box/txn.h b/src/box/txn.h
index 1394bfb..e860e1e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -129,6 +129,15 @@ struct txn_stmt {
struct xrow_header *row;
/** on_commit and/or on_rollback list is not empty. */
bool has_triggers;
+ /**
+ * Whether the stmt upon commit must replace exactly old_tuple from it.
+ * Explanation: to the moment of commit of the statement actual state
+ * of the space could change due to commit of other transaction(s).
+ * Some statements require the replaced tuple at the moment of commit to
+ * be exactly the same as replaced tuple at the moment of execution.
+ * Some - doesn't.
+ */
+ bool preserve_old_tuple;
/** Commit/rollback triggers associated with this statement. */
struct rlist on_commit;
struct rlist on_rollback;
--
2.7.4
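To illustrate what the `preserve_old_tuple` comment describes, here is a toy model, not Tarantool code. It assumes the flag is checked when the statement is finally applied: a DELETE that already returned `old_tuple` to the user must fail if another transaction committed a different tuple under the same key in the meantime, whereas a plain REPLACE (which the patch does not flag unless on_replace triggers exist) may overwrite whatever is there. All names (`toy_tuple`, `toy_stmt`, `toy_stmt_commit()`) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy tuple; one primary-key slot holds the currently visible tuple. */
struct toy_tuple {
	int value;
};

struct toy_stmt {
	struct toy_tuple *old_tuple; /* tuple seen at execution time */
	struct toy_tuple *new_tuple; /* NULL for a delete */
	bool preserve_old_tuple;
};

/*
 * Hypothetical commit-time step: returns false (a conflict) if the
 * statement must replace exactly old_tuple but the slot now holds
 * something else because another transaction committed first.
 */
static bool
toy_stmt_commit(struct toy_tuple **slot, const struct toy_stmt *stmt)
{
	if (stmt->preserve_old_tuple && *slot != stmt->old_tuple)
		return false;
	*slot = stmt->new_tuple;
	return true;
}
```

In this model a DELETE executed while the slot held tuple `a` is rejected if the slot holds `b` by commit time, because the user was already told that `a` was deleted.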
* Re: [Tarantool-patches] [PATCH 07/15] tx: save preserve old tuple flag in txn_stmt
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 07/15] tx: save preserve old tuple flag in txn_stmt Aleksandr Lyapunov
@ 2020-07-05 17:05 ` Vladislav Shpilevoy
0 siblings, 0 replies; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-05 17:05 UTC (permalink / raw)
To: Aleksandr Lyapunov, tarantool-patches
> diff --git a/src/box/txn.h b/src/box/txn.h
> index 1394bfb..e860e1e 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -129,6 +129,15 @@ struct txn_stmt {
> struct xrow_header *row;
> /** on_commit and/or on_rollback list is not empty. */
> bool has_triggers;
> + /**
> + * Whether the stmt upon commit must replace exactly old_tuple from it.
> + * Explanation: to the moment of commit of the statement actual state
> + * of the space could change due to commit of other transaction(s).
> + * Some statements require the replaced tuple at the moment of commit to
> + * be exactly the same as replaced tuple at the moment of execution.
> + * Some - doesn't.
> + */
I still can't understand why this flag is added. Can you provide an example?
> + bool preserve_old_tuple;
Flag names should start with is/has/does/...
> /** Commit/rollback triggers associated with this statement. */
> struct rlist on_commit;
> struct rlist on_rollback;
>
* [Tarantool-patches] [PATCH 08/15] tx: introduce tx manager
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (6 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 07/15] tx: save preserve old tuple flag in txn_stmt Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 09/15] tx: introduce prepare sequence number Aleksandr Lyapunov
` (7 subsequent siblings)
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 16 ++++++++++++++++
src/box/txn.h | 6 ++++++
src/main.cc | 3 +++
3 files changed, 25 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index d1182aa..08293a1 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -36,6 +36,12 @@
#include "xrow.h"
#include "errinj.h"
+struct tx_manager
+{
+};
+
+static struct tx_manager txm;
+
double too_long_threshold;
/* Txn cache. */
@@ -953,3 +959,13 @@ txn_on_yield(struct trigger *trigger, void *event)
txn_set_flag(txn, TXN_IS_ABORTED_BY_YIELD);
return 0;
}
+
+void
+tx_manager_init()
+{
+}
+
+void
+tx_manager_free()
+{
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index e860e1e..e2194b6 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -636,6 +636,12 @@ box_txn_savepoint(void);
API_EXPORT int
box_txn_rollback_to_savepoint(box_txn_savepoint_t *savepoint);
+void
+tx_manager_init();
+
+void
+tx_manager_free();
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
diff --git a/src/main.cc b/src/main.cc
index 2c96391..edf6bad 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -75,6 +75,7 @@
#include <libutil.h>
#include "box/lua/init.h" /* box_lua_init() */
#include "box/session.h"
+#include "box/txn.h"
#include "systemd.h"
#include "crypto/crypto.h"
#include "core/popen.h"
@@ -667,6 +668,7 @@ tarantool_free(void)
random_free();
#endif
crypto_free();
+ tx_manager_free();
coll_free();
systemd_free();
say_logger_free();
@@ -830,6 +832,7 @@ main(int argc, char **argv)
signal_init();
cbus_init();
coll_init();
+ tx_manager_init();
crypto_init();
systemd_init();
tarantool_lua_init(tarantool_bin, main_argc, main_argv);
--
2.7.4
* [Tarantool-patches] [PATCH 09/15] tx: introduce prepare sequence number
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (7 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 08/15] tx: introduce tx manager Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:05 ` Vladislav Shpilevoy
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 10/15] tx: introduce txn_stmt_destroy Aleksandr Lyapunov
` (6 subsequent siblings)
15 siblings, 1 reply; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 4 ++++
src/box/txn.h | 5 +++++
2 files changed, 9 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index 08293a1..8a935d7 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -38,6 +38,8 @@
struct tx_manager
{
+ /** Last prepare-sequence-number that was assigned to preped TX. */
+ int64_t last_psn;
};
static struct tx_manager txm;
@@ -539,6 +541,8 @@ txn_journal_entry_new(struct txn *txn)
static int
txn_prepare(struct txn *txn)
{
+ txn->psn = ++txm.last_psn;
+
if (txn_has_flag(txn, TXN_IS_ABORTED_BY_YIELD)) {
assert(!txn_has_flag(txn, TXN_CAN_YIELD));
diag_set(ClientError, ER_TRANSACTION_YIELD);
diff --git a/src/box/txn.h b/src/box/txn.h
index e2194b6..cd1665f 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -214,6 +214,11 @@ struct txn {
* Valid IDs start from 1.
*/
int64_t id;
+ /**
+ * A sequential ID that is assigned when the TX become prepared.
+ * Transactions are committed on in that order.
+ */
+ int64_t psn;
/** Status of the TX */
enum txn_status status;
/** List of statements in a transaction. */
--
2.7.4
* Re: [Tarantool-patches] [PATCH 09/15] tx: introduce prepare sequence number
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 09/15] tx: introduce prepare sequence number Aleksandr Lyapunov
@ 2020-07-05 17:05 ` Vladislav Shpilevoy
2020-07-06 13:50 ` Aleksandr Lyapunov
0 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-05 17:05 UTC (permalink / raw)
To: Aleksandr Lyapunov, tarantool-patches
Thanks for the patch!
What are these psns? How do they differ from txn->id and
from lsn?
> diff --git a/src/box/txn.h b/src/box/txn.h
> index e2194b6..cd1665f 100644
> --- a/src/box/txn.h
> +++ b/src/box/txn.h
> @@ -214,6 +214,11 @@ struct txn {
> * Valid IDs start from 1.
> */
> int64_t id;
> + /**
> + * A sequential ID that is assigned when the TX become prepared.
> + * Transactions are committed on in that order.
> + */
> + int64_t psn;
Why can't you use id?
> /** Status of the TX */
> enum txn_status status;
> /** List of statements in a transaction. */
* Re: [Tarantool-patches] [PATCH 09/15] tx: introduce prepare sequence number
2020-07-05 17:05 ` Vladislav Shpilevoy
@ 2020-07-06 13:50 ` Aleksandr Lyapunov
0 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-06 13:50 UTC (permalink / raw)
To: Vladislav Shpilevoy, tarantool-patches
On 05.07.2020 20:05, Vladislav Shpilevoy wrote:
> Thanks for the patch!
>
> What are these psns? What are the difference with txn->id and
> with lsn?
I need an ID for ordering transactions, to answer a simple
question: which TX comes earlier in serialization order.
txn->id gives us the order in which TXs were created; it is not
suitable because TXs could be prepared in a different order.
LSN is good, it gives us the correct order, but there's a problem with it:
it becomes known only after the WAL writer call, and I need that order
before the WAL writer finishes.
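The difference between the two counters can be shown with a toy model. This sketch is not the patch's code: `toy_txn`, `toy_begin()`, `toy_prepare()` and `toy_serialized_before()` are hypothetical names. It only mirrors the idea that `id` is taken at begin (creation order) while `psn` is taken at prepare (serialization order), so a transaction that starts later but prepares first gets the smaller psn.

```c
#include <assert.h>
#include <stdint.h>

struct toy_txn {
	int64_t id;  /* assigned at begin: creation order */
	int64_t psn; /* assigned at prepare: serialization order, 0 if unset */
};

static int64_t last_id;
static int64_t last_psn;

static void
toy_begin(struct toy_txn *txn)
{
	txn->id = ++last_id;
	txn->psn = 0;
}

static void
toy_prepare(struct toy_txn *txn)
{
	txn->psn = ++last_psn;
}

/* Nonzero if a is serialized before b; both must already be prepared. */
static int
toy_serialized_before(const struct toy_txn *a, const struct toy_txn *b)
{
	assert(a->psn != 0 && b->psn != 0);
	return a->psn < b->psn;
}
```

Unlike an LSN, the psn is available as soon as prepare finishes, without waiting for the WAL write.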
* [Tarantool-patches] [PATCH 10/15] tx: introduce txn_stmt_destroy
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (8 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 09/15] tx: introduce prepare sequence number Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 11/15] tx: introduce conflict tracker Aleksandr Lyapunov
` (5 subsequent siblings)
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
From: Aleksandr Lyapunov <a.lyapunov@corp.mail.ru>
---
src/box/txn.c | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 8a935d7..d34ec3a 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -120,7 +120,7 @@ txn_stmt_new(struct region *region)
}
static inline void
-txn_stmt_unref_tuples(struct txn_stmt *stmt)
+txn_stmt_destroy(struct txn_stmt *stmt)
{
if (stmt->old_tuple != NULL)
tuple_unref(stmt->old_tuple);
@@ -168,7 +168,7 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
assert(txn->n_applier_rows > 0);
txn->n_applier_rows--;
}
- txn_stmt_unref_tuples(stmt);
+ txn_stmt_destroy(stmt);
stmt->space = NULL;
stmt->row = NULL;
}
@@ -207,7 +207,7 @@ txn_free(struct txn *txn)
{
struct txn_stmt *stmt;
stailq_foreach_entry(stmt, &txn->stmts, next)
- txn_stmt_unref_tuples(stmt);
+ txn_stmt_destroy(stmt);
/* Truncate region up to struct txn size. */
region_truncate(&txn->region, sizeof(struct txn));
--
2.7.4
* [Tarantool-patches] [PATCH 11/15] tx: introduce conflict tracker
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (9 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 10/15] tx: introduce txn_stmt_destroy Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 12/15] tx: introduce txm_story Aleksandr Lyapunov
` (4 subsequent siblings)
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/box/txn.h | 5 +++
2 files changed, 103 insertions(+)
diff --git a/src/box/txn.c b/src/box/txn.c
index d34ec3a..79384f0 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -44,6 +44,13 @@ struct tx_manager
static struct tx_manager txm;
+struct tx_conflict_tracker {
+ struct txn *wreaker;
+ struct txn *victim;
+ struct rlist in_conflict_list;
+ struct rlist in_conflicted_by_list;
+};
+
double too_long_threshold;
/* Txn cache. */
@@ -196,6 +203,8 @@ txn_new(void)
}
assert(region_used(®ion) == sizeof(*txn));
txn->region = region;
+ rlist_create(&txn->conflict_list);
+ rlist_create(&txn->conflicted_by_list);
return txn;
}
@@ -205,6 +214,20 @@ txn_new(void)
inline static void
txn_free(struct txn *txn)
{
+ struct tx_conflict_tracker *entry, *next;
+ rlist_foreach_entry_safe(entry, &txn->conflict_list,
+ in_conflict_list, next) {
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ rlist_foreach_entry_safe(entry, &txn->conflicted_by_list,
+ in_conflicted_by_list, next) {
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
+
struct txn_stmt *stmt;
stailq_foreach_entry(stmt, &txn->stmts, next)
txn_stmt_destroy(stmt);
@@ -222,6 +245,8 @@ txn_begin(void)
struct txn *txn = txn_new();
if (txn == NULL)
return NULL;
+ assert(rlist_empty(&txn->conflict_list));
+ assert(rlist_empty(&txn->conflicted_by_list));
/* Initialize members explicitly to save time on memset() */
stailq_create(&txn->stmts);
@@ -279,6 +304,15 @@ txn_begin_stmt(struct txn *txn, struct space *space)
diag_set(ClientError, ER_SUB_STMT_MAX);
return -1;
}
+
+ /*
+ * A conflict has happened; there is no reason to continue the TX.
+ */
+ if (txn->status == TXN_CONFLICTED) {
+ diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+ return -1;
+ }
+
struct txn_stmt *stmt = txn_stmt_new(&txn->region);
if (stmt == NULL)
return -1;
@@ -558,6 +592,16 @@ txn_prepare(struct txn *txn)
diag_set(ClientError, ER_FOREIGN_KEY_CONSTRAINT);
return -1;
}
+
+ /*
+ * Somebody else has written some value that we have read.
+ * The transaction is not possible.
+ */
+ if (txn->status == TXN_CONFLICTED) {
+ diag_set(ClientError, ER_TRANSACTION_CONFLICT);
+ return -1;
+ }
+
/*
* Perform transaction conflict resolution. Engine == NULL when
* we have a bunch of IPROTO_NOP statements.
@@ -566,6 +610,17 @@ txn_prepare(struct txn *txn)
if (engine_prepare(txn->engine, txn) != 0)
return -1;
}
+
+ struct tx_conflict_tracker *entry, *next;
+ rlist_foreach_entry_safe(entry, &txn->conflict_list,
+ in_conflict_list, next) {
+ if (entry->victim->status == TXN_INPROGRESS)
+ entry->victim->status = TXN_CONFLICTED;
+ rlist_del(&entry->in_conflict_list);
+ rlist_del(&entry->in_conflicted_by_list);
+ }
+
+
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
@@ -973,3 +1028,46 @@ void
tx_manager_free()
{
}
+
+int
+txm_cause_conflict(struct txn *wreaker, struct txn *victim)
+{
+ struct tx_conflict_tracker *tracker = NULL;
+ struct rlist *r1 = wreaker->conflict_list.next;
+ struct rlist *r2 = wreaker->conflicted_by_list.next;
+ while (r1 != &wreaker->conflict_list &&
+ r2 != &wreaker->conflicted_by_list) {
+ tracker = rlist_entry(r1, struct tx_conflict_tracker,
+ in_conflict_list);
+ if (tracker->wreaker == wreaker && tracker->victim == victim)
+ break;
+ tracker = rlist_entry(r2, struct tx_conflict_tracker,
+ in_conflicted_by_list);
+ if (tracker->wreaker == wreaker && tracker->victim == victim)
+ break;
+ tracker = NULL;
+ r1 = r1->next;
+ r2 = r2->next;
+ }
+ if (tracker != NULL) {
+ /* Move to the beginning of a list
+ * for a case of subsequent lookups */
+ rlist_del(&tracker->in_conflict_list);
+ rlist_del(&tracker->in_conflicted_by_list);
+ } else {
+ size_t size;
+ tracker = region_alloc_object(&victim->region,
+ struct tx_conflict_tracker,
+ &size);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, size, "tx region",
+ "conflict_tracker");
+ return -1;
+ }
+ }
+ tracker->wreaker = wreaker;
+ tracker->victim = victim;
+ rlist_add(&wreaker->conflict_list, &tracker->in_conflict_list);
+ rlist_add(&wreaker->conflicted_by_list, &tracker->in_conflicted_by_list);
+ return 0;
+}
\ No newline at end of file
diff --git a/src/box/txn.h b/src/box/txn.h
index cd1665f..92c0116 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -280,6 +280,8 @@ struct txn {
uint32_t fk_deferred_count;
/** List of savepoints to find savepoint by name. */
struct rlist savepoints;
+ struct rlist conflict_list;
+ struct rlist conflicted_by_list;
};
static inline bool
@@ -647,6 +649,9 @@ tx_manager_init();
void
tx_manager_free();
+int
+txm_cause_conflict(struct txn *wreaker, struct txn *victim);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.7.4
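The conflict tracker is an intrusive node linked into two lists at once, so either transaction can find and unlink it (txn_free() walks both). Below is a minimal sketch of that design, not the patch itself: `toy_link` stands in for Tarantool's rlist, and every `toy_*` name is hypothetical. One assumption deserves a loud note: the sketch links the tracker into the *victim's* `conflicted_by_list`, which the field name suggests was the intent, whereas txm_cause_conflict() as posted passes `wreaker` to both rlist_add() calls and also searches only the wreaker's two lists.

```c
#include <assert.h>
#include <stddef.h>

/* Minimal intrusive circular doubly linked list (a stand-in for rlist). */
struct toy_link {
	struct toy_link *prev;
	struct toy_link *next;
};

static void
link_init(struct toy_link *head)
{
	head->prev = head->next = head;
}

static int
link_empty(const struct toy_link *head)
{
	return head->next == head;
}

/* Insert node right after head. */
static void
link_add(struct toy_link *head, struct toy_link *node)
{
	node->next = head->next;
	node->prev = head;
	head->next->prev = node;
	head->next = node;
}

/* Unlink node and leave it self-linked. */
static void
link_del(struct toy_link *node)
{
	node->prev->next = node->next;
	node->next->prev = node->prev;
	link_init(node);
}

#define TOY_CONTAINER_OF(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

enum toy_status { TOY_INPROGRESS, TOY_PREPARED, TOY_CONFLICTED };

struct toy_txn {
	enum toy_status status;
	struct toy_link conflict_list;      /* trackers where we are the breaker */
	struct toy_link conflicted_by_list; /* trackers where we are the victim */
};

struct toy_tracker {
	struct toy_txn *breaker;
	struct toy_txn *victim;
	struct toy_link in_conflict_list;      /* in breaker->conflict_list */
	struct toy_link in_conflicted_by_list; /* in victim->conflicted_by_list */
};

static void
toy_txn_init(struct toy_txn *txn)
{
	txn->status = TOY_INPROGRESS;
	link_init(&txn->conflict_list);
	link_init(&txn->conflicted_by_list);
}

/* Record that preparing breaker invalidates something victim has read. */
static void
toy_cause_conflict(struct toy_tracker *tr, struct toy_txn *breaker,
		   struct toy_txn *victim)
{
	tr->breaker = breaker;
	tr->victim = victim;
	link_add(&breaker->conflict_list, &tr->in_conflict_list);
	link_add(&victim->conflicted_by_list, &tr->in_conflicted_by_list);
}

/* On prepare, doom every in-progress victim and unlink the trackers. */
static void
toy_prepare(struct toy_txn *breaker)
{
	while (!link_empty(&breaker->conflict_list)) {
		struct toy_tracker *tr =
			TOY_CONTAINER_OF(breaker->conflict_list.next,
					 struct toy_tracker, in_conflict_list);
		if (tr->victim->status == TOY_INPROGRESS)
			tr->victim->status = TOY_CONFLICTED;
		link_del(&tr->in_conflict_list);
		link_del(&tr->in_conflicted_by_list);
	}
	breaker->status = TOY_PREPARED;
}
```

The loop mirrors the rlist_foreach_entry_safe() block added to txn_prepare(): a victim that is still in progress becomes CONFLICTED and will fail on its next statement or on prepare.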
* [Tarantool-patches] [PATCH 12/15] tx: introduce txm_story
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (10 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 11/15] tx: introduce conflict tracker Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 13/15] tx: indexes Aleksandr Lyapunov
` (3 subsequent siblings)
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 516 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/box/txn.h | 129 +++++++++++++++
2 files changed, 644 insertions(+), 1 deletion(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 79384f0..26377c6 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -35,13 +35,44 @@
#include <fiber.h>
#include "xrow.h"
#include "errinj.h"
+#include "small/mempool.h"
+
+static uint32_t
+txm_story_key_hash(const struct tuple *a)
+{
+ uintptr_t u = (uintptr_t)a;
+ if (sizeof(uintptr_t) <= sizeof(uint32_t))
+ return u;
+ else
+ return u ^ (u >> 32);
+}
+
+#define mh_name _history
+#define mh_key_t struct tuple *
+#define mh_node_t struct txm_story *
+#define mh_arg_t int
+#define mh_hash(a, arg) (txm_story_key_hash((*(a))->tuple))
+#define mh_hash_key(a, arg) (txm_story_key_hash(a))
+#define mh_cmp(a, b, arg) ((*(a))->tuple != (*(b))->tuple)
+#define mh_cmp_key(a, b, arg) ((a) != (*(b))->tuple)
+#define MH_SOURCE
+#include "salad/mhash.h"
struct tx_manager
{
/** Last prepare-sequence-number that was assigned to preped TX. */
int64_t last_psn;
+ /** Mempools for tx_story objects with different index counts. */
+ struct mempool txm_story_pool[BOX_INDEX_MAX];
+ /** Hash table tuple -> txm_story of that tuple. */
+ struct mh_history_t *history;
+ /** List of all txm_story objects. */
+ struct rlist all_stories;
+ /** Iterator that sequentially traverses all txm_story objects. */
+ struct rlist *traverse_all_stories;
};
+/** The one and only instance of tx_manager. */
static struct tx_manager txm;
struct tx_conflict_tracker {
@@ -119,6 +150,9 @@ txn_stmt_new(struct region *region)
stmt->space = NULL;
stmt->old_tuple = NULL;
stmt->new_tuple = NULL;
+ stmt->add_story = NULL;
+ stmt->del_story = NULL;
+ stmt->next_in_del_list = NULL;
stmt->engine_savepoint = NULL;
stmt->row = NULL;
stmt->has_triggers = false;
@@ -1022,11 +1056,23 @@ txn_on_yield(struct trigger *trigger, void *event)
void
tx_manager_init()
{
+ for (size_t i = 0; i < BOX_INDEX_MAX; i++) {
+ size_t item_size = sizeof(struct txm_story) +
+ i * sizeof(struct txm_story_link);
+ mempool_create(&txm.txm_story_pool[i],
+ cord_slab_cache(), item_size);
+ }
+ txm.history = mh_history_new();
+ rlist_create(&txm.all_stories);
+ txm.traverse_all_stories = &txm.all_stories;
}
void
tx_manager_free()
{
+ mh_history_delete(txm.history);
+ for (size_t i = 0; i < BOX_INDEX_MAX; i++)
+ mempool_destroy(&txm.txm_story_pool[i]);
}
int
@@ -1070,4 +1116,472 @@ txm_cause_conflict(struct txn *wreaker, struct txn *victim)
rlist_add(&wreaker->conflict_list, &tracker->in_conflict_list);
rlist_add(&wreaker->conflicted_by_list, &tracker->in_conflicted_by_list);
return 0;
-}
\ No newline at end of file
+}
+
+struct txm_story *
+txm_story_new(struct tuple *tuple, struct txn_stmt *stmt, uint32_t index_count)
+{
+ assert(!tuple->is_dirty);
+ assert(index_count < BOX_INDEX_MAX);
+ struct mempool *pool = &txm.txm_story_pool[index_count];
+ struct txm_story *story = (struct txm_story *)mempool_alloc(pool);
+ if (story == NULL) {
+ size_t item_size = sizeof(struct txm_story) +
+ index_count * sizeof(struct txm_story_link);
+ diag_set(OutOfMemory, item_size,
+ "tx_manager", "tx story");
+ return story;
+ }
+ story->tuple = tuple;
+
+ const struct txm_story **put_story = (const struct txm_story **)&story;
+ struct txm_story **empty = NULL;
+ mh_int_t pos = mh_history_put(txm.history, put_story, &empty, 0);
+ if (pos == mh_end(txm.history)) {
+ mempool_free(pool, story);
+ diag_set(OutOfMemory, pos + 1,
+ "tx_manager", "tx history hash table");
+ return NULL;
+ }
+ tuple->is_dirty = true;
+ tuple_ref(tuple);
+
+ story->index_count = index_count;
+ story->add_stmt = stmt;
+ story->add_psn = 0;
+ story->del_stmt = NULL;
+ story->del_psn = 0;
+ rlist_create(&story->reader_list);
+ rlist_add(&txm.all_stories, &story->in_all_stories);
+ memset(story->link, 0, sizeof(story->link[0]) * index_count);
+ return story;
+}
+
+/** Temporary object, allocated on the txn region, that stores a conflicting TX. */
+struct txn_conflict
+{
+ struct txn *wreaker;
+ struct txn_conflict *next;
+};
+
+int
+txm_check_and_link_add_story(struct txm_story *story, struct txn_stmt *stmt,
+ enum dup_replace_mode mode)
+{
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ assert(!story->link[i].is_old_story);
+ struct tuple *next_tuple = story->link[i].old_tuple;
+ if (next_tuple != NULL && next_tuple->is_dirty) {
+ struct txm_story *next = txm_story_get(next_tuple);
+ assert(next->link[i].new_story == NULL);
+ story->link[i].is_old_story = true;
+ story->link[i].old_story = next;
+ next->link[i].new_story = story;
+ }
+ }
+
+ struct region *region = &stmt->txn->region;
+ size_t region_svp = region_used(region);
+ struct txn_conflict *collected_conflicts = NULL;
+
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ struct tuple *visible = NULL;
+ struct txm_story *node = story;
+ while (true) {
+ if (!node->link[i].is_old_story) {
+ /*
+ * the tuple is so old that we don't
+ * know its story.
+ */
+ visible = node->link[i].old_tuple;
+ assert(visible == NULL || !visible->is_dirty);
+ break;
+ }
+ node = node->link[i].old_story;
+
+ if (node->del_psn != 0) {
+ /* deleted by at least prepared TX. */
+ break;
+ }
+ if (node->del_stmt != NULL &&
+ node->del_stmt->txn == stmt->txn)
+ break; /* deleted by us. */
+ if (node->add_psn != 0) {
+ /* added by at least prepared TX. */
+ visible = node->tuple;
+ break;
+ }
+ if (node->add_stmt == NULL) {
+ /*
+ * the tuple is so old that we lost
+ * the beginning of its story.
+ */
+ visible = node->tuple;
+ break;
+ }
+ if (node->add_stmt->txn == stmt->txn) {
+ /* added by us. */
+ visible = node->tuple;
+ break;
+ }
+ /*
+ * We skip the story, but if it is committed before
+ * our TX, that may cause a conflict.
+ * The conflict is unavoidable if this statement
+ * relies on old_tuple. If not (it's a replace),
+ * the conflict takes place only in a secondary
+ * index, and only if the story is not overwritten
+ * in the primary index by our TX.
+ */
+ bool cross_conflict = false;
+ if (stmt->preserve_old_tuple) {
+ cross_conflict = true;
+ } else if (i != 0) {
+ struct txm_story *look_up = node;
+ cross_conflict = true;
+ while (look_up->link[0].new_story != NULL) {
+ struct txm_story *over;
+ over = look_up->link[0].new_story;
+ if (over->add_stmt->txn == stmt->txn) {
+ cross_conflict = false;
+ break;
+ }
+ look_up = over;
+ }
+ }
+ if (!cross_conflict)
+ continue;
+ size_t err_size;
+ struct txn_conflict *next_conflict;
+ next_conflict =
+ region_alloc_object(region,
+ struct txn_conflict,
+ &err_size);
+ if (next_conflict == NULL) {
+ diag_set(OutOfMemory, err_size,
+ "txn_region", "txn conflict");
+ goto fail;
+ }
+ next_conflict->wreaker = node->add_stmt->txn;
+ next_conflict->next = collected_conflicts;
+ collected_conflicts = next_conflict;
+ }
+
+ int errcode;
+ errcode = replace_check_dup(stmt->old_tuple, visible,
+ i == 0 ? mode : DUP_INSERT);
+ if (errcode != 0) {
+ struct space *sp = stmt->space;
+ if (sp != NULL)
+ diag_set(ClientError, errcode,
+ sp->index[i]->def->name,
+ space_name(sp));
+ goto fail;
+ }
+ }
+
+ if (story->link[0].is_old_story) {
+ stmt->next_in_del_list = story->link[0].old_story->del_stmt;
+ story->link[0].old_story->del_stmt = stmt;
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ if (story->link[i].is_old_story)
+ continue;
+ if (story->link[i].old_tuple != NULL)
+ tuple_ref(story->link[i].old_tuple);
+ }
+ } else if (story->link[0].old_tuple != NULL) {
+ struct tuple *old_tuple = story->link[0].old_tuple;
+ struct txm_story *del_story;
+ del_story = txm_story_new(old_tuple, NULL, story->index_count);
+ if (del_story == NULL)
+ goto fail;
+ del_story->del_stmt = stmt;
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ if (story->link[i].is_old_story)
+ continue;
+ if (story->link[i].old_tuple == old_tuple) {
+ story->link[i].is_old_story = true;
+ story->link[i].old_story = del_story;
+ } else if (story->link[i].old_tuple != NULL) {
+ tuple_ref(story->link[i].old_tuple);
+ }
+ }
+ }
+
+ while (collected_conflicts != NULL) {
+ if (txm_cause_conflict(collected_conflicts->wreaker,
+ stmt->txn) != 0) {
+ goto fail;
+ }
+ collected_conflicts = collected_conflicts->next;
+ }
+ stmt->add_story = story;
+
+ region_truncate(region, region_svp);
+ return 0;
+
+fail:
+ for (uint32_t j = story->index_count; j > 0; j--) {
+ uint32_t i = j - 1;
+ if (story->link[i].is_old_story) {
+ struct txm_story *next = story->link[i].old_story;
+ story->link[i].is_old_story = false;
+ story->link[i].old_tuple = next->tuple;
+ next->link[i].new_story = NULL;
+ }
+ }
+ region_truncate(region, region_svp);
+ return -1;
+}
+
+int
+txm_link_del_story(struct tuple *old_tuple, struct txn_stmt *stmt,
+ uint32_t index_count)
+{
+ if (old_tuple->is_dirty) {
+ struct txm_story *story = txm_story_get(old_tuple);
+ stmt->next_in_del_list = story->del_stmt;
+ story->del_stmt = stmt;
+ stmt->del_story = story;
+ return 0;
+ }
+ struct txm_story *del_story;
+ del_story = txm_story_new(old_tuple, NULL, index_count);
+ if (del_story == NULL)
+ return -1;
+ del_story->del_stmt = stmt;
+ stmt->del_story = del_story;
+ return 0;
+}
+
+void
+txm_unlink_add_story(struct txn_stmt *stmt)
+{
+ assert(stmt->add_story != NULL);
+
+ struct txm_story *story = stmt->add_story;
+
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ struct txm_story_link *from = &story->link[i];
+ if (from->new_story == NULL) {
+ struct tuple *unused;
+ struct index *index = stmt->space->index[i];
+ struct tuple *rollback = from->is_old_story ?
+ from->old_story->tuple :
+ from->old_tuple;
+ if (index_replace(index, story->tuple, rollback,
+ DUP_INSERT, &unused) != 0) {
+ diag_log();
+ unreachable();
+ panic("failed to rollback change");
+ }
+ if (i == 0 && rollback != NULL)
+ tuple_ref(rollback);
+ } else {
+ struct txm_story *new_story = from->new_story;
+ struct txm_story_link *to = &new_story->link[i];
+ assert(to->is_old_story);
+ assert(to->old_story == story);
+ to->is_old_story = from->is_old_story;
+ if (from->is_old_story) {
+ to->old_story = from->old_story;
+ from->old_story = NULL;
+ } else {
+ to->old_tuple = from->old_tuple;
+ from->old_tuple = NULL;
+ }
+ from->is_old_story = false;
+ }
+ }
+
+ stmt->add_story = NULL;
+}
+
+void
+txm_unlink_del_story(struct txn_stmt *stmt)
+{
+ assert(stmt->del_story != NULL);
+
+ struct txm_story *story = stmt->del_story;
+
+ struct txn_stmt **prev = &story->del_stmt;
+ while (*prev != stmt) {
+ prev = &(*prev)->next_in_del_list;
+ assert(*prev != NULL);
+ }
+ *prev = stmt->next_in_del_list;
+
+ stmt->del_story = NULL;
+}
+
+void
+txm_prepare_add_story(struct txn_stmt *stmt)
+{
+ assert(stmt->txn->psn != 0);
+ assert(stmt->add_story != NULL);
+ struct txm_story *story = stmt->add_story;
+
+ /* Move the story to the past, right after the prepared stories. */
+
+ while (true) {
+ if (!story->link[0].is_old_story)
+ break; /* tuple is prepared. */
+ struct txm_story *old_story = story->link[0].old_story;
+ if (old_story->del_psn != 0)
+ break; /* if psn is set, the change is prepared. */
+ if (old_story->add_psn != 0)
+ break; /* if psn is set, the change is prepared. */
+ if (old_story->add_stmt == NULL)
+ break; /* ancient */
+ if (old_story->add_stmt->txn == stmt->txn)
+ break; /* added by us. */
+
+ /* Swap story and old story. */
+ for (uint32_t i = 0; i < old_story->index_count; i++) {
+ if (!story->link[i].is_old_story ||
+ story->link[i].old_story != old_story)
+ continue;
+ if (story->link[i].new_story == NULL) {
+ /* we have to replace the tuple in index. */
+ struct tuple *unused;
+ struct index *index = stmt->space->index[i];
+ if (index_replace(index, story->tuple,
+ old_story->tuple,
+ DUP_INSERT, &unused) != 0) {
+ diag_log();
+ unreachable();
+ panic("failed to prepare change");
+ }
+ } else {
+ assert(story->link[i].new_story->link[i].old_story == story);
+ story->link[i].new_story->link[i].old_story = old_story;
+ }
+ old_story->link[i].new_story = story->link[i].new_story;
+
+ story->link[i].is_old_story = old_story->link[i].is_old_story;
+ if (old_story->link[i].is_old_story) {
+ story->link[i].old_story = old_story->link[i].old_story;
+ assert(old_story->link[i].old_story->link[i].new_story == old_story);
+ old_story->link[i].old_story->link[i].new_story = story;
+ } else {
+ story->link[i].old_tuple = old_story->link[i].old_tuple;
+ }
+ old_story->link[i].is_old_story = true;
+ old_story->link[i].old_story = story;
+ }
+ }
+ story->add_psn = stmt->txn->psn;
+}
+
+void
+txm_let_it_go(struct txn_stmt *stmt)
+{
+ if (stmt->add_story != NULL) {
+ stmt->add_story->add_stmt = NULL;
+ stmt->add_story = NULL;
+ }
+ if (stmt->del_story != NULL) {
+ struct txn_stmt **dels = &stmt->del_story->del_stmt;
+ while ((*dels) != stmt) {
+ assert(*dels != NULL);
+ dels = &(*dels)->next_in_del_list;
+ }
+ *dels = (*dels)->next_in_del_list;
+ stmt->del_story = NULL;
+ }
+}
+
+struct txm_story *
+txm_story_get(struct tuple *tuple)
+{
+ assert(tuple->is_dirty);
+
+ mh_int_t pos = mh_history_find(txm.history, tuple, 0);
+ assert(pos != mh_end(txm.history));
+ return *mh_history_node(txm.history, pos);
+}
+
+struct tuple *
+txm_tuple_clarify_slow(struct txn* txn, struct tuple* tuple, uint32_t index,
+ uint32_t mk_index, bool prepared_ok)
+{
+ struct tuple *result = NULL;
+ struct mh_history_t *hist = txm.history;
+ mh_int_t pos = mh_history_find(hist, tuple, 0);
+ assert(pos != mh_end(hist));
+ struct txm_story *story = *mh_history_node(hist, pos);
+ bool own_change = false;
+
+ while (true) {
+ struct txn_stmt *dels = story->del_stmt;
+ while (dels != NULL) {
+ if (dels->txn == txn) {
+ /* deleted by us. */
+ own_change = true;
+ break;
+ }
+ dels = dels->next_in_del_list;
+ }
+ if (prepared_ok && story->del_psn != 0) {
+ /* deleted by prepared. */
+ break;
+ }
+ if (story->del_psn != 0 && story->del_stmt == NULL) {
+ /* deleted by committed. */
+ break;
+ }
+
+ if (story->add_stmt != NULL && story->add_stmt->txn == txn) {
+ /* added by us. */
+ result = story->tuple;
+ break;
+ }
+
+ if (prepared_ok && story->add_psn != 0) {
+ /* added by prepared. */
+ result = story->tuple;
+ break;
+ }
+
+ if (story->add_psn != 0 && story->add_stmt == NULL) {
+ /* added by committed. */
+ result = story->tuple;
+ break;
+ }
+
+ if (!story->link[index].is_old_story) {
+ return story->link[index].old_tuple;
+ }
+
+ story = story->link[index].old_story;
+ }
+ (void)own_change; /* TODO: add conflict */
+ (void)mk_index; /* TODO: multiindex */
+ return result;
+}
+void
+txm_story_delete(struct txm_story *story)
+{
+ if (txm.traverse_all_stories == &story->in_all_stories)
+ txm.traverse_all_stories = rlist_next(txm.traverse_all_stories);
+ rlist_del(&story->in_all_stories);
+ tuple_unref(story->tuple);
+ story->tuple->is_dirty = false;
+
+ for (uint32_t i = 0; i < story->index_count; i++) {
+ if (!story->link[i].is_old_story &&
+ story->link[i].old_tuple != NULL) {
+ tuple_unref(story->link[i].old_tuple);
+ }
+ }
+
+#ifndef NDEBUG
+ const char poison_char = '?';
+ size_t item_size = sizeof(struct txm_story) +
+ story->index_count * sizeof(struct txm_story_link);
+ memset(story, poison_char, item_size);
+#endif
+
+ struct mempool *pool = &txm.txm_story_pool[story->index_count];
+ mempool_free(pool, story);
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index 92c0116..b5eda1e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -36,6 +36,7 @@
#include "trigger.h"
#include "fiber.h"
#include "space.h"
+#include "tuple.h"
#if defined(__cplusplus)
extern "C" {
@@ -123,6 +124,18 @@ struct txn_stmt {
struct space *space;
struct tuple *old_tuple;
struct tuple *new_tuple;
+ /**
+ * If new_tuple != NULL and this transaction was not prepared,
+ * this member holds the story of the added new_tuple.
+ */
+ struct txm_story *add_story;
+ /**
+ * If new_tuple == NULL and this transaction was not prepared,
+ * this member holds the story of the deleted old_tuple.
+ */
+ struct txm_story *del_story;
+ /** Link in txm_story::del_stmt linked list. */
+ struct txn_stmt *next_in_del_list;
/** Engine savepoint for the start of this statement. */
void *engine_savepoint;
/** Redo info: the binary log row */
@@ -284,6 +297,80 @@ struct txn {
struct rlist conflicted_by_list;
};
+/**
+ * Link that connects a txm_story with older and newer stories of the same
+ * key in index.
+ */
+struct txm_story_link {
+ /** Story that happened after this story ended. */
+ struct txm_story *new_story;
+ /** Flag whether there is older story of the same key in index. */
+ bool is_old_story;
+ union {
+ /** is_old_story = true. Older story of the same key. */
+ struct txm_story *old_story;
+ /**
+ * is_old_story = false. Tuple that was in the index before
+ * this story. That tuple is either NULL or so old that
+ * we don't have a story about it.
+ */
+
+ struct tuple *old_tuple;
+ };
+};
+
+/**
+ * A part of the history of a value in a space.
+ * It's a story about a tuple, from the point it was added to the space to
+ * the point when it was deleted from the space.
+ * In each index, stories of the same key are linked into a list.
+ */
+struct txm_story {
+ /** The story is about this tuple. */
+ struct tuple *tuple;
+ /**
+ * Statement that started this story. Is set to NULL when the statement's
+ * transaction becomes committed. Can also be NULL if we don't know who
+ * introduced that story.
+ */
+ struct txn_stmt *add_stmt;
+ /**
+ * Prepare sequence number of add_stmt's transaction. Is set when
+ * the transactions is prepared. Can be 0 if the transaction is
+ * in progress or we don't know who introduced that story.
+ */
+ int64_t add_psn;
+ /**
+ * Statement that ended this story. Is set to NULL when the statement's
+ * transaction becomes committed. Can also be NULL if the tuple has not
+ * been deleted yet.
+ */
+ struct txn_stmt *del_stmt;
+ /**
+ * Prepare sequence number of del_stmt's transaction. Is set when
+ * the transaction is prepared. Can be 0 if the transaction is
+ * in progress or if nobody has deleted the tuple.
+ */
+ int64_t del_psn;
+ /**
+ * List of trackers: transactions that have read this tuple.
+ */
+ struct rlist reader_list;
+ /**
+ * Link in tx_manager::all_stories
+ */
+ struct rlist in_all_stories;
+ /**
+ * Number of indexes in this space, which is also the length of link[].
+ */
+ uint32_t index_count;
+ /**
+ * Link with older and newer stories (and just tuples) for each
+ * index respectively.
+ */
+ struct txm_story_link link[];
+};
+
static inline bool
txn_has_flag(struct txn *txn, enum txn_flag flag)
{
@@ -652,6 +739,48 @@ tx_manager_free();
int
txm_cause_conflict(struct txn *wreaker, struct txn *victim);
+struct txm_story *
+txm_story_new(struct tuple *tuple, struct txn_stmt *stmt, uint32_t index_count);
+
+int
+txm_check_and_link_add_story(struct txm_story *story, struct txn_stmt *stmt,
+ enum dup_replace_mode mode);
+
+int
+txm_link_del_story(struct tuple *old_tuple, struct txn_stmt *stmt,
+ uint32_t index_count);
+
+void
+txm_unlink_add_story(struct txn_stmt *stmt);
+
+void
+txm_unlink_del_story(struct txn_stmt *stmt);
+
+void
+txm_prepare_add_story(struct txn_stmt *stmt);
+
+void
+txm_let_it_go(struct txn_stmt *stmt);
+
+struct txm_story *
+txm_story_get(struct tuple *tuple);
+
+struct tuple *
+txm_tuple_clarify_slow(struct txn *txn, struct tuple *tuple, uint32_t index,
+ uint32_t mk_index, bool prepared_ok);
+
+static inline struct tuple*
+txm_tuple_clarify(struct txn *txn, struct tuple* tuple, uint32_t index,
+ uint32_t mk_index, bool prepared_ok)
+{
+ if (!tuple->is_dirty)
+ return tuple;
+ return txm_tuple_clarify_slow(txn, tuple, index, mk_index, prepared_ok);
+}
+
+void
+txm_story_delete(struct txm_story *story);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.7.4
^ permalink raw reply [flat|nested] 27+ messages in thread
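The visibility rules that txm_tuple_clarify_slow() applies while walking a story chain can be modeled outside of Tarantool. The sketch below is a simplified, hypothetical model, not the patch's code: struct story and story_clarify() are invented for illustration, tuples and transactions are plain ints, 0 in add_txn/del_txn stands in for a NULL add_stmt/del_stmt, and only a single index is assumed. It checks the rules in the same order as the patch: deletions by a prepared or committed TX first, then additions by the reader itself, by a prepared TX and by a committed TX, and finally it falls back to the older story or the ancient tuple. The own-deletion tracking, which is still a TODO in the patch, is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/*
 * Hypothetical model of one story in a single index chain.
 * Tuple ids are positive, 0 means "no tuple". Txn ids are positive,
 * 0 means "committed or unknown" (a NULL add_stmt/del_stmt in the patch).
 */
struct story {
	int tuple;           /* tuple this story is about */
	int add_txn;         /* txn that added the tuple, 0 if committed/unknown */
	int64_t add_psn;     /* psn of the adding txn, 0 if not prepared */
	int del_txn;         /* txn that deleted the tuple, 0 if none/committed */
	int64_t del_psn;     /* psn of the deleting txn, 0 if not prepared */
	struct story *older; /* older story of the same key, or NULL */
	int older_tuple;     /* ancient tuple below the chain if older == NULL */
};

/* Return the tuple visible to txn, walking from newer to older stories. */
static int
story_clarify(const struct story *s, int txn, bool prepared_ok)
{
	while (true) {
		if (prepared_ok && s->del_psn != 0)
			return 0;              /* deleted by a prepared txn */
		if (s->del_psn != 0 && s->del_txn == 0)
			return 0;              /* deleted by a committed txn */
		if (s->add_txn != 0 && s->add_txn == txn)
			return s->tuple;       /* added by us */
		if (prepared_ok && s->add_psn != 0)
			return s->tuple;       /* added by a prepared txn */
		if (s->add_psn != 0 && s->add_txn == 0)
			return s->tuple;       /* added by a committed txn */
		if (s->older == NULL)
			return s->older_tuple; /* no older story: ancient tuple */
		s = s->older;
	}
}
```

For example, with a committed tuple 10 overwritten by an in-progress insert of tuple 20 from txn 7, txn 7 sees 20 while any other txn sees 10; once txn 7 is prepared, readers that accept prepared changes see 20, and the others still see 10.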
* [Tarantool-patches] [PATCH 13/15] tx: indexes
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (11 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 12/15] tx: introduce txm_story Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 14/15] tx: introduce point conflict tracker Aleksandr Lyapunov
` (2 subsequent siblings)
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/memtx_bitset.c | 28 ++++++++++++-------
src/box/memtx_hash.c | 60 +++++++++++++++++++++++++++++++++++------
src/box/memtx_rtree.c | 27 ++++++++++++++++---
src/box/memtx_tree.c | 73 ++++++++++++++++++++++++++++++++++++++++++++++----
4 files changed, 162 insertions(+), 26 deletions(-)
diff --git a/src/box/memtx_bitset.c b/src/box/memtx_bitset.c
index 67eaf6f..f3ab74f 100644
--- a/src/box/memtx_bitset.c
+++ b/src/box/memtx_bitset.c
@@ -40,6 +40,7 @@
#include "fiber.h"
#include "index.h"
#include "tuple.h"
+#include "txn.h"
#include "memtx_engine.h"
struct memtx_bitset_index {
@@ -198,19 +199,26 @@ bitset_index_iterator_next(struct iterator *iterator, struct tuple **ret)
assert(iterator->free == bitset_index_iterator_free);
struct bitset_index_iterator *it = bitset_index_iterator(iterator);
- size_t value = tt_bitset_iterator_next(&it->bitset_it);
- if (value == SIZE_MAX) {
- *ret = NULL;
- return 0;
- }
-
+ do {
+ size_t value = tt_bitset_iterator_next(&it->bitset_it);
+ if (value == SIZE_MAX) {
+ *ret = NULL;
+ return 0;
+ }
#ifndef OLD_GOOD_BITSET
- struct memtx_bitset_index *index =
- (struct memtx_bitset_index *)iterator->index;
- *ret = memtx_bitset_index_value_to_tuple(index, value);
+ struct memtx_bitset_index *index =
+ (struct memtx_bitset_index *)iterator->index;
+ struct tuple *tuple =
+ memtx_bitset_index_value_to_tuple(index, value);
#else /* #ifndef OLD_GOOD_BITSET */
- *ret = value_to_tuple(value);
+ struct tuple *tuple = value_to_tuple(value);
#endif /* #ifndef OLD_GOOD_BITSET */
+ uint32_t iid = iterator->index->def->iid;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *ret = txm_tuple_clarify(txn, tuple, iid, 0, is_rw);
+ } while (*ret == NULL);
+
return 0;
}
diff --git a/src/box/memtx_hash.c b/src/box/memtx_hash.c
index cdd531c..b3ae60c 100644
--- a/src/box/memtx_hash.c
+++ b/src/box/memtx_hash.c
@@ -33,6 +33,7 @@
#include "fiber.h"
#include "index.h"
#include "tuple.h"
+#include "txn.h"
#include "memtx_engine.h"
#include "space.h"
#include "schema.h" /* space_cache_find() */
@@ -101,7 +102,7 @@ hash_iterator_free(struct iterator *iterator)
}
static int
-hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
+hash_iterator_ge_base(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
struct hash_iterator *it = (struct hash_iterator *) ptr;
@@ -113,10 +114,10 @@ hash_iterator_ge(struct iterator *ptr, struct tuple **ret)
}
static int
-hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
+hash_iterator_gt_base(struct iterator *ptr, struct tuple **ret)
{
assert(ptr->free == hash_iterator_free);
- ptr->next = hash_iterator_ge;
+ ptr->next = hash_iterator_ge_base;
struct hash_iterator *it = (struct hash_iterator *) ptr;
struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
@@ -128,6 +129,31 @@ hash_iterator_gt(struct iterator *ptr, struct tuple **ret)
return 0;
}
+#define WRAP_ITERATOR_METHOD(name) \
+static int \
+name(struct iterator *iterator, struct tuple **ret) \
+{ \
+ struct txn *txn = in_txn(); \
+ bool is_rw = txn != NULL; \
+ uint32_t iid = iterator->index->def->iid; \
+ bool first = true; \
+ do { \
+ int rc = first ? name##_base(iterator, ret) \
+ : hash_iterator_ge_base(iterator, ret); \
+ if (rc != 0 || *ret == NULL) \
+ return rc; \
+ first = false; \
+ *ret = txm_tuple_clarify(txn, *ret, iid, 0, is_rw); \
+ } while (*ret == NULL); \
+ return 0; \
+} \
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(hash_iterator_ge);
+WRAP_ITERATOR_METHOD(hash_iterator_gt);
+
+#undef WRAP_ITERATOR_METHOD
+
static int
hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret)
{
@@ -136,12 +162,25 @@ hash_iterator_eq_next(MAYBE_UNUSED struct iterator *it, struct tuple **ret)
}
static int
-hash_iterator_eq(struct iterator *it, struct tuple **ret)
+hash_iterator_eq(struct iterator *ptr, struct tuple **ret)
{
- it->next = hash_iterator_eq_next;
- return hash_iterator_ge(it, ret);
+ ptr->next = hash_iterator_eq_next;
+ assert(ptr->free == hash_iterator_free);
+ struct hash_iterator *it = (struct hash_iterator *) ptr;
+ struct memtx_hash_index *index = (struct memtx_hash_index *)ptr->index;
+ struct tuple **res = light_index_iterator_get_and_next(&index->hash_table,
+ &it->iterator);
+ if (res == NULL) {
+ *ret = NULL;
+ return 0;
+ }
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *ret = txm_tuple_clarify(txn, *res, ptr->index->def->iid, 0, is_rw);
+ return 0;
}
+
/* }}} */
/* {{{ MemtxHash -- implementation of all hashes. **********************/
@@ -282,8 +321,13 @@ memtx_hash_index_get(struct index *base, const char *key,
*result = NULL;
uint32_t h = key_hash(key, base->def->key_def);
uint32_t k = light_index_find_key(&index->hash_table, h, key);
- if (k != light_index_end)
- *result = light_index_get(&index->hash_table, k);
+ if (k != light_index_end) {
+ struct tuple *tuple = light_index_get(&index->hash_table, k);
+ uint32_t iid = base->def->iid;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *result = txm_tuple_clarify(txn, tuple, iid, 0, is_rw);
+ }
return 0;
}
diff --git a/src/box/memtx_rtree.c b/src/box/memtx_rtree.c
index 612fcb2..992a422 100644
--- a/src/box/memtx_rtree.c
+++ b/src/box/memtx_rtree.c
@@ -40,6 +40,7 @@
#include "trivia/util.h"
#include "tuple.h"
+#include "txn.h"
#include "space.h"
#include "memtx_engine.h"
@@ -148,7 +149,15 @@ static int
index_rtree_iterator_next(struct iterator *i, struct tuple **ret)
{
struct index_rtree_iterator *itr = (struct index_rtree_iterator *)i;
- *ret = (struct tuple *)rtree_iterator_next(&itr->impl);
+ do {
+ *ret = (struct tuple *) rtree_iterator_next(&itr->impl);
+ if (*ret == NULL)
+ break;
+ uint32_t iid = i->index->def->iid;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *ret = txm_tuple_clarify(txn, *ret, iid, 0, is_rw);
+ } while (*ret == NULL);
return 0;
}
@@ -213,8 +222,20 @@ memtx_rtree_index_get(struct index *base, const char *key,
unreachable();
*result = NULL;
- if (rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator))
- *result = (struct tuple *)rtree_iterator_next(&iterator);
+ if (!rtree_search(&index->tree, &rect, SOP_OVERLAPS, &iterator)) {
+ rtree_iterator_destroy(&iterator);
+ return 0;
+ }
+ do {
+ struct tuple *tuple = (struct tuple *)
+ rtree_iterator_next(&iterator);
+ if (tuple == NULL)
+ break;
+ uint32_t iid = base->def->iid;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ *result = txm_tuple_clarify(txn, tuple, iid, 0, is_rw);
+ } while (*result == NULL);
rtree_iterator_destroy(&iterator);
return 0;
}
diff --git a/src/box/memtx_tree.c b/src/box/memtx_tree.c
index 76ff3fc..b77c85c 100644
--- a/src/box/memtx_tree.c
+++ b/src/box/memtx_tree.c
@@ -37,6 +37,7 @@
#include "fiber.h"
#include "key_list.h"
#include "tuple.h"
+#include "txn.h"
#include <third_party/qsort_arg.h>
#include <small/mempool.h>
@@ -175,7 +176,7 @@ tree_iterator_dummie(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_next(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -205,7 +206,7 @@ tree_iterator_next(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -234,7 +235,7 @@ tree_iterator_prev(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_next_equal_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -270,7 +271,7 @@ tree_iterator_next_equal(struct iterator *iterator, struct tuple **ret)
}
static int
-tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
+tree_iterator_prev_equal_base(struct iterator *iterator, struct tuple **ret)
{
struct memtx_tree_index *index =
(struct memtx_tree_index *)iterator->index;
@@ -304,6 +305,45 @@ tree_iterator_prev_equal(struct iterator *iterator, struct tuple **ret)
return 0;
}
+#define WRAP_ITERATOR_METHOD(name) \
+static int \
+name(struct iterator *iterator, struct tuple **ret) \
+{ \
+ struct memtx_tree *tree = \
+ &((struct memtx_tree_index *)iterator->index)->tree; \
+ struct tree_iterator *it = tree_iterator(iterator); \
+ struct memtx_tree_iterator *ti = &it->tree_iterator; \
+ uint32_t iid = iterator->index->def->iid; \
+ bool is_multikey = iterator->index->def->key_def->is_multikey; \
+ struct txn *txn = in_txn(); \
+ bool is_rw = txn != NULL; \
+ do { \
+ int rc = name##_base(iterator, ret); \
+ if (rc != 0 || *ret == NULL) \
+ return rc; \
+ uint32_t mk_index = 0; \
+ if (is_multikey) { \
+ struct memtx_tree_data *check = \
+ memtx_tree_iterator_get_elem(tree, ti); \
+ assert(check != NULL); \
+ mk_index = check->hint; \
+ } \
+ *ret = txm_tuple_clarify(txn, *ret, iid, mk_index, is_rw); \
+ } while (*ret == NULL); \
+ tuple_unref(it->current.tuple); \
+ it->current.tuple = *ret; \
+ tuple_ref(it->current.tuple); \
+ return 0; \
+} \
+struct forgot_to_add_semicolon
+
+WRAP_ITERATOR_METHOD(tree_iterator_next);
+WRAP_ITERATOR_METHOD(tree_iterator_prev);
+WRAP_ITERATOR_METHOD(tree_iterator_next_equal);
+WRAP_ITERATOR_METHOD(tree_iterator_prev_equal);
+
+#undef WRAP_ITERATOR_METHOD
+
static void
tree_iterator_set_next_method(struct tree_iterator *it)
{
@@ -388,6 +428,21 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
tuple_ref(*ret);
it->current = *res;
tree_iterator_set_next_method(it);
+
+ uint32_t iid = iterator->index->def->iid;
+ bool is_multikey = iterator->index->def->key_def->is_multikey;
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ uint32_t mk_index = is_multikey ? res->hint : 0;
+ *ret = txm_tuple_clarify(txn, *ret, iid, mk_index, is_rw);
+ if (*ret == NULL) {
+ return iterator->next(iterator, ret);
+ } else {
+ tuple_unref(it->current.tuple);
+ it->current.tuple = *ret;
+ tuple_ref(it->current.tuple);
+ }
+
return 0;
}
@@ -539,7 +594,15 @@ memtx_tree_index_get(struct index *base, const char *key,
key_data.part_count = part_count;
key_data.hint = key_hint(key, part_count, cmp_def);
struct memtx_tree_data *res = memtx_tree_find(&index->tree, &key_data);
- *result = res != NULL ? res->tuple : NULL;
+ if (res == NULL) {
+ *result = NULL;
+ return 0;
+ }
+ struct txn *txn = in_txn();
+ bool is_rw = txn != NULL;
+ uint32_t mk_index = base->def->key_def->is_multikey ? res->hint : 0;
+ *result = txm_tuple_clarify(txn, res->tuple, base->def->iid,
+ mk_index, is_rw);
return 0;
}
--
2.7.4
^ permalink raw reply [flat|nested] 27+ messages in thread
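All index changes in this patch follow one pattern: fetch the next raw tuple from the index, pass it through txm_tuple_clarify(), and repeat while the result is NULL, i.e. while the tuple is not visible to the current transaction. Below is a minimal, self-contained sketch of that loop. The array-backed struct iter, iter_next_base(), iter_next() and the hide_odd() callback are hypothetical stand-ins for struct iterator, the *_base methods and txm_tuple_clarify(); they are not Tarantool APIs.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Hypothetical array-backed index iterator. Tuples are positive ints,
 * 0 terminates the array and also means "no tuple".
 */
struct iter {
	const int *items;          /* raw tuples in index order */
	size_t pos;                /* position of the next tuple */
	int (*clarify)(int tuple); /* visible version of a tuple, or 0 */
};

/* Unwrapped step: returns raw tuples, sets *ret = 0 at the end. */
static int
iter_next_base(struct iter *it, int *ret)
{
	*ret = it->items[it->pos];
	if (*ret != 0)
		it->pos++;
	return 0;
}

/* Wrapped step, same shape as WRAP_ITERATOR_METHOD: skip invisible tuples. */
static int
iter_next(struct iter *it, int *ret)
{
	do {
		int rc = iter_next_base(it, ret);
		if (rc != 0 || *ret == 0)
			return rc; /* error or end of index */
		*ret = it->clarify(*ret);
	} while (*ret == 0);
	return 0;
}

/* Example clarify: pretend odd tuples belong to other, unfinished txns. */
static int
hide_odd(int tuple)
{
	return tuple % 2 == 0 ? tuple : 0;
}
```

The loop stops only on an error, at the end of the index, or at a visible tuple, so callers never observe invisible versions. In the hash variant, only the first call goes through name##_base; later iterations use hash_iterator_ge_base, because the GT iterator degrades into GE after its first step.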
* [Tarantool-patches] [PATCH 14/15] tx: introduce point conflict tracker
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (12 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 13/15] tx: indexes Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 15/15] tx: use new tx managet in memtx Aleksandr Lyapunov
2020-07-05 17:03 ` [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Vladislav Shpilevoy
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/txn.c | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/box/txn.h | 4 +++
2 files changed, 88 insertions(+), 1 deletion(-)
diff --git a/src/box/txn.c b/src/box/txn.c
index 26377c6..f1a7785 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -75,6 +75,13 @@ struct tx_manager
/** The one and only instance of tx_manager. */
static struct tx_manager txm;
+struct tx_read_tracker {
+ struct txn *reader;
+ struct txm_story *story;
+ struct rlist in_reader_list;
+ struct rlist in_read_set;
+};
+
struct tx_conflict_tracker {
struct txn *wreaker;
struct txn *victim;
@@ -237,6 +244,7 @@ txn_new(void)
}
assert(region_used(&region) == sizeof(*txn));
txn->region = region;
+ rlist_create(&txn->read_set);
rlist_create(&txn->conflict_list);
rlist_create(&txn->conflicted_by_list);
return txn;
@@ -248,6 +256,14 @@ txn_new(void)
inline static void
txn_free(struct txn *txn)
{
+ struct tx_read_tracker *tracker, *tmp;
+ rlist_foreach_entry_safe(tracker, &txn->read_set,
+ in_read_set, tmp) {
+ rlist_del(&tracker->in_reader_list);
+ rlist_del(&tracker->in_read_set);
+ }
+ assert(rlist_empty(&txn->read_set));
+
struct tx_conflict_tracker *entry, *next;
rlist_foreach_entry_safe(entry, &txn->conflict_list,
in_conflict_list, next) {
@@ -654,7 +670,6 @@ txn_prepare(struct txn *txn)
rlist_del(&entry->in_conflicted_by_list);
}
-
trigger_clear(&txn->fiber_on_stop);
if (!txn_has_flag(txn, TXN_CAN_YIELD))
trigger_clear(&txn->fiber_on_yield);
@@ -1585,3 +1600,71 @@ txm_story_delete(struct txm_story *story)
struct mempool *pool = &txm.txm_story_pool[story->index_count];
mempool_free(pool, story);
}
+
+int
+tx_track_read(struct txn *txn, struct tuple *tuple, uint32_t index_count)
+{
+ if (tuple == NULL)
+ return 0;
+ if (txn == NULL)
+ return 0;
+
+ struct txm_story *story;
+ struct tx_read_tracker *tracker = NULL;
+
+ if (!tuple->is_dirty) {
+ story = txm_story_new(tuple, NULL, index_count);
+ if (story == NULL)
+ return -1;
+ size_t sz;
+ tracker = region_alloc_object(&txn->region,
+ struct tx_read_tracker, &sz);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+ txm_story_delete(story);
+ return -1;
+ }
+ tracker->reader = txn;
+ tracker->story = story;
+ rlist_add(&story->reader_list, &tracker->in_reader_list);
+ rlist_add(&txn->read_set, &tracker->in_read_set);
+ return 0;
+ }
+ story = txm_story_get(tuple);
+
+ struct rlist *r1 = story->reader_list.next;
+ struct rlist *r2 = txn->read_set.next;
+ while (r1 != &story->reader_list && r2 != &txn->read_set) {
+ tracker = rlist_entry(r1, struct tx_read_tracker,
+ in_reader_list);
+ assert(tracker->story == story);
+ if (tracker->reader == txn)
+ break;
+ tracker = rlist_entry(r2, struct tx_read_tracker,
+ in_read_set);
+ assert(tracker->reader == txn);
+ if (tracker->story == story)
+ break;
+ tracker = NULL;
+ r1 = r1->next;
+ r2 = r2->next;
+ }
+ if (tracker != NULL) {
+ /* Move to the head of both lists for faster further lookups. */
+ rlist_del(&tracker->in_reader_list);
+ rlist_del(&tracker->in_read_set);
+ } else {
+ size_t sz;
+ tracker = region_alloc_object(&txn->region,
+ struct tx_read_tracker, &sz);
+ if (tracker == NULL) {
+ diag_set(OutOfMemory, sz, "tx region", "read_tracker");
+ return -1;
+ }
+ tracker->reader = txn;
+ tracker->story = story;
+ }
+ rlist_add(&story->reader_list, &tracker->in_reader_list);
+ rlist_add(&txn->read_set, &tracker->in_read_set);
+ return 0;
+}
diff --git a/src/box/txn.h b/src/box/txn.h
index b5eda1e..4567f2c 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -293,6 +293,7 @@ struct txn {
uint32_t fk_deferred_count;
/** List of savepoints to find savepoint by name. */
struct rlist savepoints;
+ struct rlist read_set;
struct rlist conflict_list;
struct rlist conflicted_by_list;
};
@@ -781,6 +782,9 @@ txm_tuple_clarify(struct txn *txn, struct tuple* tuple, uint32_t index,
void
txm_story_delete(struct txm_story *story);
+int
+tx_track_read(struct txn *txn, struct tuple *tuple, uint32_t index_count);
+
#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
--
2.7.4
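Editorial note: the lookup loop in `tx_track_read` above steps through the story's reader list and the transaction's read set in lockstep. Every tracker is linked into both lists, so running off the end of either list without a match proves that no tracker exists. The search therefore takes at most min(|reader list|, |read set|) iterations, and a hit is moved to the front of both lists. The sketch below reproduces only that idea. `struct dlist`, `dlist_add`, `dlist_del`, `struct story`, `struct reader`, `struct tracker`, `tracker_find` and `track_read` are hypothetical stand-ins, not Tarantool's real `rlist` or tx manager API, and the caller supplies tracker storage instead of allocating it from a region.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical minimal circular doubly linked list (stand-in for rlist). */
struct dlist {
	struct dlist *prev, *next;
};

static void
dlist_init(struct dlist *head)
{
	head->prev = head->next = head;
}

/* Insert item right after head, i.e. at the front of the list. */
static void
dlist_add(struct dlist *head, struct dlist *item)
{
	item->next = head->next;
	item->prev = head;
	head->next->prev = item;
	head->next = item;
}

static void
dlist_del(struct dlist *item)
{
	item->prev->next = item->next;
	item->next->prev = item->prev;
	item->prev = item->next = item;
}

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct story { struct dlist readers; };   /* hypothetical tuple story */
struct reader { struct dlist read_set; }; /* hypothetical transaction */

/* One "reader has read story" fact, linked into both lists. */
struct tracker {
	struct reader *reader;
	struct story *story;
	struct dlist in_readers;  /* node in story->readers */
	struct dlist in_read_set; /* node in reader->read_set */
};

/* Advance through both lists in lockstep; stop at the first hit or end. */
static struct tracker *
tracker_find(struct story *s, struct reader *r)
{
	struct dlist *a = s->readers.next;
	struct dlist *b = r->read_set.next;
	while (a != &s->readers && b != &r->read_set) {
		struct tracker *t = container_of(a, struct tracker, in_readers);
		if (t->reader == r)
			return t;
		t = container_of(b, struct tracker, in_read_set);
		if (t->story == s)
			return t;
		a = a->next;
		b = b->next;
	}
	/* One list is exhausted, so no tracker links s and r. */
	return NULL;
}

/* Record a read; reuse and move-to-front an existing tracker if found. */
static struct tracker *
track_read(struct story *s, struct reader *r, struct tracker *spare)
{
	struct tracker *t = tracker_find(s, r);
	if (t != NULL) {
		dlist_del(&t->in_readers);
		dlist_del(&t->in_read_set);
	} else {
		t = spare; /* caller-provided storage, unlike region_alloc */
		t->reader = r;
		t->story = s;
	}
	dlist_add(&s->readers, &t->in_readers);
	dlist_add(&r->read_set, &t->in_read_set);
	return t;
}
```

Move-to-front is a heuristic: a transaction that rereads the same tuple will usually find its tracker on the first iteration.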
* [Tarantool-patches] [PATCH 15/15] tx: use new tx manager in memtx
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (13 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 14/15] tx: introduce point conflict tracker Aleksandr Lyapunov
@ 2020-07-03 6:33 ` Aleksandr Lyapunov
2020-07-05 17:03 ` [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Vladislav Shpilevoy
15 siblings, 0 replies; 27+ messages in thread
From: Aleksandr Lyapunov @ 2020-07-03 6:33 UTC (permalink / raw)
To: tarantool-patches
---
src/box/memtx_engine.c | 48 +++++++++++++++++++++++++-----------------------
src/box/memtx_space.c | 45 +++++++++++++++++++++++++++++----------------
src/box/txn.c | 5 +++++
3 files changed, 59 insertions(+), 39 deletions(-)
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index af80d9e..7fc495c 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -339,6 +339,29 @@ memtx_engine_begin(struct engine *engine, struct txn *txn)
return 0;
}
+static int
+memtx_engine_prepare(struct engine *engine, struct txn *txn)
+{
+ (void)engine;
+ struct txn_stmt *stmt;
+ stailq_foreach_entry(stmt, &txn->stmts, next) {
+ if (stmt->new_tuple != NULL)
+ txm_prepare_add_story(stmt);
+ }
+ return 0;
+}
+
+static void
+memtx_engine_commit(struct engine *engine, struct txn *txn)
+{
+ (void)engine;
+ struct txn_stmt *stmt;
+ stailq_foreach_entry(stmt, &txn->stmts, next) {
+ if (stmt->new_tuple != NULL)
+ txm_let_it_go(stmt);
+ }
+}
+
static void
memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
struct txn_stmt *stmt)
@@ -348,32 +371,11 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn,
if (stmt->old_tuple == NULL && stmt->new_tuple == NULL)
return;
struct space *space = stmt->space;
- struct memtx_space *memtx_space = (struct memtx_space *)space;
- uint32_t index_count;
/* Only roll back the changes if they were made. */
if (stmt->engine_savepoint == NULL)
return;
- if (memtx_space->replace == memtx_space_replace_all_keys)
- index_count = space->index_count;
- else if (memtx_space->replace == memtx_space_replace_primary_key)
- index_count = 1;
- else
- panic("transaction rolled back during snapshot recovery");
-
- for (uint32_t i = 0; i < index_count; i++) {
- struct tuple *unused;
- struct index *index = space->index[i];
- /* Rollback must not fail. */
- if (index_replace(index, stmt->new_tuple, stmt->old_tuple,
- DUP_INSERT, &unused) != 0) {
- diag_log();
- unreachable();
- panic("failed to rollback change");
- }
- }
-
memtx_space_update_bsize(space, stmt->new_tuple, stmt->old_tuple);
if (stmt->old_tuple != NULL)
tuple_ref(stmt->old_tuple);
@@ -914,8 +916,8 @@ static const struct engine_vtab memtx_engine_vtab = {
/* .complete_join = */ memtx_engine_complete_join,
/* .begin = */ memtx_engine_begin,
/* .begin_statement = */ generic_engine_begin_statement,
- /* .prepare = */ generic_engine_prepare,
- /* .commit = */ generic_engine_commit,
+ /* .prepare = */ memtx_engine_prepare,
+ /* .commit = */ memtx_engine_commit,
/* .rollback_statement = */ memtx_engine_rollback_statement,
/* .rollback = */ generic_engine_rollback,
/* .switch_to_ro = */ generic_engine_switch_to_ro,
diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c
index 5820c40..6cfa634 100644
--- a/src/box/memtx_space.c
+++ b/src/box/memtx_space.c
@@ -244,6 +244,10 @@ memtx_space_replace_all_keys(struct space *space, struct tuple *old_tuple,
struct tuple **result)
{
struct memtx_engine *memtx = (struct memtx_engine *)space->engine;
+ struct txn *txn = in_txn();
+ struct txn_stmt *stmt = txn_current_stmt(txn);
+ uint32_t index_count = space->index_count;
+
/*
* Ensure we have enough slack memory to guarantee
* successful statement-level rollback.
@@ -253,27 +257,31 @@ memtx_space_replace_all_keys(struct space *space, struct tuple *old_tuple,
RESERVE_EXTENTS_BEFORE_DELETE) != 0)
return -1;
- uint32_t i = 0;
-
/* Update the primary key */
struct index *pk = index_find(space, 0);
if (pk == NULL)
return -1;
assert(pk->def->opts.is_unique);
- /*
- * If old_tuple is not NULL, the index has to
- * find and delete it, or return an error.
- */
- if (index_replace(pk, old_tuple, new_tuple, mode, &old_tuple) != 0)
- return -1;
- assert(old_tuple || new_tuple);
- /* Update secondary keys. */
- for (i++; i < space->index_count; i++) {
- struct tuple *unused;
- struct index *index = space->index[i];
- if (index_replace(index, old_tuple, new_tuple,
- DUP_INSERT, &unused) != 0)
+ uint32_t i = 0;
+ struct txm_story *story = NULL;
+ if (new_tuple != NULL) {
+ story = txm_story_new(new_tuple, stmt, index_count);
+ if (story == NULL)
+ return -1;
+
+ for (; i < index_count; i++) {
+ struct index *index = space->index[i];
+ struct tuple **replaced = &story->link[i].old_tuple;
+ if (index_replace(index, NULL, new_tuple,
+ DUP_REPLACE_OR_INSERT, replaced) != 0)
+ goto rollback;
+ }
+ if (txm_check_and_link_add_story(story, stmt, mode) != 0)
+ goto rollback;
+ } else {
+ assert(old_tuple != NULL);
+ if (txm_link_del_story(old_tuple, stmt, index_count) != 0)
goto rollback;
}
@@ -287,13 +295,18 @@ rollback:
for (; i > 0; i--) {
struct tuple *unused;
struct index *index = space->index[i - 1];
+ struct tuple *replaced = story->link[i - 1].old_tuple;
/* Rollback must not fail. */
- if (index_replace(index, new_tuple, old_tuple,
+ if (index_replace(index, new_tuple, replaced,
DUP_INSERT, &unused) != 0) {
diag_log();
unreachable();
panic("failed to rollback change");
}
+ if (replaced != NULL) {
+ tuple_unref(replaced);
+ story->link[i - 1].old_tuple = NULL;
+ }
}
return -1;
}
diff --git a/src/box/txn.c b/src/box/txn.c
index f1a7785..e0ff0e7 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -170,6 +170,11 @@ txn_stmt_new(struct region *region)
static inline void
txn_stmt_destroy(struct txn_stmt *stmt)
{
+ if (stmt->add_story != NULL)
+ txm_unlink_add_story(stmt);
+ if (stmt->del_story != NULL)
+ txm_unlink_del_story(stmt);
+
if (stmt->old_tuple != NULL)
tuple_unref(stmt->old_tuple);
if (stmt->new_tuple != NULL)
--
2.7.4
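Editorial note: after this patch, `memtx_space_replace_all_keys` inserts the new tuple into every index with `DUP_REPLACE_OR_INSERT`, remembers what each index displaced in `story->link[i].old_tuple`, and on failure walks back over the already-modified indexes in reverse order to restore them. Below is a minimal sketch of that pattern only. The one-slot `struct toy_index`, `toy_replace()` and `replace_all()` are hypothetical stand-ins for real indexes and `index_replace()`, and tuple reference counting is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define INDEX_COUNT 3

/* Hypothetical toy index: holds one value; fail_next injects one failure. */
struct toy_index {
	const char *slot;
	bool fail_next;
};

/* Store new_val, reporting the displaced value through *replaced. */
static int
toy_replace(struct toy_index *idx, const char *new_val, const char **replaced)
{
	if (idx->fail_next) {
		idx->fail_next = false;
		return -1;
	}
	*replaced = idx->slot;
	idx->slot = new_val;
	return 0;
}

/*
 * Put new_val into every index. If index i fails, indexes 0..i-1 have
 * already been changed, so restore them in reverse order from the values
 * remembered in replaced[].
 */
static int
replace_all(struct toy_index *idx, const char *new_val)
{
	const char *replaced[INDEX_COUNT] = {NULL};
	size_t i = 0;
	for (; i < INDEX_COUNT; i++) {
		if (toy_replace(&idx[i], new_val, &replaced[i]) != 0)
			goto rollback;
	}
	return 0;
rollback:
	for (; i > 0; i--) {
		const char *unused;
		/* Cannot fail: indexes before i accepted their replace. */
		int rc = toy_replace(&idx[i - 1], replaced[i - 1], &unused);
		assert(rc == 0);
		(void)rc;
	}
	return -1;
}
```

The failing index itself was never modified, which is why the undo loop starts at `i - 1`, matching the `for (; i > 0; i--)` loop in the patch.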
* Re: [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine
2020-07-03 6:33 [Tarantool-patches] [PATCH 00/15] Transaction engine for memtx engine Aleksandr Lyapunov
` (14 preceding siblings ...)
2020-07-03 6:33 ` [Tarantool-patches] [PATCH 15/15] tx: use new tx manager in memtx Aleksandr Lyapunov
@ 2020-07-05 17:03 ` Vladislav Shpilevoy
2020-07-06 13:29 ` Aleksandr Lyapunov
15 siblings, 1 reply; 27+ messages in thread
From: Vladislav Shpilevoy @ 2020-07-05 17:03 UTC (permalink / raw)
To: Aleksandr Lyapunov, tarantool-patches
Hi!
Could you please provide the branch and issue links, and put the
reviewers in CC/TO? This is described in
https://github.com/tarantool/tarantool/wiki/Code-review-procedure.
I assume the branch is this:
https://github.com/tarantool/tarantool/tree/alyapunov/gh-4897-memtx-tx-engine-v6
And the issue is this:
https://github.com/tarantool/tarantool/issues/4897
The CI looks very broken:
https://gitlab.com/tarantool/tarantool/-/pipelines/162943109
I can't build the branch either.
Besides, I see a dirty commit 'fixme' on the branch. What is it?
> Work is still in progress but the patchset is ready for discussions.
It would be nice to have a document with a general overview of what is
done. Usually this is an RFC, or at least a description in the cover
letter, the commit messages, and the comments. This feature is definitely
not easy to understand from the sources alone in a short time.