Tarantool development patches archive
 help / color / mirror / Atom feed
From: Nikita Pettik <korablev@tarantool.org>
To: tarantool-patches@dev.tarantool.org
Cc: v.shpilevoy@tarantool.org
Subject: [Tarantool-patches] [PATCH 2/2] vinyl: rework upsert operation
Date: Wed, 29 Jul 2020 04:15:42 +0300	[thread overview]
Message-ID: <14aec86329c7820470e40e47a3d1b5655b531732.1595985135.git.korablev@tarantool.org> (raw)
In-Reply-To: <cover.1595985135.git.korablev@tarantool.org>
In-Reply-To: <cover.1595985135.git.korablev@tarantool.org>

Previous upsert implementation had a few drawback which led to several
bugs and issues.

Issue #5092 (redundant update operations execution)

In a nutshell, application of upsert(s) consists of two actions
(see vy_apply_upsert()): execute and squash. Consider example:

insert({1, 1})  -- stored on disk
upsert({1}, {{'-', 2, 20}}) -- old ups1
upsert({1}, {{'+', 2, 10}}) -- new ups2

'Execute' takes update operations from the new upsert and combines them
with key of the old upsert.  {1} --> {'+', 2, 10} can't be evaluated since
key consists of only one field. Note that in case upsert doesn't fold
into insert the upsert's tuple and the tuple stored in index are
different. In our particular case, tuple stored on disk has two fields,
so upsert's update operation can be applied to it. If upsert's operation
can't be executed using key of old upsert, we simply continue processing
squash step. Squash is a combination of update operations: arithmetic
operations are combined so we don't have to store actions over the same
field; the rest operations - are merged into single array. As a result,
we get one upsert with combined operations: upsert({1}, {{'+', 2, -10}}).
Then vy_apply_upsert is called again to apply new upsert on the top of
terminal statement - insert{1, 1}. Since now tuple has second field,
update operations can be executed and corresponding result is {1, -9}
which in turn is the final result of upsert merging procedure.
Now imagine that we have following upserts:

upsert({1, 1}, {{'-', 2, 20}}) -- old ups1
upsert({1}, {{'+', 2, 10}}) -- new ups2

In this case tuple execution successfully finishes and modifies upsert's
tuple: {2, 1} --> {'+', 2, 10} == {2, 11}
However, we still have to squash/accumulate update operations since they
should be applied on tuple stored on disk later. After all, at the we
have next upsert: upsert({2, 11}, {{'+', 2, -10}}). Then it is applied
on the top of insert({1, 1}) and we get the same result as in the first
case - {1, -9}. The only difference is that upsert's tuple was modified.
As one can see, execution of update operations applied to upsert's tuple
is redundant in the case index already contains tuple with the same key
(i.e. when upserts turns into update). Instead, we are able to
accumulate/squash update operations only. When the last upsert is being
applied, we can either execute all update operation on tuple fetched
from index (i.e. upsert is update) OR on tuple specified in the first
upsert (i.e. first upsert is insert).

Issue #5105 (upsert doesn't follow associative property)

Secondly, current approach breaks associative property: after upserts'
update operations are merged into one array, part of them (related to
one upsert) can be skipped, meanwhile the rest - is applied. For
instance:

-- Index is over second field.
i = s:create_index('pk', {parts={2, 'uint'}})
s:replace{1, 2, 3, 'default'}
s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
-- First update operation modifies primary key, so upsert must be ignored.
s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})

