[Tarantool-patches] [PATCH] vinyl: check upsert doesn't modify PK right after execution

Nikita Pettik korablev at tarantool.org
Thu Jun 18 15:10:16 MSK 2020


vy_apply_upsert() consists of two parts: execution and squash.
Assume we have two upserts passed to vy_apply_upsert():
s1(key1, ops1) (old one) and s2(key1, ops2) (new one).
To execute upsert s2 we fetch key1 from s1 and attempt to apply ops2
to key1. Note that in the memtx engine key1 is always the whole tuple
fetched from the index. Meanwhile, in vinyl we don't look the tuple up
in the index for the sake of performance. As a result, key1 can contain
fewer fields than the whole tuple stored in the index. So when -1 is
passed as the field index of an upsert operation, the tuple update
routine may modify the key value.
For instance:

s:insert{1, 1} -- stored on disk; pk is the first field
s:upsert({1}, {{'+', 2, 200}}) -- s1
s:upsert({1}, {{'=', -1, 100}}) -- s2

In this case the {'=', -1, 100} operation is applied to the key {1},
even though in the full tuple index -1 refers to the second (non-key)
field. The result of the operation is {100}; in other words, the primary
key has been modified. Since upserts modifying the PK are ignored, this
upsert is skipped.

To resolve this problem, let's check whether the PK has changed right
after the upsert execution step. If it has, roll back the effects of the
execution and continue squashing the upserts using the unmodified key:

squash(s1, s2) --> s3(key1, {ops1, ops2})

The only drawback of this approach is that if one of the upserts
*really* modifies the PK of the tuple stored on disk, all squashed
operations will be skipped as well. For example:

s:upsert({1}, {{'+', 2, 100}})
s:upsert({1}, {{'=', -2, 200}})

These two upserts are squashed into one:
({1}, {{'+', 2, 100}, {'=', -2, 200}}).
The latter operation fails to be applied, so neither of the upserts
will be executed (despite the fact that the first one is absolutely OK).

Closes #5087
---
Branch: https://github.com/tarantool/tarantool/tree/np/gh-5087-upsert-negative-index
Issue: https://github.com/tarantool/tarantool/issues/5087

 src/box/vy_upsert.c                           |  67 +++++----
 .../gh-5087-upsert-negative-fieldno.result    | 131 ++++++++++++++++++
 .../gh-5087-upsert-negative-fieldno.test.lua  |  48 +++++++
 3 files changed, 221 insertions(+), 25 deletions(-)
 create mode 100644 test/vinyl/gh-5087-upsert-negative-fieldno.result
 create mode 100644 test/vinyl/gh-5087-upsert-negative-fieldno.test.lua

diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index 007921bb2..210b7e3a4 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -118,33 +118,63 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt,
 	/*
 	 * Apply new operations to the old stmt
 	 */
-	const char *result_mp;
+	const char *old_mp = NULL;
+	uint32_t old_mp_size = 0;
 	if (vy_stmt_type(old_stmt) == IPROTO_UPSERT)
-		result_mp = vy_upsert_data_range(old_stmt, &mp_size);
+		old_mp = vy_upsert_data_range(old_stmt, &old_mp_size);
 	else
-		result_mp = tuple_data_range(old_stmt, &mp_size);
-	const char *result_mp_end = result_mp + mp_size;
-	struct tuple *result_stmt = NULL;
+		old_mp = tuple_data_range(old_stmt, &old_mp_size);
+	const char *old_mp_end = old_mp + old_mp_size;
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
 	uint8_t old_type = vy_stmt_type(old_stmt);
 	uint64_t column_mask = COLUMN_MASK_FULL;
-	result_mp = tuple_upsert_execute(vy_update_alloc, region, new_ops,
-					 new_ops_end, result_mp, result_mp_end,
-					 &mp_size, 0, suppress_error,
-					 &column_mask);
+	uint32_t result_mp_size = 0;
+	const char *result_mp =
+		tuple_upsert_execute(vy_update_alloc, region, new_ops,
+				     new_ops_end, old_mp, old_mp_end,
+				     &result_mp_size, 0, suppress_error,
+				     &column_mask);
 	if (result_mp == NULL) {
 		region_truncate(region, region_svp);
 		return NULL;
 	}
