From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from mail-lj1-f195.google.com (mail-lj1-f195.google.com [209.85.208.195]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by dev.tarantool.org (Postfix) with ESMTPS id BD7204696C3 for ; Tue, 14 Apr 2020 01:12:31 +0300 (MSK) Received: by mail-lj1-f195.google.com with SMTP id l14so3406834ljj.5 for ; Mon, 13 Apr 2020 15:12:31 -0700 (PDT) Date: Tue, 14 Apr 2020 01:12:29 +0300 From: Konstantin Osipov Message-ID: <20200413221229.GA3462@atlas> References: <670c3876e58a7cfa14d45db1dc074a10dd034759.1586808463.git.korablev@tarantool.org> MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Disposition: inline In-Reply-To: <670c3876e58a7cfa14d45db1dc074a10dd034759.1586808463.git.korablev@tarantool.org> Subject: Re: [Tarantool-patches] [PATCH 2/2] vinyl: skip invalid upserts during squash List-Id: Tarantool development patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , To: Nikita Pettik Cc: tarantool-patches@dev.tarantool.org, v.shpilevoy@tarantool.org * Nikita Pettik [20/04/14 00:57]: > Instead of aborting merge sequence of upserts let's log appeared > errors and skip upserts which can't be applied. It makes sense > taking into consideration previous commit: now upsert statements which > can't be applied may appear in mems/runs (previously squash operation > might fail only due to OOM). As a result, if we didn't ignore invalid > upserts, dump or compaction processes would not be able to finish (owing > to inability to squash upserts). We should log the upsert itself, base64 encoded. Second, vy_history_apply may be called from many contexts - reads and writes, for example. We should only log and skip during compaction, not when reading, otherwise we'll spam the log file at least. Finally, let's double check there are no issues with the used format - can it become obsolete by the time it's used, e.g. if there is an online/non-blocking schema change that happened in tx thread (compaction is running in the write thread)? > > Closes #1622 > --- > src/box/vy_history.c | 20 +++- > src/box/vy_lsm.c | 13 +- > src/box/vy_tx.c | 29 +++-- > src/box/vy_write_iterator.c | 34 ++++-- > .../vinyl/gh-1622-skip-invalid-upserts.result | 113 +++++++++++++++++- > .../gh-1622-skip-invalid-upserts.test.lua | 41 ++++++- > 6 files changed, 220 insertions(+), 30 deletions(-) > > diff --git a/src/box/vy_history.c b/src/box/vy_history.c > index 0f3b71195..55eb3c669 100644 > --- a/src/box/vy_history.c > +++ b/src/box/vy_history.c > @@ -102,12 +102,20 @@ vy_history_apply(struct vy_history *history, struct key_def *cmp_def, > while (node != NULL) { > struct tuple *stmt = vy_apply_upsert(node->stmt, curr_stmt, > cmp_def, format, true); > - ++*upserts_applied; > - if (curr_stmt != NULL) > - tuple_unref(curr_stmt); > - if (stmt == NULL) > - return -1; > - curr_stmt = stmt; > + if (stmt == NULL) { > + /* > + * In case statement hasn't been applied, > + * simply skip it ignoring errors (otherwise > + * error will appear during tuple read). > + */ > + assert(diag_last_error(diag_get()) != NULL); > + diag_clear(diag_get()); > + } else { > + ++*upserts_applied; > + if (curr_stmt != NULL) > + tuple_unref(curr_stmt); > + curr_stmt = stmt; > + } > node = rlist_prev_entry_safe(node, &history->stmts, link); > } > *ret = curr_stmt; > diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c > index 04c9926a8..3d630fc01 100644 > --- a/src/box/vy_lsm.c > +++ b/src/box/vy_lsm.c > @@ -984,13 +984,20 @@ vy_lsm_commit_upsert(struct vy_lsm *lsm, struct vy_mem *mem, > struct tuple *upserted = > vy_apply_upsert(stmt, older, lsm->cmp_def, > lsm->mem_format, false); > - lsm->stat.upsert.applied++; > - > if (upserted == NULL) { > - /* OOM */ > + /* > + * OOM or upserted tuple does not fulfill > + * space format. Hence, upsert can't be > + * transformed into replace. Log error > + * and exit. > + */ > + struct error *e = diag_last_error(diag_get()); > + assert(e != NULL); > + error_log(e); > diag_clear(diag_get()); > return; > } > + lsm->stat.upsert.applied++; > int64_t upserted_lsn = vy_stmt_lsn(upserted); > if (upserted_lsn != lsn) { > /** > diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c > index 5029bd8a1..060a7f6a9 100644 > --- a/src/box/vy_tx.c > +++ b/src/box/vy_tx.c > @@ -515,11 +515,15 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem, > region_stmt); > tuple_unref(applied); > return rc; > + } else { > + /* > + * Ignore a memory error, because it is > + * not critical to apply the optimization. > + * Clear diag: otherwise error is set but > + * function may return success return code. > + */ > + diag_clear(diag_get()); > } > - /* > - * Ignore a memory error, because it is > - * not critical to apply the optimization. > - */ > } > } else { > /* Invalidate cache element. */ > @@ -1047,12 +1051,17 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt) > > applied = vy_apply_upsert(stmt, old->stmt, lsm->cmp_def, > lsm->mem_format, true); > - lsm->stat.upsert.applied++; > - if (applied == NULL) > - return -1; > - stmt = applied; > - assert(vy_stmt_type(stmt) != 0); > - lsm->stat.upsert.squashed++; > + if (applied == NULL) { > + struct error *e = diag_last_error(diag_get()); > + assert(e != NULL); > + error_log(e); > + diag_clear(diag_get()); > + } else { > + lsm->stat.upsert.applied++; > + stmt = applied; > + assert(vy_stmt_type(stmt) != 0); > + lsm->stat.upsert.squashed++; > + } > } > > /* Allocate a MVCC container. */ > diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c > index 7a6a20627..4b5b3e673 100644 > --- a/src/box/vy_write_iterator.c > +++ b/src/box/vy_write_iterator.c > @@ -850,10 +850,22 @@ vy_read_view_merge(struct vy_write_iterator *stream, struct tuple *hint, > vy_stmt_type(hint) != IPROTO_UPSERT); > struct tuple *applied = vy_apply_upsert(h->tuple, hint, > stream->cmp_def, stream->format, false); > - if (applied == NULL) > - return -1; > - vy_stmt_unref_if_possible(h->tuple); > - h->tuple = applied; > + if (applied == NULL) { > + /* > + * Current upsert can't be applied. > + * Let's skip it and log error. > + */ > + struct error *e = diag_last_error(diag_get()); > + assert(e != NULL); > + say_info("Upsert %s is not applied due to the error: %s", > + vy_stmt_str(h->tuple), e->errmsg); > + diag_clear(diag_get()); > + vy_stmt_unref_if_possible(h->tuple); > + h->tuple = hint; > + } else { > + vy_stmt_unref_if_possible(h->tuple); > + h->tuple = applied; > + } > } > /* Squash the rest of UPSERTs. */ > struct vy_write_history *result = h; > @@ -864,10 +876,16 @@ vy_read_view_merge(struct vy_write_iterator *stream, struct tuple *hint, > assert(result->tuple != NULL); > struct tuple *applied = vy_apply_upsert(h->tuple, result->tuple, > stream->cmp_def, stream->format, false); > - if (applied == NULL) > - return -1; > - vy_stmt_unref_if_possible(result->tuple); > - result->tuple = applied; > + if (applied == NULL) { > + struct error *e = diag_last_error(diag_get()); > + assert(e != NULL); > + say_info("Upsert %s is not applied due to the error: %s", > + vy_stmt_str(h->tuple), e->errmsg); > + diag_clear(diag_get()); > + } else { > + vy_stmt_unref_if_possible(result->tuple); > + result->tuple = applied; > + } > vy_stmt_unref_if_possible(h->tuple); > /* > * Don't bother freeing 'h' since it's > diff --git a/test/vinyl/gh-1622-skip-invalid-upserts.result b/test/vinyl/gh-1622-skip-invalid-upserts.result > index 437ff3c51..b59ed53dc 100644 > --- a/test/vinyl/gh-1622-skip-invalid-upserts.result > +++ b/test/vinyl/gh-1622-skip-invalid-upserts.result > @@ -14,11 +14,120 @@ s:replace{1, 1} > s:upsert({1, 1}, {{'=', 3, 5}}) > | --- > | ... > --- Invalid upsert still appears during read. > +-- During read the incorrect upsert is ignored. > -- > s:select{} > | --- > - | - error: Tuple field count 3 does not match space field count 2 > + | - - [1, 1] > + | ... > + > +-- Try to set incorrect field_count in a transaction. > +-- > +box.begin() > + | --- > + | ... > +s:replace{2, 2} > + | --- > + | - [2, 2] > + | ... > +s:upsert({2, 2}, {{'=', 3, 2}}) > + | --- > + | ... > +s:select{} > + | --- > + | - - [1, 1] > + | - [2, 2] > + | ... > +box.commit() > + | --- > + | ... > +s:select{} > + | --- > + | - - [1, 1] > + | - [2, 2] > + | ... > + > +-- Read incorrect upsert from a run: it should be ignored. > +-- > +box.snapshot() > + | --- > + | - ok > + | ... > +s:select{} > + | --- > + | - - [1, 1] > + | - [2, 2] > + | ... > +s:upsert({2, 2}, {{'=', 3, 20}}) > + | --- > + | ... > +box.snapshot() > + | --- > + | - ok > + | ... > +s:select{} > + | --- > + | - - [1, 1] > + | - [2, 2] > + | ... > + > +-- Execute replace/delete after invalid upsert. > +-- > +box.snapshot() > + | --- > + | - ok > + | ... > +s:upsert({2, 2}, {{'=', 3, 30}}) > + | --- > + | ... > +s:replace{2, 3} > + | --- > + | - [2, 3] > + | ... > +s:select{} > + | --- > + | - - [1, 1] > + | - [2, 3] > + | ... > + > +s:upsert({1, 1}, {{'=', 3, 30}}) > + | --- > + | ... > +s:delete{1} > + | --- > + | ... > +s:select{} > + | --- > + | - - [2, 3] > + | ... > + > +-- Invalid upsert in a sequence of upserts is skipped meanwhile > +-- the rest are applied. > +-- > +box.snapshot() > + | --- > + | - ok > + | ... > +s:upsert({2, 2}, {{'+', 2, 5}}) > + | --- > + | ... > +s:upsert({2, 2}, {{'=', 3, 40}}) > + | --- > + | ... > +s:upsert({2, 2}, {{'+', 2, 5}}) > + | --- > + | ... > +s:select{} > + | --- > + | - - [2, 13] > + | ... > +box.snapshot() > + | --- > + | - ok > + | ... > +s:select{} > + | --- > + | - - [2, 13] > | ... > > s:drop() > diff --git a/test/vinyl/gh-1622-skip-invalid-upserts.test.lua b/test/vinyl/gh-1622-skip-invalid-upserts.test.lua > index 952d2bcde..eb93393be 100644 > --- a/test/vinyl/gh-1622-skip-invalid-upserts.test.lua > +++ b/test/vinyl/gh-1622-skip-invalid-upserts.test.lua > @@ -4,8 +4,47 @@ s:replace{1, 1} > -- Error is logged, upsert is not applied. > -- > s:upsert({1, 1}, {{'=', 3, 5}}) > --- Invalid upsert still appears during read. > +-- During read the incorrect upsert is ignored. > -- > s:select{} > > +-- Try to set incorrect field_count in a transaction. > +-- > +box.begin() > +s:replace{2, 2} > +s:upsert({2, 2}, {{'=', 3, 2}}) > +s:select{} > +box.commit() > +s:select{} > + > +-- Read incorrect upsert from a run: it should be ignored. > +-- > +box.snapshot() > +s:select{} > +s:upsert({2, 2}, {{'=', 3, 20}}) > +box.snapshot() > +s:select{} > + > +-- Execute replace/delete after invalid upsert. > +-- > +box.snapshot() > +s:upsert({2, 2}, {{'=', 3, 30}}) > +s:replace{2, 3} > +s:select{} > + > +s:upsert({1, 1}, {{'=', 3, 30}}) > +s:delete{1} > +s:select{} > + > +-- Invalid upsert in a sequence of upserts is skipped meanwhile > +-- the rest are applied. > +-- > +box.snapshot() > +s:upsert({2, 2}, {{'+', 2, 5}}) > +s:upsert({2, 2}, {{'=', 3, 40}}) > +s:upsert({2, 2}, {{'+', 2, 5}}) > +s:select{} > +box.snapshot() > +s:select{} > + > s:drop() > \ No newline at end of file > -- > 2.17.1 > -- Konstantin Osipov, Moscow, Russia