After merging two upserts we get the next one:
upsert({2, 2, 2}, {{'=', 4, 'upserted'}, {'#', 1, 1}, {'!', 3, 1}}

While we executing update operations, we don't tell ones from different
upserts. Thus, if one operation fails, the rest are ignored as well. As
a result, first upsert won't be applied, even despite the fact it is
absolutely OK.

To resolve this issue, let's group update operations of each upsert into
separate array. So that operations related to particular upsert are
stored in single array. In terms of previous example we will get:
upsert({2, 2, 2}, {{{'=', 4, 'upserted'}}, {{'#', 1, 1}, {'!', 3, 1}}}

Also note that we don't longer have to apply update operations on tuple
in vy_apply_upsert() if we deal with two upserts: it can be done once we
face terminal statement; or if there's no underlying statement (it is
delete op or doesn't exist at all) we apply all update arrays except the
first one on upsert's tuple.

Arithmetic operations still can be combined in case there's no unsigned
fields in space format. Otherwise, result of subtraction can turn out to
be negative and resulting tuple won't satisfy this property.

This patch also introduces format check of upsert application (#1622
issue). In case it doesn't satisfy space's format, corresponding error
is logged and upsert is skipped.

Closes #1622
Closes #5105
Closes #5092
Part of #5107
---
 src/box/vinyl.c                 |   2 +-
 src/box/vy_stmt.c               |  28 ++--
 src/box/vy_stmt.h               |   5 +-
 src/box/vy_upsert.c             | 305 +++++++++++++++++++++++++++-------------
 test/unit/vy_iterators_helper.c |   2 +-
 test/vinyl/upsert.result        | 289 +++++++++++++++++++++++++++++++++++++
 test/vinyl/upsert.test.lua      | 121 ++++++++++++++++
 7 files changed, 644 insertions(+), 108 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 32301d7ba..eab688147 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1984,7 +1984,7 @@ vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm,
 	operations[0].iov_base = (void *)expr;
 	operations[0].iov_len = expr_end - expr;
 	vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end,
-				    operations, 1);
+				    operations, 1, false);
 	if (vystmt == NULL)
 		return -1;
 	assert(vy_stmt_type(vystmt) == IPROTO_UPSERT);
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 92e0aa1c5..f1833e0b6 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -313,16 +313,22 @@ vy_key_dup(const char *key)
 static struct tuple *
 vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 		     const char *tuple_end, struct iovec *ops,
-		     int op_count, enum iproto_type type)
+		     int op_count, enum iproto_type type, bool is_ops_encoded)
 {
 	mp_tuple_assert(tuple_begin, tuple_end);
 
 	const char *tmp = tuple_begin;
 	mp_decode_array(&tmp);
 
+	/*
+	 * ops are grouped in one extra array.
+	 * See vy_apply_upsert() for details.
+	 */
 	size_t ops_size = 0;
 	for (int i = 0; i < op_count; ++i)
 		ops_size += ops[i].iov_len;
+	if (! is_ops_encoded)
+		ops_size += mp_sizeof_array(op_count);
 
 	struct tuple *stmt = NULL;
 	struct region *region = &fiber()->gc;
@@ -360,6 +366,8 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	field_map_build(&builder, wpos - field_map_size);
 	memcpy(wpos, tuple_begin, mpsize);
 	wpos += mpsize;
+	if (! is_ops_encoded)
+		wpos = mp_encode_array(wpos, op_count);
 	for (struct iovec *op = ops, *end = ops + op_count;
 	     op != end; ++op) {
 		memcpy(wpos, op->iov_base, op->iov_len);
@@ -374,10 +382,11 @@ end:
 struct tuple *
 vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end, struct iovec *operations,
-		   uint32_t ops_cnt)
+		   uint32_t ops_cnt, bool is_ops_encoded)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    operations, ops_cnt, IPROTO_UPSERT);
+				    operations, ops_cnt, IPROTO_UPSERT,
+				    is_ops_encoded);
 }
 
 struct tuple *
@@ -385,7 +394,7 @@ vy_stmt_new_replace(struct tuple_format *format, const char *tuple_begin,
 		    const char *tuple_end)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    NULL, 0, IPROTO_REPLACE);
+				    NULL, 0, IPROTO_REPLACE, true);
 }
 
 struct tuple *
@@ -393,7 +402,7 @@ vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    NULL, 0, IPROTO_INSERT);
+				    NULL, 0, IPROTO_INSERT, true);
 }
 
 struct tuple *
@@ -401,7 +410,7 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    NULL, 0, IPROTO_DELETE);
+				    NULL, 0, IPROTO_DELETE, true);
 }
 
 struct tuple *