-	result_mp_end = result_mp + mp_size;
+
+	const char *result_mp_end = result_mp + result_mp_size;
+	/*
+	 * tuple_upsert_execute() may modify PK field.
+	 * So before moving to further upsert processing, let's
+	 * check if it's true. If so, rollback *_execute effects
+	 * and ignore this UPSERT.
+	 */
+	if (unlikely(!key_update_can_be_skipped(cmp_def->column_mask,
+						column_mask))) {
+		if (vy_tuple_compare_with_raw_key(old_stmt, result_mp,
+						  cmp_def) != 0) {
+			result_mp = old_mp;
+			result_mp_end = old_mp_end;
+			region_truncate(region, region_svp);
+			if (!suppress_error) {
+				say_error("UPSERT operation failed: "
+					  "primary key is not allowed to be "
+					  "modified");
+			}
+		}
+	}
+
 	if (tuple_validate_raw(format, result_mp) != 0) {
 		region_truncate(region, region_svp);
 		return NULL;
 	}
+
+	struct tuple *result_stmt = NULL;
 	if (old_type != IPROTO_UPSERT) {
 		assert(old_type == IPROTO_INSERT ||
 		       old_type == IPROTO_REPLACE);
+		if (result_mp == old_mp) {
+			/* PK was modified. Return old statement. */
+			return vy_stmt_dup(old_stmt);
+		}
 		/*
 		 * UPDATE case: return the updated old stmt.
 		 */
@@ -154,7 +184,7 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt,
 		if (result_stmt == NULL)
 			return NULL; /* OOM */
 		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
+		return result_stmt;
 	}
 
 	/*
@@ -181,7 +211,7 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt,
 	if (result_stmt != NULL) {
 		region_truncate(region, region_svp);
 		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
+		return result_stmt;
 	}
 
 	/* Failed to squash, simply add one upsert to another */
@@ -208,18 +238,5 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt,
 		return NULL;
 	vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
 
-check_key:
-	/*
-	 * Check that key hasn't been changed after applying operations.
-	 */
-	if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) &&
-	    vy_tuple_compare(old_stmt, result_stmt, cmp_def) != 0) {
-		/*
-		 * Key has been changed: ignore this UPSERT and
-		 * @retval the old stmt.
-		 */
-		tuple_unref(result_stmt);
-		result_stmt = vy_stmt_dup(old_stmt);
-	}
 	return result_stmt;
 }
diff --git a/test/vinyl/gh-5087-upsert-negative-fieldno.result b/test/vinyl/gh-5087-upsert-negative-fieldno.result
new file mode 100644
index 000000000..f01b6c60d
--- /dev/null
+++ b/test/vinyl/gh-5087-upsert-negative-fieldno.result
@@ -0,0 +1,131 @@
+-- test-run result file version 2
+-- Test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+ | ---
+ | ...
+pk = s:create_index('pk')
+ | ---
+ | ...
+s:insert({1, 1, 1})
+ | ---
+ | - [1, 1, 1]
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+s:upsert({1}, {{'=', 3, 100}})
+ | ---
+ | ...
+s:upsert({1}, {{'=', -1, 200}})
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:select() -- {1, 1, 200}
+ | ---
+ | - - [1, 1, 200]
+ | ...
+
+s:delete({1})
+ | ---
+ | ...
+s:insert({1, 1, 1})
+ | ---
+ | - [1, 1, 1]
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+s:upsert({1}, {{'=', -3, 100}})
+ | ---
+ | ...
+s:upsert({1}, {{'=', -1, 200}})
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+-- Two upserts are squashed into one which can't be applied since
+-- one of its operations modifies PK.
+--
+s:select() -- {1, 1, 1}
+ | ---
+ | - - [1, 1, 1]
+ | ...
+
+s:delete({1})
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+ | ---
+ | ...
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+ | ---
+ | ...
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+ | ---
+ | ...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+ | ---
+ | ...
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:select() -- {1, 1}
+ | ---
+ | - - [1, 1]
+ | ...
+
+s:delete({1})
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+ | ---
+ | ...
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+ | ---
+ | ...
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+ | ---
+ | ...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+ | ---
+ | ...
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:select()
+ | ---
+ | - - [1, 1, 300]
+ | ...
+
+s:drop()
+ | ---
+ | ...
diff --git a/test/vinyl/gh-5087-upsert-negative-fieldno.test.lua b/test/vinyl/gh-5087-upsert-negative-fieldno.test.lua
new file mode 100644
index 000000000..046065e98
--- /dev/null
+++ b/test/vinyl/gh-5087-upsert-negative-fieldno.test.lua
@@ -0,0 +1,48 @@
+-- Test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+pk = s:create_index('pk')
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', 3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+s:select() -- {1, 1, 200}
+
+s:delete({1})
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', -3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+-- Two upserts are squashed into one which can't be applied since
+-- one of its operations modifies PK.
+--
+s:select() -- {1, 1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+box.snapshot()
+s:select() -- {1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+box.snapshot()
+s:select()
+
+s:drop()
-- 
2.17.1



More information about the Tarantool-patches mailing list