From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from smtpng1.m.smailru.net (smtpng1.m.smailru.net [94.100.181.251]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id A310C44643A for ; Wed, 7 Oct 2020 01:12:59 +0300 (MSK) References: <743a25a986ebbe4388d8f6ffc7d1502f20a5efb9.1601729099.git.korablev@tarantool.org> From: Vladislav Shpilevoy Message-ID: Date: Wed, 7 Oct 2020 00:12:57 +0200 MIME-Version: 1.0 In-Reply-To: <743a25a986ebbe4388d8f6ffc7d1502f20a5efb9.1601729099.git.korablev@tarantool.org> Content-Type: text/plain; charset=utf-8 Content-Language: en-US Content-Transfer-Encoding: 7bit Subject: Re: [Tarantool-patches] [PATCH v3 1/2] vinyl: rework upsert operation List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Nikita Pettik , tarantool-patches@dev.tarantool.org Thanks for the patch! See 3 comments below, and my fixes on top of this branch in a separate commit. > src/box/vinyl.c | 2 +- > src/box/vy_stmt.c | 28 +- > src/box/vy_stmt.h | 5 +- > src/box/vy_upsert.c | 302 +++++++++++--------- > test/unit/vy_iterators_helper.c | 2 +- > test/vinyl/upsert.result | 473 ++++++++++++++++++++++++++++++++ > test/vinyl/upsert.test.lua | 194 +++++++++++++ > 7 files changed, 867 insertions(+), 139 deletions(-) > > diff --git a/src/box/vinyl.c b/src/box/vinyl.c > index cee39c58c..9d9ee0613 100644 > --- a/src/box/vinyl.c > +++ b/src/box/vinyl.c > @@ -1976,7 +1976,7 @@ vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm, > operations[0].iov_base = (void *)expr; > operations[0].iov_len = expr_end - expr; > vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end, > - operations, 1); > + operations, 1, false); 1. You don't really need that flag. In the previous review I tried to explain it, but now I added a commit on top of your branch where I removed this parameter. Take a look. This argument does not have much sense. First of all, the operations array is not really operations. Iovecs here can be separate operations; it can be a single iovec with all operations; it can be several iovecs with first having additional MP_ARRAY encoded before a first operation. So these are just pieces of raw data. And you can use that. You can push a header of these operations as a part of 'operations' array always, like I did in my commit. No need to pass a new argument for that. > if (vystmt == NULL) > return -1; > assert(vy_stmt_type(vystmt) == IPROTO_UPSERT); > diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c > index e697b6321..2b5a16730 100644 > --- a/src/box/vy_upsert.c > +++ b/src/box/vy_upsert.c > @@ -39,39 +39,150 @@ > #include "column_mask.h" > > /** > - * Try to squash two upsert series (msgspacked index_base + ops) > - * Try to create a tuple with squahed operations > + * Check that key hasn't been changed after applying upsert operation. > + */ > +static bool > +vy_apply_result_does_cross_pk(struct tuple *old_stmt, const char *result, > + const char *result_end, struct key_def *cmp_def, > + uint64_t col_mask) > +{ > + if (!key_update_can_be_skipped(cmp_def->column_mask, col_mask)) { > + struct tuple *tuple = > + vy_stmt_new_replace(tuple_format(old_stmt), result, > + result_end); > + int cmp_res = vy_stmt_compare(old_stmt, HINT_NONE, tuple, > + HINT_NONE, cmp_def); > + tuple_unref(tuple); > + return cmp_res != 0; > + } > + return false; > +} > + > +/** > + * Apply update operations stored in @a upsert on tuple @a stmt. If @a stmt is > + * void statement (i.e. it is NULL or delete statement) then operations are > + * applied on tuple stored in @a upsert. Update operations of @a upsert which 2. Applied on tuple stored in @a upsert? Upsert logic is that the operations are ignored, if it appears the statement is first in its key. Did you mean, that you apply all operation sets except the first set? Below I see a comment /* * In case upsert folds into insert, we must skip first * update operations. */ So this is probably it. Just the function comment is a bit misleading. > + * can't be applied are skipped along side with other operations from single > + * group (i.e. packed in one msgpack array); errors may be logged depending on > + * @a suppress_error flag. > * > - * @retval 0 && *result_stmt != NULL : successful squash > - * @retval 0 && *result_stmt == NULL : unsquashable sources > - * @retval -1 - memory error > + * @param upsert Upsert statement to be applied on @a stmt. > + * @param stmt Statement to be used as base for upsert operations. > + * @param cmp_def Key definition required to provide check of primary key > + * modification. 3. param suppress_error is missing. > + * @return Tuple containing result of upsert application; NULL in case OOM. > */ My commit on top of the branch: ==================== [tosquash] vinyl: remove is_ops_encoded argument It was used in function vy_stmt_new_with_ops, but wasn't really needed. Because the function takes an array of iovec structs, where the caller can put the array header. diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 9d9ee0613..ee87a6c6e 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1972,11 +1972,15 @@ vy_lsm_upsert(struct vy_tx *tx, struct vy_lsm *lsm, { assert(tx == NULL || tx->state == VINYL_TX_READY); struct tuple *vystmt; - struct iovec operations[1]; - operations[0].iov_base = (void *)expr; - operations[0].iov_len = expr_end - expr; + struct iovec operations[2]; + /* MP_ARRAY with size 1. */ + char header = 0x91; + operations[0].iov_base = &header; + operations[0].iov_len = 1; + operations[1].iov_base = (void *)expr; + operations[1].iov_len = expr_end - expr; vystmt = vy_stmt_new_upsert(lsm->mem_format, tuple, tuple_end, - operations, 1, false); + operations, 2); if (vystmt == NULL) return -1; assert(vy_stmt_type(vystmt) == IPROTO_UPSERT); diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c index 2ee82b3f2..92e0aa1c5 100644 --- a/src/box/vy_stmt.c +++ b/src/box/vy_stmt.c @@ -313,22 +313,16 @@ vy_key_dup(const char *key) static struct tuple * vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin, const char *tuple_end, struct iovec *ops, - int op_count, enum iproto_type type, bool is_ops_encoded) + int op_count, enum iproto_type type) { mp_tuple_assert(tuple_begin, tuple_end); const char *tmp = tuple_begin; mp_decode_array(&tmp); - /* - * ops are grouped in one extra array. - * See vy_apply_upsert() for details. - */ size_t ops_size = 0; for (int i = 0; i < op_count; ++i) ops_size += ops[i].iov_len; - if (!is_ops_encoded) - ops_size += mp_sizeof_array(op_count); struct tuple *stmt = NULL; struct region *region = &fiber()->gc; @@ -366,8 +360,6 @@ vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin, field_map_build(&builder, wpos - field_map_size); memcpy(wpos, tuple_begin, mpsize); wpos += mpsize; - if (!is_ops_encoded) - wpos = mp_encode_array(wpos, op_count); for (struct iovec *op = ops, *end = ops + op_count; op != end; ++op) { memcpy(wpos, op->iov_base, op->iov_len); @@ -382,11 +374,10 @@ end: struct tuple * vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin, const char *tuple_end, struct iovec *operations, - uint32_t ops_cnt, bool is_ops_encoded) + uint32_t ops_cnt) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - operations, ops_cnt, IPROTO_UPSERT, - is_ops_encoded); + operations, ops_cnt, IPROTO_UPSERT); } struct tuple * @@ -394,7 +385,7 @@ vy_stmt_new_replace(struct tuple_format *format, const char *tuple_begin, const char *tuple_end) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - NULL, 0, IPROTO_REPLACE, true); + NULL, 0, IPROTO_REPLACE); } struct tuple * @@ -402,7 +393,7 @@ vy_stmt_new_insert(struct tuple_format *format, const char *tuple_begin, const char *tuple_end) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - NULL, 0, IPROTO_INSERT, true); + NULL, 0, IPROTO_INSERT); } struct tuple * @@ -410,7 +401,7 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin, const char *tuple_end) { return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, - NULL, 0, IPROTO_DELETE, true); + NULL, 0, IPROTO_DELETE); } struct tuple * @@ -744,20 +735,19 @@ vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format) /* Always use key format for DELETE statements. */ stmt = vy_stmt_new_with_ops(env->key_format, request.key, request.key_end, - NULL, 0, IPROTO_DELETE, true); + NULL, 0, IPROTO_DELETE); break; case IPROTO_INSERT: case IPROTO_REPLACE: stmt = vy_stmt_new_with_ops(format, request.tuple, request.tuple_end, - NULL, 0, request.type, true); + NULL, 0, request.type); break; case IPROTO_UPSERT: ops.iov_base = (char *)request.ops; ops.iov_len = request.ops_end - request.ops; stmt = vy_stmt_new_upsert(format, request.tuple, - request.tuple_end, &ops, - 1, true); + request.tuple_end, &ops, 1); break; default: /* TODO: report filename. */ diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index 65acbecac..24c7eaad7 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -526,10 +526,11 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin, * @param tuple_end End of the array that begins from @param tuple_begin. * @param format Format of a tuple for offsets generating. * @param part_count Part count from key definition. - * @param operations Vector of update operations. + * @param operations Vector of update operation pieces. Each iovec here may be + * a part of an operation, or a whole operation, or something including + * several operations. It is just a list of buffers. Each buffer is not + * interpreted as an independent operation. * @param ops_cnt Length of the update operations vector. - * @param is_ops_encoded True, if update operations are already packed - * into extra msgpack array. * * @retval NULL Memory allocation error. * @retval not NULL Success. @@ -537,8 +538,7 @@ vy_stmt_new_delete(struct tuple_format *format, const char *tuple_begin, struct tuple * vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin, const char *tuple_end, - struct iovec *operations, uint32_t ops_cnt, - bool is_ops_encoded); + struct iovec *operations, uint32_t ops_cnt); /** * Create REPLACE statement from UPSERT statement. diff --git a/src/box/vy_upsert.c b/src/box/vy_upsert.c index 2b5a16730..d73896cad 100644 --- a/src/box/vy_upsert.c +++ b/src/box/vy_upsert.c @@ -223,18 +223,23 @@ vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt, size_t region_svp = region_used(region); uint32_t old_ops_cnt = mp_decode_array(&old_ops); uint32_t new_ops_cnt = mp_decode_array(&new_ops); + uint32_t total_ops_cnt = old_ops_cnt + new_ops_cnt; size_t ops_size; struct iovec *operations = region_alloc_array(region, typeof(operations[0]), - old_ops_cnt + new_ops_cnt, &ops_size); + total_ops_cnt + 1, &ops_size); if (operations == NULL) { region_truncate(region, region_svp); diag_set(OutOfMemory, ops_size, "region_alloc_array", "operations"); return NULL; } - upsert_ops_to_iovec(old_ops, old_ops_cnt, operations); - upsert_ops_to_iovec(new_ops, new_ops_cnt, &operations[old_ops_cnt]); + char header[16]; + char *header_end = mp_encode_array(header, total_ops_cnt); + operations[0].iov_base = header; + operations[0].iov_len = header_end - header; + upsert_ops_to_iovec(old_ops, old_ops_cnt, &operations[1]); + upsert_ops_to_iovec(new_ops, new_ops_cnt, &operations[old_ops_cnt + 1]); /* * Adding update operations. We keep order of update operations in * the array the same. It is vital since first set of operations @@ -246,8 +251,7 @@ vy_apply_upsert(struct tuple *new_stmt, struct tuple *old_stmt, * {{op1}, {op2}} update operations are not applied. */ result_stmt = vy_stmt_new_upsert(format, old_stmt_mp, old_stmt_mp_end, - operations, old_ops_cnt + new_ops_cnt, - false); + operations, total_ops_cnt + 1); region_truncate(region, region_svp); if (result_stmt == NULL) return NULL; diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c index 15470920b..0d20f19ef 100644 --- a/test/unit/vy_iterators_helper.c +++ b/test/unit/vy_iterators_helper.c @@ -112,7 +112,7 @@ vy_new_simple_stmt(struct tuple_format *format, struct key_def *key_def, ops = mp_encode_int(ops, templ->upsert_value); operations[0].iov_base = tmp; operations[0].iov_len = ops - tmp; - ret = vy_stmt_new_upsert(format, buf, pos, operations, 1, true); + ret = vy_stmt_new_upsert(format, buf, pos, operations, 1); fail_if(ret == NULL); break; }