@@ -735,19 +744,20 @@ vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format)
 		/* Always use key format for DELETE statements. */
 		stmt = vy_stmt_new_with_ops(env->key_format,
 					    request.key, request.key_end,
-					    NULL, 0, IPROTO_DELETE);
+					    NULL, 0, IPROTO_DELETE, true);
 		break;
 	case IPROTO_INSERT:
 	case IPROTO_REPLACE:
 		stmt = vy_stmt_new_with_ops(format, request.tuple,
 					    request.tuple_end,
-					    NULL, 0, request.type);
+					    NULL, 0, request.type, true);
 		break;
 	case IPROTO_UPSERT:
 		ops.iov_base = (char *)request.ops;
 		ops.iov_len = request.ops_end - request.ops;
 		stmt = vy_stmt_new_upsert(format, request.tuple,
-					  request.tuple_end, &ops, 1);
+					  request.tuple_end, &ops,
+					  1, true);
 		break;
 	default:
 		/* TODO: report filename. */
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 1b718e26b..4619fa218 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -534,6 +534,8 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
  * @param part_count Part count from key definition.
  * @param operations Vector of update operations.
  * @param ops_cnt Length of the update operations vector.
+ * @param is_ops_encoded True, if update operations are already packed
+  *                      into extra msgpack array.
  *
  * @retval NULL     Memory allocation error.
  * @retval not NULL Success.
@@ -541,7 +543,8 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
 struct tuple *
 vy_stmt_new_upsert(struct tuple_format *format,
 		   const char *tuple_begin, const char *tuple_end,
-		   struct iovec *operations, uint32_t ops_cnt);
+		   struct iovec *operations, uint32_t ops_cnt,
+		   bool is_ops_encoded);
 
 /**
  * Create REPLACE statement from UPSERT statement.
diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index 797492c2b..caf2482c7 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -68,12 +68,173 @@ vy_upsert_try_to_squash(struct tuple_format *format,
 	operations[0].iov_len = squashed_size;
 
 	*result_stmt = vy_stmt_new_upsert(format, key_mp, key_mp_end,
-					  operations, 1);
+					  operations, 1, false);
 	if (*result_stmt == NULL)
 		return -1;
 	return 0;
 }
 
+/**
+ * Check that key hasn't been changed after applying upsert operation.
+ */
+static bool
+vy_apply_result_does_cross_pk(struct tuple *old_stmt, const char *result,
+			      const char *result_end, struct key_def *cmp_def,
+			      uint64_t col_mask)
+{
+	if (!key_update_can_be_skipped(cmp_def->column_mask, col_mask)) {
+		struct tuple *tuple =
+			vy_stmt_new_replace(tuple_format(old_stmt), result,
+					    result_end);
+		int cmp_res = vy_stmt_compare(old_stmt, HINT_NONE, tuple,
+					       HINT_NONE, cmp_def);
+		tuple_unref(tuple);
+		return cmp_res != 0;
+	}
+	return false;
+}
+
+/**
+ * Apply update operations stored in @new_stmt (which is assumed to
+ * be upsert statement) on tuple @old_stmt. If @old_stmt is void
+ * statement (i.e. it is NULL or delete statement) then operations
+ * are applied on tuple @new_stmt. All operations which can't be
+ * applied are skipped; errors may be logged depending on @supress_error
+ * flag.
+ *
+ * @upsert Upsert statement to be applied on @stmt.
+ * @stmt Statement to be used as base for upsert operations.
+ * @cmp_def Key definition required to provide check of primary key
+ *          modification.
+ * @retrun Tuple containing result of upsert application;
+ *         NULL in case OOM.
+ */
+static struct tuple *
+vy_apply_upsert_on_terminal_stmt(struct tuple *upsert, struct tuple *stmt,
+				 struct key_def *cmp_def, bool suppress_error)
+{
+	assert(vy_stmt_type(upsert) == IPROTO_UPSERT);
+	assert(stmt == NULL || vy_stmt_type(stmt) != IPROTO_UPSERT);
+
+	uint32_t mp_size;
+	const char *new_ops = vy_stmt_upsert_ops(upsert, &mp_size);
+	/* Msgpack containing result of upserts application. */
+	const char *result_mp;
+	if (vy_stmt_is_void(stmt))
+		result_mp = vy_upsert_data_range(upsert, &mp_size);
+	else
+		result_mp = tuple_data_range(stmt, &mp_size);
+	const char *result_mp_end = result_mp + mp_size;
+	/*
+	 * xrow_upsert_execute() allocates result using region,
+	 * so save starting point to release it later.
+	 */
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	uint64_t column_mask = COLUMN_MASK_FULL;
+	struct tuple_format *format = tuple_format(upsert);
+
+	uint32_t ups_cnt = mp_decode_array(&new_ops);
+	const char *ups_ops = new_ops;
+	/*
+	 * In case upsert folds into insert, we must skip first
+	 * update operations.
+	 */
+	if (vy_stmt_is_void(stmt)) {
+		ups_cnt--;
+		mp_next(&ups_ops);
+	}
+	for (uint32_t i = 0; i < ups_cnt; ++i) {
+		assert(mp_typeof(*ups_ops) == MP_ARRAY);
+		const char *ups_ops_end = ups_ops;
+		mp_next(&ups_ops_end);
+		const char *exec_res = result_mp;
+		exec_res = xrow_upsert_execute(ups_ops, ups_ops_end, result_mp,
+					       result_mp_end, format, &mp_size,
+					       0, suppress_error, &column_mask);
+		if (exec_res == NULL) {
+			if (! suppress_error) {
+				assert(diag_last_error(diag_get()) != NULL);
+				struct error *e = diag_last_error(diag_get());
+				/* Bail out immediately in case of OOM. */
+				if (e->type != &type_ClientError) {
+					region_truncate(region, region_svp);
+					return NULL;
+				}
+				diag_log();
+			}
+			ups_ops = ups_ops_end;
+			continue;
+		}
+		/*
+		 * If it turns out that resulting tuple modifies primary
+		 * key, than simply ignore this upsert.
+		 */
+		if (vy_apply_result_does_cross_pk(stmt, exec_res,
+						  exec_res + mp_size, cmp_def,
+						  column_mask)) {
+			if (! suppress_error) {
+				say_error("upsert operations %s are not applied"\
+					  " due to primary key modification",
+					  mp_str(ups_ops));
+			}
+			ups_ops = ups_ops_end;
+			continue;
+		}
+		ups_ops = ups_ops_end;
+		/*
+		 * In case statement exists its format must
+		 * satisfy space's format. Otherwise, upsert's
+		 * tuple is checked to fit format once it is
+		 * processed in vy_upsert().
+		 */
+		if (stmt != NULL) {
+			if (tuple_validate_raw(tuple_format(stmt),
+					       exec_res) != 0) {
+				if (! suppress_error)
+					diag_log();
+				continue;
+			}
+		}
+		result_mp = exec_res;
+		result_mp_end = exec_res + mp_size;
+	}
+	struct tuple *new_terminal_stmt = vy_stmt_new_replace(format, result_mp,
+							      result_mp_end);
+	region_truncate(region, region_svp);
+	if (new_terminal_stmt == NULL)
+		return NULL;
+	vy_stmt_set_lsn(new_terminal_stmt, vy_stmt_lsn(upsert));
+	return new_terminal_stmt;
+}
+
+static bool
+tuple_format_is_suitable_for_squash(struct tuple_format *format)
+{
+	struct tuple_field *field;
+	json_tree_foreach_entry_preorder(field, &format->fields.root,
+					 struct tuple_field, token) {
+		if (field->type == FIELD_TYPE_UNSIGNED)
+				return false;
+	}
+	return true;
+}
+
+/**
+ * Unpack upsert's update operations from msgpack array
+ * into array of iovecs.
+ */
+static void
+upsert_ops_to_iovec(const char *ops, uint32_t ops_cnt, struct iovec *iov_arr)
+{
+	for (uint32_t i = 0; i < ops_cnt; ++i) {
+		assert(mp_typeof(*ops) == MP_ARRAY);
+		iov_arr[i].iov_base = (char *) ops;
+		mp_next(&ops);
+		iov_arr[i].iov_len = ops - (char *) iov_arr[i].iov_base;
+	}
+}
+
 struct tuple *
 vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt,
 		struct key_def *cmp_def, bool suppress_error)
@@ -87,122 +248,74 @@ vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt,
 	assert(new_stmt != old_stmt);
 	assert(vy_stmt_type(new_stmt) == IPROTO_UPSERT);
 
