Tarantool development patches archive
 help / color / mirror / Atom feed
* [Tarantool-patches] [PATCH V2] vinyl: rework upsert operation
@ 2020-08-08 14:21 Nikita Pettik
  2020-08-12 21:23 ` Vladislav Shpilevoy
  0 siblings, 1 reply; 3+ messages in thread
From: Nikita Pettik @ 2020-08-08 14:21 UTC (permalink / raw)
  To: tarantool-patches; +Cc: v.shpilevoy

Previous upsert implementation had a few drawbacks which led to number
of bugs and issues.

Issue #5092 (redundant update operations execution)

In a nutshell, application of upsert(s) (on top of another upsert)
consists of two actions (see vy_apply_upsert()): execute and squash.
Consider example:

insert({1, 1})  -- terminal statement, stored on disk
upsert({1}, {{'-', 2, 20}}) -- old ups1
upsert({1}, {{'+', 2, 10}}) -- new ups2

'Execute' step takes update operations from the new upsert and combines them
with key of the old upsert.  {1} + {'+', 2, 10} can't be evaluated since
key consists of only one field. Note that in case upsert doesn't fold
into insert the upsert's tuple and the tuple stored in index can be
different In our particular case, tuple stored on disk has two fields
({1, 1}), so first upsert's update operation can be applied to it:
{1, 1} + {'+', 2, 10} --> {1, 11}. If upsert's operation can't be executed
using key of old upsert, we simply continue processing squash step.
In turn 'squash' is a combination of update operations: arithmetic
operations are combined so we don't have to store actions over the same
field; the rest operations - are merged into single array. As a result,
we get one upsert with squashed operations: upsert({1}, {{'+', 2, -10}}).
Then vy_apply_upsert() is called again to apply new upsert on the top of
terminal statement - insert{1, 1}. Since now tuple has second field,
update operations can be executed and corresponding result is {1, -9}.
It is the final result of upsert application procedure.
Now imagine that we have following upserts:

upsert({1, 1}, {{'-', 2, 20}}) -- old ups1
upsert({1}, {{'+', 2, 10}}) -- new ups2

In this case execution successfully finishes and modifies old upsert's
tuple: {1, 1} + {'+', 2, 10} --> {1, 11}
However, we still have to squash/accumulate update operations since they
may be applied on tuple stored on disk later. After all, we have
following upsert: upsert({2, 11}, {{'+', 2, -10}}). Then it is applied
on the top of insert({1, 1}) and we get the same result as in the first
case - {1, -9}. The only difference is that upsert's tuple was modified.
As one can see, execution of update operations applied to upsert's tuple
is redundant in the case index already contains tuple with the same key
(i.e. when upserts turns into update). Instead, we are able to
accumulate/squash update operations only. When the last upsert is being
applied, we can either execute all update operation on tuple fetched
from index (i.e. upsert is update) OR on tuple specified in the first
upsert (i.e. first upsert is insert).

Issue #5105 (upsert doesn't follow associative property)

Secondly, current approach breaks associative property: after upsert's
update operations are merged into one array, part of them (related to
one upsert) can be skipped, meanwhile the rest - is applied. For
instance:

-- Index is over second field.
i = s:create_index('pk', {parts={2, 'uint'}})
s:replace{1, 2, 3, 'default'}
s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
-- First update operation modifies primary key, so upsert must be ignored.
s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})

After merging two upserts we get the next one:
upsert({2, 2, 2}, {{'=', 4, 'upserted'}, {'#', 1, 1}, {'!', 3, 1}}

While we executing update operations, we don't distinguish operations from
different upserts. Thus, if one operation fails, the rest are ignored
as well. As a result, first (in general case - all preceding squashed
upserts) upsert won't be applied, even despite the fact it is
absolutely correct. What is more, user gets no error/warning concerning
this fact.

Issue #1622 (no upsert result validation)

After upsert application, there's no check verifying that result
satisfies space's format: number of fields, their types, overflows etc.
Due to this tuples violating format may appear in the space, which in
turn may lead to unpredictable consequences.

To resolve these issues, let's group update operations of each upsert into
separate array. So that operations related to particular upsert are
stored in single array. In terms of previous example we will get:
upsert({2, 2, 2}, {{{'=', 4, 'upserted'}}, {{'#', 1, 1}, {'!', 3, 1}}}

Also note that we don't longer have to apply update operations on tuple
in vy_apply_upsert() when it comes for two upserts: it can be done once we
face terminal statement; or if there's no underlying statement (i.e. it is
delete statement or no statement at all) we apply all update arrays except
the first one on upsert's tuple. In case one of operations from array
fail, we skip the rest operations from this array and process to the
next array. After successful application of update operations of each
array, we check that the resulting tuple fits into space format. If they
aren't, we rollback applied operations, log error and moving to the next
group of operations.

Arithmetic operations still can be combined in case there's no unsigned
fields in space format. Otherwise, result of subtraction can turn out to
be negative and resulting tuple won't satisfy this property.

Closes #1622
Closes #5105
Closes #5092
Part of #5107
---
Issues:
https://github.com/tarantool/tarantool/issues/1622
https://github.com/tarantool/tarantool/issues/5105
https://github.com/tarantool/tarantool/issues/5092
https://github.com/tarantool/tarantool/issues/5107
Branch:
https://github.com/tarantool/tarantool/tree/np/gh-5107-dont-squash-ops

@ChangeLog:
 - Rework upsert operation in vinyl so that now (gh-5107):
   - if upsert can't be applied it is skipped and corresponding error is logged (gh-1622);
   - upserts now follow associative property: result of several upserts
     doesn't depend on the order of their application (gh-5105);
   - upserts referring to -1 fieldno are handled correctly now (gh-5087).

 src/box/vinyl.c                 |   2 +-
 src/box/vy_stmt.c               |  28 ++-
 src/box/vy_stmt.h               |   5 +-
 src/box/vy_upsert.c             | 370 ++++++++++++++++++++----------
 src/box/xrow_update.c           |  61 +++++
 test/unit/vy_iterators_helper.c |   2 +-
 test/vinyl/upsert.result        | 394 ++++++++++++++++++++++++++++++++
 test/vinyl/upsert.test.lua      | 165 +++++++++++++
 8 files changed, 893 insertions(+), 134 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 32301d7ba..eab688147 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1984,7 +1984,7 @@ vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm,
 	operations[0].iov_base = (void *)expr;
 	operations[0].iov_len = expr_end - expr;
 	vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end,
-				    operations, 1);
+				    operations, 1, false);
 	if (vystmt == NULL)
 		return -1;
 	assert(vy_stmt_type(vystmt) == IPROTO_UPSERT);
diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c
index 92e0aa1c5..2ee82b3f2 100644
--- a/src/box/vy_stmt.c
+++ b/src/box/vy_stmt.c
@@ -313,16 +313,22 @@ vy_key_dup(const char *key)
 static struct tuple *
 vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 		     const char *tuple_end, struct iovec *ops,
-		     int op_count, enum iproto_type type)
+		     int op_count, enum iproto_type type, bool is_ops_encoded)
 {
 	mp_tuple_assert(tuple_begin, tuple_end);
 
 	const char *tmp = tuple_begin;
 	mp_decode_array(&tmp);
 
+	/*
+	 * ops are grouped in one extra array.
+	 * See vy_apply_upsert() for details.
+	 */
 	size_t ops_size = 0;
 	for (int i = 0; i < op_count; ++i)
 		ops_size += ops[i].iov_len;
+	if (!is_ops_encoded)
+		ops_size += mp_sizeof_array(op_count);
 
 	struct tuple *stmt = NULL;
 	struct region *region = &fiber()->gc;
@@ -360,6 +366,8 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin,
 	field_map_build(&builder, wpos - field_map_size);
 	memcpy(wpos, tuple_begin, mpsize);
 	wpos += mpsize;
+	if (!is_ops_encoded)
+		wpos = mp_encode_array(wpos, op_count);
 	for (struct iovec *op = ops, *end = ops + op_count;
 	     op != end; ++op) {
 		memcpy(wpos, op->iov_base, op->iov_len);
@@ -374,10 +382,11 @@ end:
 struct tuple *
 vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end, struct iovec *operations,
-		   uint32_t ops_cnt)
+		   uint32_t ops_cnt, bool is_ops_encoded)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    operations, ops_cnt, IPROTO_UPSERT);
+				    operations, ops_cnt, IPROTO_UPSERT,
+				    is_ops_encoded);
 }
 
 struct tuple *
@@ -385,7 +394,7 @@ vy_stmt_new_replace(struct tuple_format *format, const char *tuple_begin,
 		    const char *tuple_end)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    NULL, 0, IPROTO_REPLACE);
+				    NULL, 0, IPROTO_REPLACE, true);
 }
 
 struct tuple *
@@ -393,7 +402,7 @@ vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    NULL, 0, IPROTO_INSERT);
+				    NULL, 0, IPROTO_INSERT, true);
 }
 
 struct tuple *
@@ -401,7 +410,7 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
 		   const char *tuple_end)
 {
 	return vy_stmt_new_with_ops(format, tuple_begin, tuple_end,
-				    NULL, 0, IPROTO_DELETE);
+				    NULL, 0, IPROTO_DELETE, true);
 }
 
 struct tuple *
@@ -735,19 +744,20 @@ vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format)
 		/* Always use key format for DELETE statements. */
 		stmt = vy_stmt_new_with_ops(env->key_format,
 					    request.key, request.key_end,
-					    NULL, 0, IPROTO_DELETE);
+					    NULL, 0, IPROTO_DELETE, true);
 		break;
 	case IPROTO_INSERT:
 	case IPROTO_REPLACE:
 		stmt = vy_stmt_new_with_ops(format, request.tuple,
 					    request.tuple_end,
-					    NULL, 0, request.type);
+					    NULL, 0, request.type, true);
 		break;
 	case IPROTO_UPSERT:
 		ops.iov_base = (char *)request.ops;
 		ops.iov_len = request.ops_end - request.ops;
 		stmt = vy_stmt_new_upsert(format, request.tuple,
-					  request.tuple_end, &ops, 1);
+					  request.tuple_end, &ops,
+					  1, true);
 		break;
 	default:
 		/* TODO: report filename. */
diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h
index 25219230d..65acbecac 100644
--- a/src/box/vy_stmt.h
+++ b/src/box/vy_stmt.h
@@ -528,6 +528,8 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
  * @param part_count Part count from key definition.
  * @param operations Vector of update operations.
  * @param ops_cnt Length of the update operations vector.
+ * @param is_ops_encoded True, if update operations are already packed
+  *                      into extra msgpack array.
  *
  * @retval NULL     Memory allocation error.
  * @retval not NULL Success.
@@ -535,7 +537,8 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin,
 struct tuple *
 vy_stmt_new_upsert(struct tuple_format *format,
 		   const char *tuple_begin, const char *tuple_end,
-		   struct iovec *operations, uint32_t ops_cnt);
+		   struct iovec *operations, uint32_t ops_cnt,
+		   bool is_ops_encoded);
 
 /**
  * Create REPLACE statement from UPSERT statement.
diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c
index e697b6321..3811e91b3 100644
--- a/src/box/vy_upsert.c
+++ b/src/box/vy_upsert.c
@@ -39,38 +39,217 @@
 #include "column_mask.h"
 
 /**
- * Try to squash two upsert series (msgspacked index_base + ops)
- * Try to create a tuple with squahed operations
+ * Check that key hasn't been changed after applying upsert operation.
+ */
+static bool
+vy_apply_result_does_cross_pk(struct tuple *old_stmt, const char *result,
+			      const char *result_end, struct key_def *cmp_def,
+			      uint64_t col_mask)
+{
+	if (!key_update_can_be_skipped(cmp_def->column_mask, col_mask)) {
+		struct tuple *tuple =
+			vy_stmt_new_replace(tuple_format(old_stmt), result,
+					    result_end);
+		int cmp_res = vy_stmt_compare(old_stmt, HINT_NONE, tuple,
+					      HINT_NONE, cmp_def);
+		tuple_unref(tuple);
+		return cmp_res != 0;
+	}
+	return false;
+}
+
+/**
+ * Apply update operations stored in @a upsert on tuple @a stmt. If @a stmt is
+ * void statement (i.e. it is NULL or delete statement) then operations are
+ * applied on tuple stored in @a upsert. Update operations of @a upsert which
+ * can't be applied are skipped along side with other operations from single
+ * group (i.e. packed in one msgpack array); errors may be logged depending on
+ * @a suppress_error flag.
  *
- * @retval 0 && *result_stmt != NULL : successful squash
- * @retval 0 && *result_stmt == NULL : unsquashable sources
- * @retval -1 - memory error
+ * @param upsert Upsert statement to be applied on @a stmt.
+ * @param stmt Statement to be used as base for upsert operations.
+ * @param cmp_def Key definition required to provide check of primary key
+ *                modification.
+ * @return Tuple containing result of upsert application; NULL in case OOM.
+ */
+static struct tuple *
+vy_apply_upsert_on_terminal_stmt(struct tuple *upsert, struct tuple *stmt,
+				 struct key_def *cmp_def, bool suppress_error)
+{
+	assert(vy_stmt_type(upsert) == IPROTO_UPSERT);
+	assert(stmt == NULL || vy_stmt_type(stmt) != IPROTO_UPSERT);
+
+	uint32_t mp_size;
+	const char *new_ops = vy_stmt_upsert_ops(upsert, &mp_size);
+	/* Msgpack containing result of upserts application. */
+	const char *result_mp;
+	bool stmt_is_void = stmt == NULL || vy_stmt_type(stmt) == IPROTO_DELETE;
+	if (stmt_is_void)
+		result_mp = vy_upsert_data_range(upsert, &mp_size);
+	else
+		result_mp = tuple_data_range(stmt, &mp_size);
+	const char *result_mp_end = result_mp + mp_size;
+	/*
+	 * xrow_upsert_execute() allocates result using region,
+	 * so save starting point to release it later.
+	 */
+	struct region *region = &fiber()->gc;
+	size_t region_svp = region_used(region);
+	uint64_t column_mask = COLUMN_MASK_FULL;
+	struct tuple_format *format = tuple_format(upsert);
+
+	uint32_t ups_cnt = mp_decode_array(&new_ops);
+	const char *ups_ops = new_ops;
+	/*
+	 * In case upsert folds into insert, we must skip first
+	 * update operations.
+	 */
+	if (stmt_is_void) {
+		ups_cnt--;
+		mp_next(&ups_ops);
+	}
+	for (uint32_t i = 0; i < ups_cnt; ++i) {
+		assert(mp_typeof(*ups_ops) == MP_ARRAY);
+		const char *ups_ops_end = ups_ops;
+		mp_next(&ups_ops_end);
+		const char *exec_res = result_mp;
+		exec_res = xrow_upsert_execute(ups_ops, ups_ops_end, result_mp,
+					       result_mp_end, format, &mp_size,
+					       0, suppress_error, &column_mask);
+		if (exec_res == NULL) {
+			if (! suppress_error) {
+				struct error *e = diag_last_error(diag_get());
+				assert(e != NULL);
+				/* Bail out immediately in case of OOM. */
+				if (e->type != &type_ClientError) {
+					region_truncate(region, region_svp);
+					return NULL;
+				}
+				diag_log();
+			}
+			ups_ops = ups_ops_end;
+			continue;
+		}
+		/*
+		 * If it turns out that resulting tuple modifies primary
+		 * key, then simply ignore this upsert.
+		 */
+		if (vy_apply_result_does_cross_pk(stmt, exec_res,
+						  exec_res + mp_size, cmp_def,
+						  column_mask)) {
+			if (!suppress_error) {
+				say_error("upsert operations %s are not applied"\
+					  " due to primary key modification",
+					  mp_str(ups_ops));
+			}
+			ups_ops = ups_ops_end;
+			continue;
+		}
+		ups_ops = ups_ops_end;
+		/*
+		 * Result statement must satisfy space's format. Since upsert's
+		 * tuple correctness is already checked in vy_upsert(), let's
+		 * use its format to provide result verification.
+		 */
+		struct tuple_format *format = tuple_format(upsert);
+		if (tuple_validate_raw(format, exec_res) != 0) {
+			if (! suppress_error)
+				diag_log();
+			continue;
+		}
+		result_mp = exec_res;
+		result_mp_end = exec_res + mp_size;
+	}
+	struct tuple *new_terminal_stmt = vy_stmt_new_replace(format, result_mp,
+							      result_mp_end);
+	region_truncate(region, region_svp);
+	if (new_terminal_stmt == NULL)
+		return NULL;
+	vy_stmt_set_lsn(new_terminal_stmt, vy_stmt_lsn(upsert));
+	return new_terminal_stmt;
+}
+
+/**
+ * Unpack upsert's update operations from msgpack array
+ * into array of iovecs.
  */
+static void
+upsert_ops_to_iovec(const char *ops, uint32_t ops_cnt, struct iovec *iov_arr)
+{
+	for (uint32_t i = 0; i < ops_cnt; ++i) {
+		assert(mp_typeof(*ops) == MP_ARRAY);
+		iov_arr[i].iov_base = (char *) ops;
+		mp_next(&ops);
+		iov_arr[i].iov_len = ops - (char *) iov_arr[i].iov_base;
+	}
+}
+
+/**
+ * Attempt at squashing arithmetic operations applied on the same tuple
+ * fields. We never squash first member (i.e. operations from the oldest
+ * upsert) from old operations, since it must be skipped in case such
+ * upsert turns into insert.
+ * Also note that content of @a new_ops and @a old_ops arrays can be modified
+ * as a result of function invocation.
+ * Array containing resulting operations is allocated using region.
+ * In case of OOM function returns -1; 0 otherwise.
+ * */
 static int
 vy_upsert_try_to_squash(struct tuple_format *format,
-			const char *key_mp, const char *key_mp_end,
-			const char *old_serie, const char *old_serie_end,
-			const char *new_serie, const char *new_serie_end,
-			struct tuple **result_stmt)
+			struct iovec *new_ops, uint32_t new_ops_cnt,
+			struct iovec *old_ops, uint32_t old_ops_cnt,
+			struct iovec **res_ops, uint32_t *res_ops_cnt)
 {
-	*result_stmt = NULL;
-
-	size_t squashed_size;
-	const char *squashed =
-		xrow_upsert_squash(old_serie, old_serie_end,
-				   new_serie, new_serie_end, format,
-				   &squashed_size);
-	if (squashed == NULL)
+	uint32_t squashed_ops = 0;
+	for (uint32_t i = 0; i < new_ops_cnt; ++i) {
+		const char *new = (const char *) new_ops[i].iov_base;
+		const char *new_end = new + new_ops[i].iov_len;
+		/*
+		 * Don't touch first update operation since it should
+		 * be skipped in case first upsert folds into insert.
+		 */
+		for (uint32_t j = 1; j < old_ops_cnt; ++j) {
+			const char *old =
+				(const char *) old_ops[j].iov_base;
+			const char *old_end =
+				(const char *) old + old_ops[j].iov_len;
+			/* Operation may already be squashed. */
+			if (old == NULL)
+				continue;
+			size_t squashed_size;
+			const char *res =
+				xrow_upsert_squash(old, old_end, new, new_end,
+						   format, &squashed_size);
+			if (res != NULL) {
+				new_ops[i].iov_base = (char *) res;
+				new_ops[i].iov_len = squashed_size;
+				old_ops[j].iov_base = NULL;
+				old_ops[j].iov_len = 0;
+				squashed_ops++;
+			}
+		}
+	}
+	*res_ops_cnt = new_ops_cnt + old_ops_cnt - squashed_ops;
+	if (squashed_ops == 0) {
+		*res_ops = old_ops;
 		return 0;
-	/* Successful squash! */
-	struct iovec operations[1];
-	operations[0].iov_base = (void *)squashed;
-	operations[0].iov_len = squashed_size;
-
-	*result_stmt = vy_stmt_new_upsert(format, key_mp, key_mp_end,
-					  operations, 1);
-	if (*result_stmt == NULL)
+	}
+	size_t ops_size;
+	*res_ops = region_alloc_array(&fiber()->gc, typeof(*res_ops[0]),
+				      *res_ops_cnt, &ops_size);
+	if (*res_ops == NULL) {
+		diag_set(OutOfMemory, ops_size, "region_alloc_array",
+			 "res_ops");
 		return -1;
+	}
+	/* Fill gaps (i.e. old squashed operations) in the resulting array. */
+	for (uint32_t i = 0; i < old_ops_cnt; ++i) {
+		if (old_ops[i].iov_base != NULL)
+			(*res_ops)[i] = old_ops[i];
+	}
+	for (uint32_t i = old_ops_cnt - squashed_ops, j = 0; j < new_ops_cnt;
+	     ++i, ++j)
+		(*res_ops)[i] = new_ops[j];
 	return 0;
 }
 
@@ -87,122 +266,69 @@ vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt,
 	assert(new_stmt != old_stmt);
 	assert(vy_stmt_type(new_stmt) == IPROTO_UPSERT);
 
-	if (old_stmt == NULL || vy_stmt_type(old_stmt) == IPROTO_DELETE) {
-		/*
-		 * INSERT case: return new stmt.
-		 */
-		return vy_stmt_replace_from_upsert(new_stmt);
+	struct tuple *result_stmt = NULL;
+	if (old_stmt == NULL || vy_stmt_type(old_stmt) != IPROTO_UPSERT) {
+		return vy_apply_upsert_on_terminal_stmt(new_stmt, old_stmt,
+						        cmp_def, suppress_error);
 	}
 
-	struct tuple_format *format = tuple_format(new_stmt);
-
+	assert(old_stmt != NULL);
+	assert(vy_stmt_type(old_stmt) == IPROTO_UPSERT);
 	/*
-	 * Unpack UPSERT operation from the new stmt
+	 * Unpack UPSERT operation from the old and new stmts.
 	 */
 	uint32_t mp_size;
-	const char *new_ops;
-	new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size);
-	const char *new_ops_end = new_ops + mp_size;
-
+	const char *old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size);
+	const char *old_stmt_mp = vy_upsert_data_range(old_stmt, &mp_size);
+	const char *old_stmt_mp_end = old_stmt_mp + mp_size;
+	const char *new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size);
 	/*
-	 * Apply new operations to the old stmt
+	 * UPSERT + UPSERT case: try to squash arithmetic operations.
+	 * Firstly, unpack operations to iovec array.
 	 */
-	const char *result_mp;
-	if (vy_stmt_type(old_stmt) == IPROTO_UPSERT)
-		result_mp = vy_upsert_data_range(old_stmt, &mp_size);
-	else
-		result_mp = tuple_data_range(old_stmt, &mp_size);
-	const char *result_mp_end = result_mp + mp_size;
-	struct tuple *result_stmt = NULL;
+	struct tuple_format *format = tuple_format(old_stmt);
 	struct region *region = &fiber()->gc;
 	size_t region_svp = region_used(region);
-	uint8_t old_type = vy_stmt_type(old_stmt);
-	uint64_t column_mask = COLUMN_MASK_FULL;
-	result_mp = xrow_upsert_execute(new_ops, new_ops_end, result_mp,
-					result_mp_end, format, &mp_size,
-					0, suppress_error, &column_mask);
-	if (result_mp == NULL) {
+	uint32_t old_ops_cnt = mp_decode_array(&old_ops);
+	uint32_t new_ops_cnt = mp_decode_array(&new_ops);
+	size_t ops_size;
+	struct iovec *operations =
+		region_alloc_array(region, typeof(operations[0]),
+				   old_ops_cnt + new_ops_cnt, &ops_size);
+	if (operations == NULL) {
 		region_truncate(region, region_svp);
+		diag_set(OutOfMemory, ops_size, "region_alloc_array",
+			 "operations");
 		return NULL;
 	}
-	result_mp_end = result_mp + mp_size;
-	if (old_type != IPROTO_UPSERT) {
-		assert(old_type == IPROTO_INSERT ||
-		       old_type == IPROTO_REPLACE);
-		/*
-		 * UPDATE case: return the updated old stmt.
-		 */
-		result_stmt = vy_stmt_new_replace(format, result_mp,
-						  result_mp_end);
+	upsert_ops_to_iovec(old_ops, old_ops_cnt, operations);
+	upsert_ops_to_iovec(new_ops, new_ops_cnt, &operations[old_ops_cnt]);
+	struct iovec *res_ops = NULL;
+	uint32_t res_ops_cnt = 0;
+	if (vy_upsert_try_to_squash(format, &operations[old_ops_cnt],
+				    new_ops_cnt, operations, old_ops_cnt,
+				    &res_ops, &res_ops_cnt) != 0) {
+		/* OOM */
 		region_truncate(region, region_svp);
-		if (result_stmt == NULL)
-			return NULL; /* OOM */
-		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
+		return NULL;
 	}
-
+	assert(res_ops_cnt > 0);
+	assert(res_ops != NULL);
 	/*
-	 * Unpack UPSERT operation from the old stmt
+	 * Adding update operations. We keep order of update operations in
+	 * the array the same. It is vital since first set of operations
+	 * must be skipped in case upsert folds into insert. For instance:
+	 * old_ops = {{{op1}, {op2}}, {{op3}}}
+	 * new_ops = {{{op4}, {op5}}}
+	 * res_ops = {{{op1}, {op2}}, {{op3}}, {{op4}, {op5}}}
+	 * If upsert corresponding to old_ops becomes insert, then
+	 * {{op1}, {op2}} update operations are not applied.
 	 */
-	assert(old_stmt != NULL);
-	const char *old_ops;
-	old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size);
-	const char *old_ops_end = old_ops + mp_size;
-	assert(old_ops_end > old_ops);
-
-	/*
-	 * UPSERT + UPSERT case: combine operations
-	 */
-	assert(old_ops_end - old_ops > 0);
-	if (vy_upsert_try_to_squash(format, result_mp, result_mp_end,
-				    old_ops, old_ops_end, new_ops, new_ops_end,
-				    &result_stmt) != 0) {
-		region_truncate(region, region_svp);
-		return NULL;
-	}
-	if (result_stmt != NULL) {
-		region_truncate(region, region_svp);
-		vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-		goto check_key;
-	}
-
-	/* Failed to squash, simply add one upsert to another */
-	int old_ops_cnt, new_ops_cnt;
-	struct iovec operations[3];
-
-	old_ops_cnt = mp_decode_array(&old_ops);
-	operations[1].iov_base = (void *)old_ops;
-	operations[1].iov_len = old_ops_end - old_ops;
-
-	new_ops_cnt = mp_decode_array(&new_ops);
-	operations[2].iov_base = (void *)new_ops;
-	operations[2].iov_len = new_ops_end - new_ops;
-
-	char ops_buf[16];
-	char *header = mp_encode_array(ops_buf, old_ops_cnt + new_ops_cnt);
-	operations[0].iov_base = (void *)ops_buf;
-	operations[0].iov_len = header - ops_buf;
-
-	result_stmt = vy_stmt_new_upsert(format, result_mp, result_mp_end,
-					 operations, 3);
+	result_stmt = vy_stmt_new_upsert(format, old_stmt_mp, old_stmt_mp_end,
+					 res_ops, res_ops_cnt, false);
 	region_truncate(region, region_svp);
 	if (result_stmt == NULL)
 		return NULL;
 	vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt));
-
-check_key:
-	/*
-	 * Check that key hasn't been changed after applying operations.
-	 */
-	if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) &&
-	    vy_stmt_compare(old_stmt, HINT_NONE, result_stmt,
-			    HINT_NONE, cmp_def) != 0) {
-		/*
-		 * Key has been changed: ignore this UPSERT and
-		 * @retval the old stmt.
-		 */
-		tuple_unref(result_stmt);
-		result_stmt = vy_stmt_dup(old_stmt);
-	}
 	return result_stmt;
 }
diff --git a/src/box/xrow_update.c b/src/box/xrow_update.c
index 833975f03..62893c735 100644
--- a/src/box/xrow_update.c
+++ b/src/box/xrow_update.c
@@ -416,6 +416,60 @@ xrow_upsert_execute(const char *expr,const char *expr_end,
 	return xrow_update_finish(&update, format, p_tuple_len);
 }
 
+/** Return true if value stored in @a arith_op satisfy format requirements. */
+static bool
+update_arith_op_does_satisfy_format(struct xrow_update_op *arith_op,
+				    struct tuple_format *format)
+{
+	/*
+	 * Upsert's tuple may contain more tuples than specified in
+	 * space's format. In this case they are assumed to be unsigned.
+	 */
+	enum field_type fld_type = FIELD_TYPE_UNSIGNED;
+	if (arith_op->field_no < 0 ||
+	    (uint32_t) arith_op->field_no < tuple_format_field_count(format)) {
+		struct tuple_field *fld =
+			tuple_format_field(format, arith_op->field_no);
+		assert(fld != NULL);
+		fld_type = fld->type;
+	}
+	struct xrow_update_arg_arith arith = arith_op->arg.arith;
+	if (fld_type == FIELD_TYPE_UNSIGNED) {
+		if (arith.type == XUPDATE_TYPE_INT) {
+			if (!int96_is_uint64(&arith.int96))
+				return false;
+			return true;
+		}
+		/* Can't store decimals/floating points in unsigned field. */
+		return false;
+	}
+	if (fld_type == FIELD_TYPE_INTEGER) {
+		if (arith.type == XUPDATE_TYPE_INT)
+			return true;
+		return false;
+	}
+	if (fld_type == FIELD_TYPE_DOUBLE) {
+		if (arith.type == XUPDATE_TYPE_FLOAT ||
+		    arith.type == XUPDATE_TYPE_DOUBLE)
+			return true;
+		return false;
+	}
+	if (fld_type == FIELD_TYPE_NUMBER || fld_type == FIELD_TYPE_SCALAR) {
+		if (arith.type != XUPDATE_TYPE_DECIMAL)
+			return true;
+		return false;
+	}
+	if (fld_type == FIELD_TYPE_DECIMAL) {
+		if (arith.type == XUPDATE_TYPE_DECIMAL)
+			return true;
+		return false;
+	}
+	if (fld_type == FIELD_TYPE_ANY)
+		return true;
+	/* All other field types are not compatible with numerics. */
+	return false;
+}
+
 const char *
 xrow_upsert_squash(const char *expr1, const char *expr1_end,
 		   const char *expr2, const char *expr2_end,
@@ -535,6 +589,13 @@ xrow_upsert_squash(const char *expr1, const char *expr1_end,
 		if (xrow_update_arith_make(op[1], arith,
 					   &res.arg.arith) != 0)
 			return NULL;
+		res.field_no = op[0]->field_no;
+		/*
+		 * Do not squash operations if they don't satisfy
+		 * format.
+		 */
+		if (!update_arith_op_does_satisfy_format(&res, format))
+			return NULL;
 		res_ops = mp_encode_array(res_ops, 3);
 		res_ops = mp_encode_str(res_ops,
 					(const char *)&op[0]->opcode, 1);
diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c
index 0d20f19ef..15470920b 100644
--- a/test/unit/vy_iterators_helper.c
+++ b/test/unit/vy_iterators_helper.c
@@ -112,7 +112,7 @@ vy_new_simple_stmt(struct tuple_format *format, struct key_def *key_def,
 			ops = mp_encode_int(ops, templ->upsert_value);
 		operations[0].iov_base = tmp;
 		operations[0].iov_len = ops - tmp;
-		ret = vy_stmt_new_upsert(format, buf, pos, operations, 1);
+		ret = vy_stmt_new_upsert(format, buf, pos, operations, 1, true);
 		fail_if(ret == NULL);
 		break;
 	}
diff --git a/test/vinyl/upsert.result b/test/vinyl/upsert.result
index 3a7f6629d..343084e81 100644
--- a/test/vinyl/upsert.result
+++ b/test/vinyl/upsert.result
@@ -899,3 +899,397 @@ s:select()
 s:drop()
 ---
 ...
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- gh-5087: test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:insert({1, 1, 1})
+---
+- [1, 1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1}, {{'=', 3, 100}})
+---
+...
+s:upsert({1}, {{'=', -1, 200}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select() -- {1, 1, 200}
+---
+- - [1, 1, 200]
+...
+s:delete({1})
+---
+...
+s:insert({1, 1, 1})
+---
+- [1, 1, 1]
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1}, {{'=', -3, 100}})
+---
+...
+s:upsert({1}, {{'=', -1, 200}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+-- gh-5105: Two upserts are NOT squashed into one, so only one (first one)
+-- is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+---
+- - [1, 1, 200]
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+---
+...
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+---
+...
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+---
+...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+---
+...
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select() -- {1, 1}
+---
+- - [1, 1]
+...
+s:delete({1})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+---
+...
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+---
+...
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+---
+...
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+---
+...
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 300]
+...
+s:drop()
+---
+...
+-- gh-1622: upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1, 1}
+---
+- [1, 1]
+...
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+---
+...
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+---
+- - [1, 1]
+...
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+---
+...
+s:replace{2, 2}
+---
+- [2, 2]
+...
+s:upsert({2, 2}, {{'=', 3, 2}})
+---
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+box.commit()
+---
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+s:upsert({2, 2}, {{'=', 3, 20}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 2]
+...
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2}, {{'=', 3, 30}})
+---
+...
+s:replace{2, 3}
+---
+- [2, 3]
+...
+s:select{}
+---
+- - [1, 1]
+  - [2, 3]
+...
+s:upsert({1, 1}, {{'=', 3, 30}})
+---
+...
+s:delete{1}
+---
+...
+s:select{}
+---
+- - [2, 3]
+...
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2}, {{'+', 2, 5}})
+---
+...
+s:upsert({2, 2}, {{'=', 3, 40}})
+---
+...
+s:upsert({2, 2}, {{'+', 2, 5}})
+---
+...
+s:select{}
+---
+- - [2, 13]
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [2, 13]
+...
+s:drop()
+---
+...
+-- Test different scenarious during which update operations squash can't
+-- take place due to format violations.
+--
+decimal = require('decimal')
+---
+...
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 5 })
+---
+...
+s:format({{name='id', type='unsigned'}, {name='u', type='unsigned'},\
+          {name='s', type='scalar'}, {name='f', type='double'},\
+          {name='d', type='decimal'}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+s:replace{1, 1, 1, 1.1, decimal.new(1.1) }
+---
+- [1, 1, 1, 1.1, 1.1]
+...
+s:replace{2, 1, 1, 1.1, decimal.new(1.1)}
+---
+- [2, 1, 1, 1.1, 1.1]
+...
+box.snapshot()
+---
+- ok
+...
+-- Can't assign integer to float field. First operation is still applied.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 4, 4}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'=', 4, 4}})
+---
+...
+-- Can't add floating point to integer (result is floating point).
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5.5}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 1, 5.1, 1.1]
+  - [2, 6, 1, 1.1, 1.1]
+...
+-- Integer overflow check.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+---
+...
+-- Negative result of subtraction stored in unsigned field.
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 2}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, 10}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 9223372036854775809, 5.1, 1.1]
+  - [2, 8, 1, 1.1, 1.1]
+...
+-- Decimals do not fit into numerics and vice versa.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 5, 2}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 5, 1}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, decimal.new(2.1)}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, decimal.new(1.2)}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 9223372036854775809, 5.1, 2.1]
+  - [2, 8, 1, 1.1, 1.1]
+...
+s:drop()
+---
+...
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+---
+...
+i = s:create_index('pk', {parts={2, 'uint'}})
+---
+...
+s:replace{1, 2, 3, 'default'}
+---
+- [1, 2, 3, 'default']
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+---
+...
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 2, 3, 'upserted']
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/upsert.test.lua b/test/vinyl/upsert.test.lua
index 1d77474da..5c5391f44 100644
--- a/test/vinyl/upsert.test.lua
+++ b/test/vinyl/upsert.test.lua
@@ -372,3 +372,168 @@ box.snapshot()
 s:select()
 
 s:drop()
+
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- gh-5087: test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+pk = s:create_index('pk')
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', 3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+s:select() -- {1, 1, 200}
+
+s:delete({1})
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', -3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+-- gh-5105: Two upserts are NOT squashed into one, so only one (first one)
+-- is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+box.snapshot()
+s:select() -- {1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- gh-1622: upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+pk = s:create_index('pk')
+s:replace{1, 1}
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+s:replace{2, 2}
+s:upsert({2, 2}, {{'=', 3, 2}})
+s:select{}
+box.commit()
+s:select{}
+
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+s:select{}
+s:upsert({2, 2}, {{'=', 3, 20}})
+box.snapshot()
+s:select{}
+
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'=', 3, 30}})
+s:replace{2, 3}
+s:select{}
+
+s:upsert({1, 1}, {{'=', 3, 30}})
+s:delete{1}
+s:select{}
+
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:upsert({2, 2}, {{'=', 3, 40}})
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:select{}
+box.snapshot()
+s:select{}
+
+s:drop()
+
+-- Test different scenarious during which update operations squash can't
+-- take place due to format violations.
+--
+decimal = require('decimal')
+
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 5 })
+s:format({{name='id', type='unsigned'}, {name='u', type='unsigned'},\
+          {name='s', type='scalar'}, {name='f', type='double'},\
+          {name='d', type='decimal'}})
+pk = s:create_index('pk')
+s:replace{1, 1, 1, 1.1, decimal.new(1.1) }
+s:replace{2, 1, 1, 1.1, decimal.new(1.1)}
+box.snapshot()
+-- Can't assign integer to float field. First operation is still applied.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 4, 4}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'=', 4, 4}})
+-- Can't add floating point to integer (result is floating point).
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5.5}})
+box.snapshot()
+s:select()
+-- Integer overflow check.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+-- Negative result of subtraction stored in unsigned field.
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 2}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, 10}})
+box.snapshot()
+s:select()
+-- Decimals do not fit into numerics and vice versa.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 5, 2}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 5, 1}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, decimal.new(2.1)}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, decimal.new(1.2)}})
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+i = s:create_index('pk', {parts={2, 'uint'}})
+s:replace{1, 2, 3, 'default'}
+box.snapshot()
+
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+box.snapshot()
+
+s:select{}
+
+s:drop()
-- 
2.17.1

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2020-10-03 13:51 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-08-08 14:21 [Tarantool-patches] [PATCH V2] vinyl: rework upsert operation Nikita Pettik
2020-08-12 21:23 ` Vladislav Shpilevoy
2020-10-03 13:51   ` Nikita Pettik

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox