From: Nikita Pettik
Date: Sat, 8 Aug 2020 17:21:45 +0300
Subject: [Tarantool-patches] [PATCH V2] vinyl: rework upsert operation
To: tarantool-patches@dev.tarantool.org
Cc: v.shpilevoy@tarantool.org

The previous upsert implementation had a few drawbacks which led to a
number of bugs and issues.

Issue #5092 (redundant execution of update operations)

In a nutshell, applying an upsert on top of another upsert consists of
two actions (see vy_apply_upsert()): execute and squash. Consider an
example:

insert({1, 1})               -- terminal statement, stored on disk
upsert({1}, {{'-', 2, 20}})  -- old ups1
upsert({1}, {{'+', 2, 10}})  -- new ups2

The 'execute' step takes the update operations of the new upsert and
applies them to the key of the old upsert. {1} + {'+', 2, 10} can't be
evaluated since the key consists of only one field. Note that when an
upsert doesn't fold into an insert, the upsert's tuple and the tuple
stored in the index can be different. In our particular case, the tuple
stored on disk has two fields ({1, 1}), so the new upsert's update
operation can be applied to it: {1, 1} + {'+', 2, 10} --> {1, 11}.
If the upsert's operation can't be executed using the key of the old
upsert, we simply continue with the squash step.

In turn, 'squash' is a combination of update operations: arithmetic
operations on the same field are combined so that we don't have to store
several actions over that field; the remaining operations are merged
into a single array. As a result, we get one upsert with squashed
operations: upsert({1}, {{'+', 2, -10}}).
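The two steps can be illustrated with a minimal C sketch. It assumes a
toy model in which an update operation is a plain struct; the names
arith_op, arith_op_squash() and arith_op_execute() are hypothetical and
are not Tarantool's API (the real code works on msgpack-encoded
operations via xrow_upsert_execute() and xrow_upsert_squash()). Overflow
is ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical, heavily simplified arithmetic update operation:
 * opcode '+' or '-', 1-based field number and an integer argument.
 */
struct arith_op {
	char opcode;
	uint32_t field_no;
	int64_t arg;
};

/* Signed delta that an operation adds to its field. */
static int64_t
arith_op_delta(const struct arith_op *op)
{
	assert(op->opcode == '+' || op->opcode == '-');
	return op->opcode == '+' ? op->arg : -op->arg;
}

/*
 * "Squash" step: combine two arithmetic operations on the same field
 * into a single '+' operation carrying the sum of their deltas.
 * Returns false if the operations target different fields.
 */
static bool
arith_op_squash(const struct arith_op *old_op, const struct arith_op *new_op,
		struct arith_op *res)
{
	if (old_op->field_no != new_op->field_no)
		return false;
	res->opcode = '+';
	res->field_no = old_op->field_no;
	res->arg = arith_op_delta(old_op) + arith_op_delta(new_op);
	return true;
}

/*
 * "Execute" step: apply an operation to a tuple of integer fields.
 * Fails if the target field is missing (e.g. the key is just {1}).
 */
static bool
arith_op_execute(const struct arith_op *op, int64_t *fields,
		 uint32_t field_count)
{
	if (op->field_no < 1 || op->field_no > field_count)
		return false;
	fields[op->field_no - 1] += arith_op_delta(op);
	return true;
}
```

With the operations from the example, squashing {'-', 2, 20} and
{'+', 2, 10} gives {'+', 2, -10}; executing {'+', 2, 10} on the
one-field key {1} fails, while executing the squashed operation on
{1, 1} gives {1, -9}.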
Then vy_apply_upsert() is called again to apply the new upsert on top of
the terminal statement insert({1, 1}). Since the tuple now has a second
field, the update operations can be executed and the result is {1, -9}.
This is the final result of the upsert application procedure.

Now imagine that we have the following upserts:

upsert({1, 1}, {{'-', 2, 20}})  -- old ups1
upsert({1}, {{'+', 2, 10}})     -- new ups2

In this case execution finishes successfully and modifies the old
upsert's tuple:

{1, 1} + {'+', 2, 10} --> {1, 11}

However, we still have to squash/accumulate the update operations since
they may later be applied to the tuple stored on disk. In the end, we
have the following upsert: upsert({1, 11}, {{'+', 2, -10}}). Then it is
applied on top of insert({1, 1}) and we get the same result as in the
first case: {1, -9}. The only difference is that the upsert's tuple was
modified.

As one can see, executing update operations on the upsert's tuple is
redundant when the index already contains a tuple with the same key
(i.e. when the upsert turns into an update). Instead, it is enough to
accumulate/squash the update operations. When the last upsert is being
applied, we can execute all update operations either on the tuple
fetched from the index (i.e. the upsert is an update) OR on the tuple
specified in the first upsert (i.e. the first upsert is an insert).

Issue #5105 (upsert doesn't follow the associative property)

Secondly, the current approach breaks the associative property: after
the upserts' update operations are merged into one array, part of them
(related to one upsert) can be skipped, while the rest are applied. For
instance:

-- Index is over the second field.
i = s:create_index('pk', {parts={2, 'uint'}})
s:replace{1, 2, 3, 'default'}
s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
-- First update operation modifies primary key, so upsert must be ignored.
s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})

After merging the two upserts we get the following one:

upsert({2, 2, 2}, {{'=', 4, 'upserted'}, {'#', 1, 1}, {'!', 3, 1}})

While executing update operations, we don't distinguish operations from
different upserts. Thus, if one operation fails, the rest are ignored as
well. As a result, the first upsert (in the general case, all preceding
squashed upserts) won't be applied, even though it is absolutely
correct. What is more, the user gets no error or warning about this.

Issue #1622 (no upsert result validation)

After an upsert is applied, there's no check verifying that the result
satisfies the space's format: number of fields, their types, overflows,
etc. Due to this, tuples violating the format may appear in the space,
which in turn may lead to unpredictable consequences.

To resolve these issues, let's group the update operations of each
upsert into a separate array, so that the operations related to a
particular upsert are stored in a single array. In terms of the previous
example we will get:

upsert({2, 2, 2}, {{{'=', 4, 'upserted'}}, {{'#', 1, 1}, {'!', 3, 1}}})

Also note that we no longer have to apply update operations to the
tuple in vy_apply_upsert() when it comes to two upserts: this can be
done once we face a terminal statement. If there's no underlying
statement (i.e. it is a delete statement or there is no statement at
all), the first upsert folds into an insert, so we apply all update
arrays except the first one to the upsert's tuple. If one of the
operations from an array fails, we skip the remaining operations from
this array and proceed to the next array. After successfully applying
the update operations of each array, we check that the resulting tuple
fits the space format. If it doesn't, we roll back the applied
operations, log an error and move on to the next group of operations.

Arithmetic operations can still be combined, as long as the combined
operation matches the type of the target field in the space format.
Otherwise (e.g. for an unsigned field), the result of a subtraction can
turn out to be negative and the resulting tuple won't satisfy the
format.
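The per-upsert grouping can be sketched in C as well. This is a
hypothetical toy model, not the code of
vy_apply_upsert_on_terminal_stmt(): fields are plain integers, the
"format" only checks the field count and that every field is
non-negative (as if all fields were unsigned), an operation on a missing
field simply fails (no appending), and the primary key check is omitted.
Each group is applied to a scratch copy, so discarding the copy plays
the role of the rollback.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_FIELDS 8

/* Hypothetical simplified types; not Tarantool's API. */
struct op {
	char opcode;       /* '+', '-' or '=' */
	uint32_t field_no; /* 1-based */
	int64_t arg;
};

/* Operations of a single upsert form one group. */
struct op_group {
	const struct op *ops;
	uint32_t op_count;
};

struct tuple {
	int64_t fields[MAX_FIELDS];
	uint32_t field_count;
};

/* Toy format: fixed field count, every field must be non-negative. */
struct format {
	uint32_t field_count;
};

static bool
op_execute(const struct op *op, struct tuple *t)
{
	/* In this toy model a missing field makes the operation fail. */
	if (op->field_no < 1 || op->field_no > t->field_count)
		return false;
	int64_t *field = &t->fields[op->field_no - 1];
	switch (op->opcode) {
	case '+':
		*field += op->arg;
		return true;
	case '-':
		*field -= op->arg;
		return true;
	case '=':
		*field = op->arg;
		return true;
	default:
		return false;
	}
}

static bool
tuple_validate(const struct tuple *t, const struct format *fmt)
{
	if (t->field_count != fmt->field_count)
		return false;
	for (uint32_t i = 0; i < t->field_count; i++) {
		if (t->fields[i] < 0)
			return false;
	}
	return true;
}

/*
 * Apply groups in order on top of base. A group with a failing
 * operation, or whose result violates fmt, is skipped as a whole and
 * later groups are still applied. If skip_first is set (the first
 * upsert folds into an insert), the first group is not applied.
 * Returns the number of skipped groups.
 */
static uint32_t
apply_groups(struct tuple *base, const struct op_group *groups,
	     uint32_t group_count, bool skip_first, const struct format *fmt)
{
	assert(base->field_count <= MAX_FIELDS);
	uint32_t skipped = 0;
	for (uint32_t g = skip_first ? 1 : 0; g < group_count; g++) {
		struct tuple scratch = *base;
		bool ok = true;
		for (uint32_t i = 0; i < groups[g].op_count && ok; i++)
			ok = op_execute(&groups[g].ops[i], &scratch);
		if (ok && tuple_validate(&scratch, fmt))
			*base = scratch; /* commit this group */
		else
			skipped++; /* drop scratch; real code logs an error */
	}
	return skipped;
}
```

For instance, applying the groups {'+', 2, 5}, {'=', 3, 40} and
{'+', 2, 5} to {2, 3} with a two-field format skips only the middle
group and yields {2, 13}, mirroring the gh-1622 test case in this patch.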
Closes #1622
Closes #5105
Closes #5092
Part of #5107
---
Issues:
https://github.com/tarantool/tarantool/issues/1622
https://github.com/tarantool/tarantool/issues/5105
https://github.com/tarantool/tarantool/issues/5092
https://github.com/tarantool/tarantool/issues/5107
Branch: https://github.com/tarantool/tarantool/tree/np/gh-5107-dont-squash-ops

@ChangeLog:
 - Rework upsert operation in vinyl (gh-5107) so that now:
   - if an upsert can't be applied, it is skipped and the corresponding
     error is logged (gh-1622);
   - upserts follow the associative property: the result of several
     upserts doesn't depend on how they are merged together before
     being applied (gh-5105);
   - upserts referring to the -1 fieldno are handled correctly (gh-5087).

 src/box/vinyl.c                 |   2 +-
 src/box/vy_stmt.c               |  28 ++-
 src/box/vy_stmt.h               |   5 +-
 src/box/vy_upsert.c             | 370 ++++++++++++++++++++----------
 src/box/xrow_update.c           |  61 +++++
 test/unit/vy_iterators_helper.c |   2 +-
 test/vinyl/upsert.result        | 394 ++++++++++++++++++++++++++++++++
 test/vinyl/upsert.test.lua      | 165 +++++++++++++
 8 files changed, 893 insertions(+), 134 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 32301d7ba..eab688147 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1984,7 +1984,7 @@ vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm, operations[0].iov_base = (void *)expr; operations[0].iov_len = expr_end - expr; vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end, - operations, 1); + operations, 1, false); if (vystmt == NULL) return -1; assert(vy_stmt_type(vystmt) == IPROTO_UPSERT); diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c index 92e0aa1c5..2ee82b3f2 100644 --- a/src/box/vy_stmt.c +++ b/src/box/vy_stmt.c @@ -313,16 +313,22 @@ vy_key_dup(const char *key) static struct tuple * vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin, const char *tuple_end, struct iovec *ops, - int op_count, enum iproto_type type) + int op_count, enum iproto_type type, bool is_ops_encoded) { mp_tuple_assert(tuple_begin,
tuple_end); const char *tmp = tuple_begin; mp_decode_array(&tmp); + /* + * ops are grouped in one extra array. + * See vy_apply_upsert() for details. + */ size_t ops_size = 0; for (int i = 0; i < op_count; ++i) ops_size += ops[i].iov_len; + if (!is_ops_encoded) + ops_size += mp_sizeof_array(op_count); struct tuple *stmt = NULL; struct region *region = &fiber()->gc; @@ -360,6 +366,8 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin, field_map_build(&builder, wpos - field_map_size); memcpy(wpos, tuple_begin, mpsize); wpos += mpsize; + if (!is_ops_encoded) + wpos = mp_encode_array(wpos, op_count); for (struct iovec *op = ops, *end = ops + op_count; op != end; ++op) { memcpy(wpos, op->iov_base, op->iov_len); @@ -374,10 +382,11 @@ end: struct tuple * vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin, const char *tuple_end, struct iovec *operations, - uint32_t ops_cnt) + uint32_t ops_cnt, bool is_ops_encoded) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - operations, ops_cnt, IPROTO_UPSERT); + operations, ops_cnt, IPROTO_UPSERT, + is_ops_encoded); } struct tuple * @@ -385,7 +394,7 @@ vy_stmt_new_replace(struct tuple_format *format, const char *tuple_begin, const char *tuple_end) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - NULL, 0, IPROTO_REPLACE); + NULL, 0, IPROTO_REPLACE, true); } struct tuple * @@ -393,7 +402,7 @@ vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin, const char *tuple_end) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - NULL, 0, IPROTO_INSERT); + NULL, 0, IPROTO_INSERT, true); } struct tuple * @@ -401,7 +410,7 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin, const char *tuple_end) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - NULL, 0, IPROTO_DELETE); + NULL, 0, IPROTO_DELETE, true); } struct tuple * @@ -735,19 +744,20 @@ vy_stmt_decode(struct xrow_header *xrow, struct tuple_format 
*format) /* Always use key format for DELETE statements. */ stmt = vy_stmt_new_with_ops(env->key_format, request.key, request.key_end, - NULL, 0, IPROTO_DELETE); + NULL, 0, IPROTO_DELETE, true); break; case IPROTO_INSERT: case IPROTO_REPLACE: stmt = vy_stmt_new_with_ops(format, request.tuple, request.tuple_end, - NULL, 0, request.type); + NULL, 0, request.type, true); break; case IPROTO_UPSERT: ops.iov_base = (char *)request.ops; ops.iov_len = request.ops_end - request.ops; stmt = vy_stmt_new_upsert(format, request.tuple, - request.tuple_end, &ops, 1); + request.tuple_end, &ops, + 1, true); break; default: /* TODO: report filename. */ diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index 25219230d..65acbecac 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -528,6 +528,8 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin, * @param part_count Part count from key definition. * @param operations Vector of update operations. * @param ops_cnt Length of the update operations vector. + * @param is_ops_encoded True, if update operations are already packed + * into extra msgpack array. * * @retval NULL Memory allocation error. * @retval not NULL Success. @@ -535,7 +537,8 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin, struct tuple * vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin, const char *tuple_end, - struct iovec *operations, uint32_t ops_cnt); + struct iovec *operations, uint32_t ops_cnt, + bool is_ops_encoded); /** * Create REPLACE statement from UPSERT statement. diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c index e697b6321..3811e91b3 100644 --- a/src/box/vy_upsert.c +++ b/src/box/vy_upsert.c @@ -39,38 +39,217 @@ #include "column_mask.h" /** - * Try to squash two upsert series (msgspacked index_base + ops) - * Try to create a tuple with squahed operations + * Check that key hasn't been changed after applying upsert operation. 
+ */ +static bool +vy_apply_result_does_cross_pk(struct tuple *old_stmt, const char *result, + const char *result_end, struct key_def *cmp_def, + uint64_t col_mask) +{ + if (!key_update_can_be_skipped(cmp_def->column_mask, col_mask)) { + struct tuple *tuple = + vy_stmt_new_replace(tuple_format(old_stmt), result, + result_end); + int cmp_res = vy_stmt_compare(old_stmt, HINT_NONE, tuple, + HINT_NONE, cmp_def); + tuple_unref(tuple); + return cmp_res != 0; + } + return false; +} + +/** + * Apply update operations stored in @a upsert on tuple @a stmt. If @a stmt is + * void statement (i.e. it is NULL or delete statement) then operations are + * applied on tuple stored in @a upsert. Update operations of @a upsert which + * can't be applied are skipped along side with other operations from single + * group (i.e. packed in one msgpack array); errors may be logged depending on + * @a suppress_error flag. * - * @retval 0 && *result_stmt != NULL : successful squash - * @retval 0 && *result_stmt == NULL : unsquashable sources - * @retval -1 - memory error + * @param upsert Upsert statement to be applied on @a stmt. + * @param stmt Statement to be used as base for upsert operations. + * @param cmp_def Key definition required to provide check of primary key + * modification. + * @return Tuple containing result of upsert application; NULL in case OOM. + */ +static struct tuple * +vy_apply_upsert_on_terminal_stmt(struct tuple *upsert, struct tuple *stmt, + struct key_def *cmp_def, bool suppress_error) +{ + assert(vy_stmt_type(upsert) == IPROTO_UPSERT); + assert(stmt == NULL || vy_stmt_type(stmt) != IPROTO_UPSERT); + + uint32_t mp_size; + const char *new_ops = vy_stmt_upsert_ops(upsert, &mp_size); + /* Msgpack containing result of upserts application. 
*/ + const char *result_mp; + bool stmt_is_void = stmt == NULL || vy_stmt_type(stmt) == IPROTO_DELETE; + if (stmt_is_void) + result_mp = vy_upsert_data_range(upsert, &mp_size); + else + result_mp = tuple_data_range(stmt, &mp_size); + const char *result_mp_end = result_mp + mp_size; + /* + * xrow_upsert_execute() allocates result using region, + * so save starting point to release it later. + */ + struct region *region = &fiber()->gc; + size_t region_svp = region_used(region); + uint64_t column_mask = COLUMN_MASK_FULL; + struct tuple_format *format = tuple_format(upsert); + + uint32_t ups_cnt = mp_decode_array(&new_ops); + const char *ups_ops = new_ops; + /* + * In case upsert folds into insert, we must skip first + * update operations. + */ + if (stmt_is_void) { + ups_cnt--; + mp_next(&ups_ops); + } + for (uint32_t i = 0; i < ups_cnt; ++i) { + assert(mp_typeof(*ups_ops) == MP_ARRAY); + const char *ups_ops_end = ups_ops; + mp_next(&ups_ops_end); + const char *exec_res = result_mp; + exec_res = xrow_upsert_execute(ups_ops, ups_ops_end, result_mp, + result_mp_end, format, &mp_size, + 0, suppress_error, &column_mask); + if (exec_res == NULL) { + if (! suppress_error) { + struct error *e = diag_last_error(diag_get()); + assert(e != NULL); + /* Bail out immediately in case of OOM. */ + if (e->type != &type_ClientError) { + region_truncate(region, region_svp); + return NULL; + } + diag_log(); + } + ups_ops = ups_ops_end; + continue; + } + /* + * If it turns out that resulting tuple modifies primary + * key, then simply ignore this upsert. + */ + if (vy_apply_result_does_cross_pk(stmt, exec_res, + exec_res + mp_size, cmp_def, + column_mask)) { + if (!suppress_error) { + say_error("upsert operations %s are not applied"\ + " due to primary key modification", + mp_str(ups_ops)); + } + ups_ops = ups_ops_end; + continue; + } + ups_ops = ups_ops_end; + /* + * Result statement must satisfy space's format. 
Since upsert's + * tuple correctness is already checked in vy_upsert(), let's + * use its format to provide result verification. + */ + struct tuple_format *format = tuple_format(upsert); + if (tuple_validate_raw(format, exec_res) != 0) { + if (! suppress_error) + diag_log(); + continue; + } + result_mp = exec_res; + result_mp_end = exec_res + mp_size; + } + struct tuple *new_terminal_stmt = vy_stmt_new_replace(format, result_mp, + result_mp_end); + region_truncate(region, region_svp); + if (new_terminal_stmt == NULL) + return NULL; + vy_stmt_set_lsn(new_terminal_stmt, vy_stmt_lsn(upsert)); + return new_terminal_stmt; +} + +/** + * Unpack upsert's update operations from msgpack array + * into array of iovecs. */ +static void +upsert_ops_to_iovec(const char *ops, uint32_t ops_cnt, struct iovec *iov_arr) +{ + for (uint32_t i = 0; i < ops_cnt; ++i) { + assert(mp_typeof(*ops) == MP_ARRAY); + iov_arr[i].iov_base = (char *) ops; + mp_next(&ops); + iov_arr[i].iov_len = ops - (char *) iov_arr[i].iov_base; + } +} + +/** + * Attempt at squashing arithmetic operations applied on the same tuple + * fields. We never squash first member (i.e. operations from the oldest + * upsert) from old operations, since it must be skipped in case such + * upsert turns into insert. + * Also note that content of @a new_ops and @a old_ops arrays can be modified + * as a result of function invocation. + * Array containing resulting operations is allocated using region. + * In case of OOM function returns -1; 0 otherwise. 
+ * */ static int vy_upsert_try_to_squash(struct tuple_format *format, - const char *key_mp, const char *key_mp_end, - const char *old_serie, const char *old_serie_end, - const char *new_serie, const char *new_serie_end, - struct tuple **result_stmt) + struct iovec *new_ops, uint32_t new_ops_cnt, + struct iovec *old_ops, uint32_t old_ops_cnt, + struct iovec **res_ops, uint32_t *res_ops_cnt) { - *result_stmt = NULL; - - size_t squashed_size; - const char *squashed = - xrow_upsert_squash(old_serie, old_serie_end, - new_serie, new_serie_end, format, - &squashed_size); - if (squashed == NULL) + uint32_t squashed_ops = 0; + for (uint32_t i = 0; i < new_ops_cnt; ++i) { + const char *new = (const char *) new_ops[i].iov_base; + const char *new_end = new + new_ops[i].iov_len; + /* + * Don't touch first update operation since it should + * be skipped in case first upsert folds into insert. + */ + for (uint32_t j = 1; j < old_ops_cnt; ++j) { + const char *old = + (const char *) old_ops[j].iov_base; + const char *old_end = + (const char *) old + old_ops[j].iov_len; + /* Operation may already be squashed. */ + if (old == NULL) + continue; + size_t squashed_size; + const char *res = + xrow_upsert_squash(old, old_end, new, new_end, + format, &squashed_size); + if (res != NULL) { + new_ops[i].iov_base = (char *) res; + new_ops[i].iov_len = squashed_size; + old_ops[j].iov_base = NULL; + old_ops[j].iov_len = 0; + squashed_ops++; + } + } + } + *res_ops_cnt = new_ops_cnt + old_ops_cnt - squashed_ops; + if (squashed_ops == 0) { + *res_ops = old_ops; return 0; - /* Successful squash! 
*/ - struct iovec operations[1]; - operations[0].iov_base = (void *)squashed; - operations[0].iov_len = squashed_size; - - *result_stmt = vy_stmt_new_upsert(format, key_mp, key_mp_end, - operations, 1); - if (*result_stmt == NULL) + } + size_t ops_size; + *res_ops = region_alloc_array(&fiber()->gc, typeof(*res_ops[0]), + *res_ops_cnt, &ops_size); + if (*res_ops == NULL) { + diag_set(OutOfMemory, ops_size, "region_alloc_array", + "res_ops"); return -1; + } + /* Fill gaps (i.e. old squashed operations) in the resulting array. */ + for (uint32_t i = 0; i < old_ops_cnt; ++i) { + if (old_ops[i].iov_base != NULL) + (*res_ops)[i] = old_ops[i]; + } + for (uint32_t i = old_ops_cnt - squashed_ops, j = 0; j < new_ops_cnt; + ++i, ++j) + (*res_ops)[i] = new_ops[j]; return 0; } @@ -87,122 +266,69 @@ vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt, assert(new_stmt != old_stmt); assert(vy_stmt_type(new_stmt) == IPROTO_UPSERT); - if (old_stmt == NULL || vy_stmt_type(old_stmt) == IPROTO_DELETE) { - /* - * INSERT case: return new stmt. - */ - return vy_stmt_replace_from_upsert(new_stmt); + struct tuple *result_stmt = NULL; + if (old_stmt == NULL || vy_stmt_type(old_stmt) != IPROTO_UPSERT) { + return vy_apply_upsert_on_terminal_stmt(new_stmt, old_stmt, + cmp_def, suppress_error); } - struct tuple_format *format = tuple_format(new_stmt); - + assert(old_stmt != NULL); + assert(vy_stmt_type(old_stmt) == IPROTO_UPSERT); /* - * Unpack UPSERT operation from the new stmt + * Unpack UPSERT operation from the old and new stmts. 
*/ uint32_t mp_size; - const char *new_ops; - new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size); - const char *new_ops_end = new_ops + mp_size; - + const char *old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size); + const char *old_stmt_mp = vy_upsert_data_range(old_stmt, &mp_size); + const char *old_stmt_mp_end = old_stmt_mp + mp_size; + const char *new_ops = vy_stmt_upsert_ops(new_stmt, &mp_size); /* - * Apply new operations to the old stmt + * UPSERT + UPSERT case: try to squash arithmetic operations. + * Firstly, unpack operations to iovec array. */ - const char *result_mp; - if (vy_stmt_type(old_stmt) == IPROTO_UPSERT) - result_mp = vy_upsert_data_range(old_stmt, &mp_size); - else - result_mp = tuple_data_range(old_stmt, &mp_size); - const char *result_mp_end = result_mp + mp_size; - struct tuple *result_stmt = NULL; + struct tuple_format *format = tuple_format(old_stmt); struct region *region = &fiber()->gc; size_t region_svp = region_used(region); - uint8_t old_type = vy_stmt_type(old_stmt); - uint64_t column_mask = COLUMN_MASK_FULL; - result_mp = xrow_upsert_execute(new_ops, new_ops_end, result_mp, - result_mp_end, format, &mp_size, - 0, suppress_error, &column_mask); - if (result_mp == NULL) { + uint32_t old_ops_cnt = mp_decode_array(&old_ops); + uint32_t new_ops_cnt = mp_decode_array(&new_ops); + size_t ops_size; + struct iovec *operations = + region_alloc_array(region, typeof(operations[0]), + old_ops_cnt + new_ops_cnt, &ops_size); + if (operations == NULL) { region_truncate(region, region_svp); + diag_set(OutOfMemory, ops_size, "region_alloc_array", + "operations"); return NULL; } - result_mp_end = result_mp + mp_size; - if (old_type != IPROTO_UPSERT) { - assert(old_type == IPROTO_INSERT || - old_type == IPROTO_REPLACE); - /* - * UPDATE case: return the updated old stmt. 
- */ - result_stmt = vy_stmt_new_replace(format, result_mp, - result_mp_end); + upsert_ops_to_iovec(old_ops, old_ops_cnt, operations); + upsert_ops_to_iovec(new_ops, new_ops_cnt, &operations[old_ops_cnt]); + struct iovec *res_ops = NULL; + uint32_t res_ops_cnt = 0; + if (vy_upsert_try_to_squash(format, &operations[old_ops_cnt], + new_ops_cnt, operations, old_ops_cnt, + &res_ops, &res_ops_cnt) != 0) { + /* OOM */ region_truncate(region, region_svp); - if (result_stmt == NULL) - return NULL; /* OOM */ - vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt)); - goto check_key; + return NULL; } - + assert(res_ops_cnt > 0); + assert(res_ops != NULL); /* - * Unpack UPSERT operation from the old stmt + * Adding update operations. We keep order of update operations in + * the array the same. It is vital since first set of operations + * must be skipped in case upsert folds into insert. For instance: + * old_ops = {{{op1}, {op2}}, {{op3}}} + * new_ops = {{{op4}, {op5}}} + * res_ops = {{{op1}, {op2}}, {{op3}}, {{op4}, {op5}}} + * If upsert corresponding to old_ops becomes insert, then + * {{op1}, {op2}} update operations are not applied. 
*/ - assert(old_stmt != NULL); - const char *old_ops; - old_ops = vy_stmt_upsert_ops(old_stmt, &mp_size); - const char *old_ops_end = old_ops + mp_size; - assert(old_ops_end > old_ops); - - /* - * UPSERT + UPSERT case: combine operations - */ - assert(old_ops_end - old_ops > 0); - if (vy_upsert_try_to_squash(format, result_mp, result_mp_end, - old_ops, old_ops_end, new_ops, new_ops_end, - &result_stmt) != 0) { - region_truncate(region, region_svp); - return NULL; - } - if (result_stmt != NULL) { - region_truncate(region, region_svp); - vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt)); - goto check_key; - } - - /* Failed to squash, simply add one upsert to another */ - int old_ops_cnt, new_ops_cnt; - struct iovec operations[3]; - - old_ops_cnt = mp_decode_array(&old_ops); - operations[1].iov_base = (void *)old_ops; - operations[1].iov_len = old_ops_end - old_ops; - - new_ops_cnt = mp_decode_array(&new_ops); - operations[2].iov_base = (void *)new_ops; - operations[2].iov_len = new_ops_end - new_ops; - - char ops_buf[16]; - char *header = mp_encode_array(ops_buf, old_ops_cnt + new_ops_cnt); - operations[0].iov_base = (void *)ops_buf; - operations[0].iov_len = header - ops_buf; - - result_stmt = vy_stmt_new_upsert(format, result_mp, result_mp_end, - operations, 3); + result_stmt = vy_stmt_new_upsert(format, old_stmt_mp, old_stmt_mp_end, + res_ops, res_ops_cnt, false); region_truncate(region, region_svp); if (result_stmt == NULL) return NULL; vy_stmt_set_lsn(result_stmt, vy_stmt_lsn(new_stmt)); - -check_key: - /* - * Check that key hasn't been changed after applying operations. - */ - if (!key_update_can_be_skipped(cmp_def->column_mask, column_mask) && - vy_stmt_compare(old_stmt, HINT_NONE, result_stmt, - HINT_NONE, cmp_def) != 0) { - /* - * Key has been changed: ignore this UPSERT and - * @retval the old stmt. 
- */ - tuple_unref(result_stmt); - result_stmt = vy_stmt_dup(old_stmt); - } return result_stmt; } diff --git a/src/box/xrow_update.c b/src/box/xrow_update.c index 833975f03..62893c735 100644 --- a/src/box/xrow_update.c +++ b/src/box/xrow_update.c @@ -416,6 +416,60 @@ xrow_upsert_execute(const char *expr,const char *expr_end, return xrow_update_finish(&update, format, p_tuple_len); } +/** Return true if value stored in @a arith_op satisfy format requirements. */ +static bool +update_arith_op_does_satisfy_format(struct xrow_update_op *arith_op, + struct tuple_format *format) +{ + /* + * Upsert's tuple may contain more tuples than specified in + * space's format. In this case they are assumed to be unsigned. + */ + enum field_type fld_type = FIELD_TYPE_UNSIGNED; + if (arith_op->field_no < 0 || + (uint32_t) arith_op->field_no < tuple_format_field_count(format)) { + struct tuple_field *fld = + tuple_format_field(format, arith_op->field_no); + assert(fld != NULL); + fld_type = fld->type; + } + struct xrow_update_arg_arith arith = arith_op->arg.arith; + if (fld_type == FIELD_TYPE_UNSIGNED) { + if (arith.type == XUPDATE_TYPE_INT) { + if (!int96_is_uint64(&arith.int96)) + return false; + return true; + } + /* Can't store decimals/floating points in unsigned field. */ + return false; + } + if (fld_type == FIELD_TYPE_INTEGER) { + if (arith.type == XUPDATE_TYPE_INT) + return true; + return false; + } + if (fld_type == FIELD_TYPE_DOUBLE) { + if (arith.type == XUPDATE_TYPE_FLOAT || + arith.type == XUPDATE_TYPE_DOUBLE) + return true; + return false; + } + if (fld_type == FIELD_TYPE_NUMBER || fld_type == FIELD_TYPE_SCALAR) { + if (arith.type != XUPDATE_TYPE_DECIMAL) + return true; + return false; + } + if (fld_type == FIELD_TYPE_DECIMAL) { + if (arith.type == XUPDATE_TYPE_DECIMAL) + return true; + return false; + } + if (fld_type == FIELD_TYPE_ANY) + return true; + /* All other field types are not compatible with numerics. 
*/ + return false; +} + const char * xrow_upsert_squash(const char *expr1, const char *expr1_end, const char *expr2, const char *expr2_end, @@ -535,6 +589,13 @@ xrow_upsert_squash(const char *expr1, const char *expr1_end, if (xrow_update_arith_make(op[1], arith, &res.arg.arith) != 0) return NULL; + res.field_no = op[0]->field_no; + /* + * Do not squash operations if they don't satisfy + * format. + */ + if (!update_arith_op_does_satisfy_format(&res, format)) + return NULL; res_ops = mp_encode_array(res_ops, 3); res_ops = mp_encode_str(res_ops, (const char *)&op[0]->opcode, 1); diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c index 0d20f19ef..15470920b 100644 --- a/test/unit/vy_iterators_helper.c +++ b/test/unit/vy_iterators_helper.c @@ -112,7 +112,7 @@ vy_new_simple_stmt(struct tuple_format *format, struct key_def *key_def, ops = mp_encode_int(ops, templ->upsert_value); operations[0].iov_base = tmp; operations[0].iov_len = ops - tmp; - ret = vy_stmt_new_upsert(format, buf, pos, operations, 1); + ret = vy_stmt_new_upsert(format, buf, pos, operations, 1, true); fail_if(ret == NULL); break; } diff --git a/test/vinyl/upsert.result b/test/vinyl/upsert.result index 3a7f6629d..343084e81 100644 --- a/test/vinyl/upsert.result +++ b/test/vinyl/upsert.result @@ -899,3 +899,397 @@ s:select() s:drop() --- ... +-- gh-5107: don't squash upsert operations into one array. +-- +-- gh-5087: test upsert execution/squash referring to fields in reversed +-- order (via negative indexing). +-- +s = box.schema.create_space('test', {engine = 'vinyl'}) +--- +... +pk = s:create_index('pk') +--- +... +s:insert({1, 1, 1}) +--- +- [1, 1, 1] +... +box.snapshot() +--- +- ok +... +s:upsert({1}, {{'=', 3, 100}}) +--- +... +s:upsert({1}, {{'=', -1, 200}}) +--- +... +box.snapshot() +--- +- ok +... +s:select() -- {1, 1, 200} +--- +- - [1, 1, 200] +... +s:delete({1}) +--- +... +s:insert({1, 1, 1}) +--- +- [1, 1, 1] +... +box.snapshot() +--- +- ok +... 
+s:upsert({1}, {{'=', -3, 100}}) +--- +... +s:upsert({1}, {{'=', -1, 200}}) +--- +... +box.snapshot() +--- +- ok +... +-- gh-5105: Two upserts are NOT squashed into one, so only one (first one) +-- is skipped, meanwhile second one is applied. +-- +s:select() -- {1, 1, 1} +--- +- - [1, 1, 200] +... +s:delete({1}) +--- +... +box.snapshot() +--- +- ok +... +s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1} +--- +... +s:upsert({1}, {{'+', -1, 100}}) -- {1, 101} +--- +... +s:upsert({1}, {{'-', 2, 100}}) -- {1, 1} +--- +... +s:upsert({1}, {{'+', -1, 200}}) -- {1, 201} +--- +... +s:upsert({1}, {{'-', 2, 200}}) -- {1, 1} +--- +... +box.snapshot() +--- +- ok +... +s:select() -- {1, 1} +--- +- - [1, 1] +... +s:delete({1}) +--- +... +box.snapshot() +--- +- ok +... +s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1} +--- +... +s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1} +--- +... +s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100} +--- +... +s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300} +--- +... +s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300} +--- +... +box.snapshot() +--- +- ok +... +s:select() +--- +- - [1, 1, 300] +... +s:drop() +--- +... +-- gh-1622: upsert operations which break space format are not applied. +-- +s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 }) +--- +... +pk = s:create_index('pk') +--- +... +s:replace{1, 1} +--- +- [1, 1] +... +-- Error is logged, upsert is not applied. +-- +s:upsert({1, 1}, {{'=', 3, 5}}) +--- +... +-- During read the incorrect upsert is ignored. +-- +s:select{} +--- +- - [1, 1] +... +-- Try to set incorrect field_count in a transaction. +-- +box.begin() +--- +... +s:replace{2, 2} +--- +- [2, 2] +... +s:upsert({2, 2}, {{'=', 3, 2}}) +--- +... +s:select{} +--- +- - [1, 1] + - [2, 2] +... +box.commit() +--- +... +s:select{} +--- +- - [1, 1] + - [2, 2] +... +-- Read incorrect upsert from a run: it should be ignored. +-- +box.snapshot() +--- +- ok +... +s:select{} +--- +- - [1, 1] + - [2, 2] +... 
+s:upsert({2, 2}, {{'=', 3, 20}}) +--- +... +box.snapshot() +--- +- ok +... +s:select{} +--- +- - [1, 1] + - [2, 2] +... +-- Execute replace/delete after invalid upsert. +-- +box.snapshot() +--- +- ok +... +s:upsert({2, 2}, {{'=', 3, 30}}) +--- +... +s:replace{2, 3} +--- +- [2, 3] +... +s:select{} +--- +- - [1, 1] + - [2, 3] +... +s:upsert({1, 1}, {{'=', 3, 30}}) +--- +... +s:delete{1} +--- +... +s:select{} +--- +- - [2, 3] +... +-- Invalid upsert in a sequence of upserts is skipped meanwhile +-- the rest are applied. +-- +box.snapshot() +--- +- ok +... +s:upsert({2, 2}, {{'+', 2, 5}}) +--- +... +s:upsert({2, 2}, {{'=', 3, 40}}) +--- +... +s:upsert({2, 2}, {{'+', 2, 5}}) +--- +... +s:select{} +--- +- - [2, 13] +... +box.snapshot() +--- +- ok +... +s:select{} +--- +- - [2, 13] +... +s:drop() +--- +... +-- Test different scenarious during which update operations squash can't +-- take place due to format violations. +-- +decimal = require('decimal') +--- +... +s = box.schema.space.create('test', { engine = 'vinyl', field_count = 5 }) +--- +... +s:format({{name='id', type='unsigned'}, {name='u', type='unsigned'},\ + {name='s', type='scalar'}, {name='f', type='double'},\ + {name='d', type='decimal'}}) +--- +... +pk = s:create_index('pk') +--- +... +s:replace{1, 1, 1, 1.1, decimal.new(1.1) } +--- +- [1, 1, 1, 1.1, 1.1] +... +s:replace{2, 1, 1, 1.1, decimal.new(1.1)} +--- +- [2, 1, 1, 1.1, 1.1] +... +box.snapshot() +--- +- ok +... +-- Can't assign integer to float field. First operation is still applied. +-- +s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 4, 4}}) +--- +... +s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'=', 4, 4}}) +--- +... +-- Can't add floating point to integer (result is floating point). +-- +s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5}}) +--- +... +s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5.5}}) +--- +... +box.snapshot() +--- +- ok +... +s:select() +--- +- - [1, 1, 1, 5.1, 1.1] + - [2, 6, 1, 1.1, 1.1] +... 
+-- Integer overflow check.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+---
+...
+-- Negative result of subtraction stored in unsigned field.
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 2}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, 10}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 9223372036854775809, 5.1, 1.1]
+  - [2, 8, 1, 1.1, 1.1]
+...
+-- Decimals do not fit into numerics and vice versa.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 5, 2}})
+---
+...
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 5, 1}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, decimal.new(2.1)}})
+---
+...
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, decimal.new(1.2)}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select()
+---
+- - [1, 1, 9223372036854775809, 5.1, 2.1]
+  - [2, 8, 1, 1.1, 1.1]
+...
+s:drop()
+---
+...
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+---
+...
+i = s:create_index('pk', {parts={2, 'uint'}})
+---
+...
+s:replace{1, 2, 3, 'default'}
+---
+- [1, 2, 3, 'default']
+...
+box.snapshot()
+---
+- ok
+...
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+---
+...
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+---
+...
+box.snapshot()
+---
+- ok
+...
+s:select{}
+---
+- - [1, 2, 3, 'upserted']
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/upsert.test.lua b/test/vinyl/upsert.test.lua
index 1d77474da..5c5391f44 100644
--- a/test/vinyl/upsert.test.lua
+++ b/test/vinyl/upsert.test.lua
@@ -372,3 +372,168 @@
 box.snapshot()
 s:select()
 s:drop()
+
+-- gh-5107: don't squash upsert operations into one array.
+--
+-- gh-5087: test upsert execution/squash referring to fields in reversed
+-- order (via negative indexing).
+--
+s = box.schema.create_space('test', {engine = 'vinyl'})
+pk = s:create_index('pk')
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', 3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+s:select() -- {1, 1, 200}
+
+s:delete({1})
+s:insert({1, 1, 1})
+box.snapshot()
+
+s:upsert({1}, {{'=', -3, 100}})
+s:upsert({1}, {{'=', -1, 200}})
+box.snapshot()
+-- gh-5105: Two upserts are NOT squashed into one, so only one (first one)
+-- is skipped, meanwhile second one is applied.
+--
+s:select() -- {1, 1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1}, {{'=', -2, 300}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 100}}) -- {1, 101}
+s:upsert({1}, {{'-', 2, 100}}) -- {1, 1}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 201}
+s:upsert({1}, {{'-', 2, 200}}) -- {1, 1}
+box.snapshot()
+s:select() -- {1, 1}
+
+s:delete({1})
+box.snapshot()
+
+s:upsert({1, 1, 1}, {{'!', -1, 300}}) -- {1, 1, 1}
+s:upsert({1}, {{'+', -2, 100}}) -- {1, 101, 1}
+s:upsert({1}, {{'=', -1, 100}}) -- {1, 101, 100}
+s:upsert({1}, {{'+', -1, 200}}) -- {1, 101, 300}
+s:upsert({1}, {{'-', -2, 100}}) -- {1, 1, 300}
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- gh-1622: upsert operations which break space format are not applied.
+--
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 2 })
+pk = s:create_index('pk')
+s:replace{1, 1}
+-- Error is logged, upsert is not applied.
+--
+s:upsert({1, 1}, {{'=', 3, 5}})
+-- During read the incorrect upsert is ignored.
+--
+s:select{}
+
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+s:replace{2, 2}
+s:upsert({2, 2}, {{'=', 3, 2}})
+s:select{}
+box.commit()
+s:select{}
+
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+s:select{}
+s:upsert({2, 2}, {{'=', 3, 20}})
+box.snapshot()
+s:select{}
+
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'=', 3, 30}})
+s:replace{2, 3}
+s:select{}
+
+s:upsert({1, 1}, {{'=', 3, 30}})
+s:delete{1}
+s:select{}
+
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:upsert({2, 2}, {{'=', 3, 40}})
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:select{}
+box.snapshot()
+s:select{}
+
+s:drop()
+
+-- Test different scenarious during which update operations squash can't
+-- take place due to format violations.
+--
+decimal = require('decimal')
+
+s = box.schema.space.create('test', { engine = 'vinyl', field_count = 5 })
+s:format({{name='id', type='unsigned'}, {name='u', type='unsigned'},\
+ {name='s', type='scalar'}, {name='f', type='double'},\
+ {name='d', type='decimal'}})
+pk = s:create_index('pk')
+s:replace{1, 1, 1, 1.1, decimal.new(1.1) }
+s:replace{2, 1, 1, 1.1, decimal.new(1.1)}
+box.snapshot()
+-- Can't assign integer to float field. First operation is still applied.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 4, 4}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'=', 4, 4}})
+-- Can't add floating point to integer (result is floating point).
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 5.5}})
+box.snapshot()
+s:select()
+-- Integer overflow check.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 3, 9223372036854775808}})
+-- Negative result of subtraction stored in unsigned field.
+--
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, 2}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, 10}})
+box.snapshot()
+s:select()
+-- Decimals do not fit into numerics and vice versa.
+--
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 5, 2}})
+s:upsert({1, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 5, 1}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'+', 2, decimal.new(2.1)}})
+s:upsert({2, 1, 1, 2.5, decimal.new(1.1)}, {{'-', 2, decimal.new(1.2)}})
+box.snapshot()
+s:select()
+
+s:drop()
+
+-- Make sure upserts satisfy associativity rule.
+--
+s = box.schema.space.create('test', {engine='vinyl'})
+i = s:create_index('pk', {parts={2, 'uint'}})
+s:replace{1, 2, 3, 'default'}
+box.snapshot()
+
+s:upsert({2, 2, 2}, {{'=', 4, 'upserted'}})
+-- Upsert will fail and thus ignored.
+--
+s:upsert({2, 2, 2}, {{'#', 1, 1}, {'!', 3, 1}})
+box.snapshot()
+
+s:select{}
+
+s:drop()
-- 
2.17.1
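The tests above expect each upsert to succeed or fail as a unit. A failing upsert is skipped, and the upserts after it are still applied. The "Integer overflow check" and "Negative result of subtraction stored in unsigned field" cases show this. Below is a minimal sketch of that rule under stated assumptions. It is a simplified model, not vy_apply_upsert():

- The tuple is a fixed array of uint64_t fields.
- Only '+' and '-' are modelled.
- Field numbers are 0-based, whereas the Lua tests use 1-based numbers.
- The names `struct op`, `struct upsert`, `op_apply()` and `upserts_apply()` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical simplified tuple: a fixed number of unsigned fields. */
enum { FIELD_COUNT = 3 };

struct tuple {
	uint64_t field[FIELD_COUNT];
};

/* One arithmetic update operation ('+' or '-') on a 0-based field. */
struct op {
	char code;
	int fieldno;
	uint64_t arg;
};

/* Each upsert keeps its own group of operations (groups are not merged). */
struct upsert {
	const struct op *ops;
	size_t op_count;
};

/* Apply one operation in place; return false if the result is invalid. */
static bool
op_apply(const struct op *op, struct tuple *t)
{
	if (op->fieldno < 0 || op->fieldno >= FIELD_COUNT)
		return false;
	uint64_t *f = &t->field[op->fieldno];
	switch (op->code) {
	case '+':
		if (*f > UINT64_MAX - op->arg)
			return false; /* integer overflow */
		*f += op->arg;
		return true;
	case '-':
		if (*f < op->arg)
			return false; /* negative result in unsigned field */
		*f -= op->arg;
		return true;
	default:
		return false; /* operation not modelled in this sketch */
	}
}

/*
 * Apply upserts in order. Each upsert is all-or-nothing: its operations
 * run on a copy, and the copy is kept only if every operation succeeded.
 * A failed upsert is skipped, but later upserts are still applied.
 */
static void
upserts_apply(struct tuple *t, const struct upsert *ups, size_t count)
{
	assert(t != NULL);
	for (size_t i = 0; i < count; i++) {
		struct tuple tmp = *t;
		bool ok = true;
		for (size_t j = 0; j < ups[i].op_count && ok; j++)
			ok = op_apply(&ups[i].ops[j], &tmp);
		if (ok)
			*t = tmp;
	}
}
```

Take the overflow test's inputs: {1, 1, 1} followed by two upserts adding 9223372036854775808 (2^63) to the third field. The sketch ends with 9223372036854775809, because the second addition would exceed UINT64_MAX and is dropped. This matches the expected `s:select()` output. Similarly, {2, 6, 1} with '+ 2' and then '- 10' on the second field ends at 8.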