-	if (old_stmt == NULL || vy_stmt_type(old_stmt) == IPROTO_DELETE) {
-		/*
-		 * INSERT case: return new stmt.
-		 */
-		return vy_stmt_replace_from_upsert(new_stmt);
+	struct tuple *result_stmt = NULL;
+	if (old_stmt == NULL || vy_stmt_type(old_stmt) != IPROTO_UPSERT) {
+		return vy_apply_upsert_on_terminal_stmt(new_stmt, old_stmt,
+						        cmp_def, suppress_error);
 	}
 
-	struct tuple_format *format = tuple_format(new_stmt);
-
+	assert(vy_stmt_type(old_stmt) == IPROTO_UPSERT);
 	/*
-	 * Unpack UPSERT operation from the new stmt
+	 * Unpack UPSERT operation from the old and new stmts.
 	 */
+	assert(old_stmt != NULL);
 	uint32_t mp_size;
-	const char *new_ops;
-	new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size);
-	const char *new_ops_end = new_ops + mp_size;
+	const char *old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size);
+	const char *old_ops_end = old_ops + mp_size;
+	assert(old_ops_end > old_ops);
+	const char *old_stmt_mp = vy_upsert_data_range(old_stmt, &mp_size);
+	const char *old_stmt_mp_end = old_stmt_mp + mp_size;
+	const char *new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size);
 
 	/*
-	 * Apply new operations to the old stmt
+	 * UPSERT + UPSERT case: squash arithmetic operations.
+	 * Note that we can process this only in case result
+	 * can't break format under no circumstances. Since
+	 * subtraction can lead to negative values, unsigned
+	 * field are considered to be inappropriate.
 	 */
-	const char *result_mp;
-	if (vy_stmt_type(old_stmt) == IPROTO_UPSERT)
-		result_mp = vy_upsert_data_range(old_stmt, &mp_size);
-	else
-		result_mp = tuple_data_range(old_stmt, &mp_size);
-	const char *result_mp_end = result_mp + mp_size;
-	struct tuple *result_stmt = NULL;
+	struct tuple_format *format = tuple_format(old_stmt);
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
-	uint8_t old_type = vy_stmt_type(old_stmt);
-	uint64_t column_mask = COLUMN_MASK_FULL;
-	result_mp = xrow_upsert_execute(new_ops, new_ops_end, result_mp,
-					result_mp_end, format, &mp_size,
-					0, suppress_error, &column_mask);
-	if (result_mp == NULL) {
-		region_truncate(region, region_svp);
-		return NULL;
+	if (tuple_format_is_suitable_for_squash(format)) {
+		const char *new_ops_end = new_ops + mp_size;
+		if (vy_upsert_try_to_squash(format, old_stmt_mp, old_stmt_mp_end,
+					    old_ops, old_ops_end, new_ops,
+					    new_ops_end, &result_stmt) != 0) {
+			/* OOM */
+			region_truncate(region, region_svp);
+			return NULL;
+		}
 	}
-	result_mp_end = result_mp + mp_size;
-	if (old_type != IPROTO_UPSERT) {
-		assert(old_type == IPROTO_INSERT ||
-		       old_type == IPROTO_REPLACE);
-		/*
-		 * UPDATE case: return the updated old stmt.
-		 */
-		result_stmt = vy_stmt_new_replace(format, result_mp,
-						  result_mp_end);
-		region_truncate(region, region_svp);
-		if (result_stmt == NULL)
-			return NULL; /* OOM */
-		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
-	}
-
-	/*
-	 * Unpack UPSERT operation from the old stmt
-	 */
-	assert(old_stmt != NULL);
-	const char *old_ops;
-	old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size);
-	const char *old_ops_end = old_ops + mp_size;
-	assert(old_ops_end > old_ops);
-
 	/*
-	 * UPSERT + UPSERT case: combine operations
+	 * Adding update operations. We keep order of update operations in
+	 * the array the same. It is vital since first set of operations
+	 * must be skipped in case upsert folds into insert. For instance:
+	 * old_ops = {{{op1}, {op2}}, {{op3}}}
+	 * new_ops = {{{op4}, {op5}}}
+	 * res_ops = {{{op1}, {op2}}, {{op3}}, {{op4}, {op5}}}
+	 * If upsert corresponding to old_ops becomes insert, then
+	 * {{op1}, {op2}} update operations are not applied.
 	 */
-	assert(old_ops_end - old_ops > 0);
-	if (vy_upsert_try_to_squash(format, result_mp, result_mp_end,
-				    old_ops, old_ops_end, new_ops, new_ops_end,
-				    &result_stmt) != 0) {
+	uint32_t old_ops_cnt = mp_decode_array(&old_ops);
+	uint32_t new_ops_cnt = mp_decode_array(&new_ops);
+	size_t ops_size = sizeof(struct iovec) * (old_ops_cnt + new_ops_cnt);
+	struct iovec *operations = region_alloc(region, ops_size);
+	if (operations == NULL) {
 		region_truncate(region, region_svp);
+		diag_set(OutOfMemory, ops_size, "region_alloc", "operations");
 		return NULL;
 	}
-	if (result_stmt != NULL) {
-		region_truncate(region, region_svp);
-		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
-	}
+	upsert_ops_to_iovec(old_ops, old_ops_cnt, operations);
+	upsert_ops_to_iovec(new_ops, new_ops_cnt, &operations[old_ops_cnt]);
 
-	/* Failed to squash, simply add one upsert to another */
-	int old_ops_cnt, new_ops_cnt;
-	struct iovec operations[3];
-
-	old_ops_cnt = mp_decode_array(&old_ops);
-	operations[1].iov_base = (void *)old_ops;
-	operations[1].iov_len = old_ops_end - old_ops;
-
-	new_ops_cnt = mp_decode_array(&new_ops);
-	operations[2].iov_base = (void *)new_ops;
-	operations[2].iov_len = new_ops_end - new_ops;
-
-	char ops_buf[16];
-	char *header = mp_encode_array(ops_buf, old_ops_cnt + new_ops_cnt);
-	operations[0].iov_base = (void *)ops_buf;
-	operations[0].iov_len = header - ops_buf;
-
-	result_stmt = vy_stmt_new_upsert(format, result_mp, result_mp_end,
-					 operations, 3);
+	result_stmt = vy_stmt_new_upsert(format, old_stmt_mp, old_stmt_mp_end,
+					 operations, old_ops_cnt + new_ops_cnt,
+					 false);
 	region_truncate(region, region_svp);
 	if (result_stmt == NULL)
 		return NULL;
 	vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
 
-check_key:
-	/*
-	 * Check that key hasn't been changed after applying operations.
-	 */
-	if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) &&
-	    vy_stmt_compare(old_stmt, HINT_NONE, result_stmt,
-			    HINT_NONE, cmp_def) != 0) {
-		/*
-		 * Key has been changed: ignore this UPSERT and
-		 * @retval the old stmt.
-		 */
-		tuple_unref(result_stmt);
-		result_stmt = vy_stmt_dup(old_stmt);
-	}
 	return result_stmt;
 }
diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c
index 0d20f19ef..15470920b 100644
--- a/test/unit/vy_iterators_helper.c
+++ b/test/unit/vy_iterators_helper.c
@@ -112,7 +112,7 @@ vy_new_simple_stmt(struct tuple_format *format, struct key_def *key_def,
 			ops = mp_encode_int(ops, templ->upsert_value);
 		operations[0].iov_base = tmp;
 		operations[0].iov_len = ops - tmp;
-		ret = vy_stmt_new_upsert(format, buf, pos, operations, 1);
+		ret = vy_stmt_new_upsert(format, buf, pos, operations, 1, true);
 		fail_if(ret == NULL);
 		break;
 	}
diff --git a/test/vinyl/upsert.result b/test/vinyl/upsert.result
index 3a7f6629d..a20db2ad2 100644
--- a/test/vinyl/upsert.result
+++ b/test/vinyl/upsert.result
@@ -899,3 +899,292 @@ s:select()
 s:drop()
 ---
 ...
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- Test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:insert({1, 1, 1})
+---
+- [1, 1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1}, {{'=', 3, 100}})
+---
+...
+s:upsert({1}, {{'=', -1, 200}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select() -- {1, 1, 200}
+---
+- - [1, 1, 200]
+...
+s:delete({1})
+---
+...
+s:insert({1, 1, 1})
+---
+- [1, 1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1}, {{'=', -3, 100}})
+---
+...
+s:upsert({1}, {{'=', -1, 200}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+-- Two upserts are NOT squashed into one, so only one
+-- (first one) is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+---
+- - [1, 1, 200]
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+---
+...
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+---
+...
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+---
+...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+---
+...
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select() -- {1, 1}
+---
+- - [1, 1]
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+---
+...
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+---
+...
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+---
+...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+---
+...
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 300]
+...
+s:drop()
+---
+...
+-- Upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+---
+...
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+---
+- - [1, 1]
+...
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+---
+...
+s:replace{2, 2}
+---
+- [2, 2]
+...
+s:upsert({2, 2}, {{'=', 3, 2}})
+---
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+box.commit()
+---
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+s:upsert({2, 2}, {{'=', 3, 20}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2}, {{'=', 3, 30}})
+---
+...
+s:replace{2, 3}
+---
+- [2, 3]
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 3]
+...
+s:upsert({1, 1}, {{'=', 3, 30}})
+---
+...
+s:delete{1}
+---
+...
+s:select{}
+---
+- - [2, 3]
+...
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2}, {{'+', 2, 5}})
+---
+...
+s:upsert({2, 2}, {{'=', 3, 40}})
+---
+...
+s:upsert({2, 2}, {{'+', 2, 5}})
+---
+...
+s:select{}
+---
+- - [2, 13]
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [2, 13]
+...
+s:drop()
+---
+...
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+---
+...
+i = s:create_index('pk', {parts={2, 'uint'}})
+---
+...
+s:replace{1, 2, 3, 'default'}
+---
+- [1, 2, 3, 'default']
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+---
+...
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 2, 3, 'upserted']
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/upsert.test.lua b/test/vinyl/upsert.test.lua
index 1d77474da..4a350344d 100644
--- a/test/vinyl/upsert.test.lua
+++ b/test/vinyl/upsert.test.lua
@@ -372,3 +372,124 @@ box.snapshot()
 s:select()
 
 s:drop()
+
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- Test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+pk = s:create_index('pk')
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', 3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+s:select() -- {1, 1, 200}
+
+s:delete({1})
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', -3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+-- Two upserts are NOT squashed into one, so only one
+-- (first one) is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+box.snapshot()
+s:select() -- {1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- Upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+pk = s:create_index('pk')
+s:replace{1, 1}
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+s:replace{2, 2}
+s:upsert({2, 2}, {{'=', 3, 2}})
+s:select{}
+box.commit()
+s:select{}
+
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+s:select{}
+s:upsert({2, 2}, {{'=', 3, 20}})
+box.snapshot()
+s:select{}
+
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'=', 3, 30}})
+s:replace{2, 3}
+s:select{}
+
+s:upsert({1, 1}, {{'=', 3, 30}})
+s:delete{1}
+s:select{}
+
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:upsert({2, 2}, {{'=', 3, 40}})
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:select{}
+box.snapshot()
+s:select{}
+
+s:drop()
+
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+i = s:create_index('pk', {parts={2, 'uint'}})
+s:replace{1, 2, 3, 'default'}
+box.snapshot()
+
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+box.snapshot()
+
+s:select{}
+
+s:drop()
-- 
2.15.1

  parent reply	other threads:[~2020-07-29  1:15 UTC|newest]

Thread overview: 8+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-07-29  1:15 [Tarantool-patches] [PATCH 0/2] vinyl: rework upsert internals Nikita Pettik
2020-07-29  1:15 ` [Tarantool-patches] [PATCH 1/2] vy_stmt: introduce vy_stmt_is_void() helper Nikita Pettik
2020-07-29  1:15 ` Nikita Pettik [this message]
2020-07-30 23:31   ` [Tarantool-patches] [PATCH 2/2] vinyl: rework upsert operation Vladislav Shpilevoy
2020-08-02 14:44   ` Vladislav Shpilevoy
2020-08-08 14:51     ` Nikita Pettik
2020-07-30 23:32 ` [Tarantool-patches] [PATCH 0/2] vinyl: rework upsert internals Vladislav Shpilevoy
2020-08-08 14:23   ` Nikita Pettik

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=14aec86329c7820470e40e47a3d1b5655b531732.1595985135.git.korablev@tarantool.org \
    --to=korablev@tarantool.org \
    --cc=tarantool-patches@dev.tarantool.org \
    --cc=v.shpilevoy@tarantool.org \
    --subject='Re: [Tarantool-patches] [PATCH 2/2] vinyl: rework upsert operation' \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox