[Tarantool-patches] [PATCH 2/2] vinyl: skip invalid upserts during squash
Nikita Pettik
korablev at tarantool.org
Tue Apr 14 00:55:45 MSK 2020
Instead of aborting the merge sequence of upserts, let's log the errors
that occur and skip the upserts which can't be applied. This makes sense
in light of the previous commit: upsert statements which can't be
applied may now appear in mems/runs (previously the squash operation
could fail only due to OOM). As a result, if we didn't ignore invalid
upserts, the dump and compaction processes would not be able to finish
(owing to the inability to squash upserts).
Closes #1622
---
src/box/vy_history.c | 20 +++-
src/box/vy_lsm.c | 13 +-
src/box/vy_tx.c | 29 +++--
src/box/vy_write_iterator.c | 34 ++++--
.../vinyl/gh-1622-skip-invalid-upserts.result | 113 +++++++++++++++++-
.../gh-1622-skip-invalid-upserts.test.lua | 41 ++++++-
6 files changed, 220 insertions(+), 30 deletions(-)
diff --git a/src/box/vy_history.c b/src/box/vy_history.c
index 0f3b71195..55eb3c669 100644
--- a/src/box/vy_history.c
+++ b/src/box/vy_history.c
@@ -102,12 +102,20 @@ vy_history_apply(struct vy_history *history, struct key_def *cmp_def,
while (node != NULL) {
struct tuple *stmt = vy_apply_upsert(node->stmt, curr_stmt,
cmp_def, format, true);
- ++*upserts_applied;
- if (curr_stmt != NULL)
- tuple_unref(curr_stmt);
- if (stmt == NULL)
- return -1;
- curr_stmt = stmt;
+ if (stmt == NULL) {
+ /*
+ * In case statement hasn't been applied,
+ * simply skip it ignoring errors (otherwise
+ * error will appear during tuple read).
+ */
+ assert(diag_last_error(diag_get()) != NULL);
+ diag_clear(diag_get());
+ } else {
+ ++*upserts_applied;
+ if (curr_stmt != NULL)
+ tuple_unref(curr_stmt);
+ curr_stmt = stmt;
+ }
node = rlist_prev_entry_safe(node, &history->stmts, link);
}
*ret = curr_stmt;
diff --git a/src/box/vy_lsm.c b/src/box/vy_lsm.c
index 04c9926a8..3d630fc01 100644
--- a/src/box/vy_lsm.c
+++ b/src/box/vy_lsm.c
@@ -984,13 +984,20 @@ vy_lsm_commit_upsert(struct vy_lsm *lsm, struct vy_mem *mem,
struct tuple *upserted =
vy_apply_upsert(stmt, older, lsm->cmp_def,
lsm->mem_format, false);
- lsm->stat.upsert.applied++;
-
if (upserted == NULL) {
- /* OOM */
+ /*
+ * OOM or upserted tuple does not fulfill
+ * space format. Hence, upsert can't be
+ * transformed into replace. Log error
+ * and exit.
+ */
+ struct error *e = diag_last_error(diag_get());
+ assert(e != NULL);
+ error_log(e);
diag_clear(diag_get());
return;
}
+ lsm->stat.upsert.applied++;
int64_t upserted_lsn = vy_stmt_lsn(upserted);
if (upserted_lsn != lsn) {
/**
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 5029bd8a1..060a7f6a9 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -515,11 +515,15 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem,
region_stmt);
tuple_unref(applied);
return rc;
+ } else {
+ /*
+ * Ignore a memory error, because it is
+ * not critical to apply the optimization.
+ * Clear diag: otherwise error is set but
+ * function may return success return code.
+ */
+ diag_clear(diag_get());
}
- /*
- * Ignore a memory error, because it is
- * not critical to apply the optimization.
- */
}
} else {
/* Invalidate cache element. */
@@ -1047,12 +1051,17 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt)
applied = vy_apply_upsert(stmt, old->stmt, lsm->cmp_def,
lsm->mem_format, true);
- lsm->stat.upsert.applied++;
- if (applied == NULL)
- return -1;
- stmt = applied;
- assert(vy_stmt_type(stmt) != 0);
- lsm->stat.upsert.squashed++;
+ if (applied == NULL) {
+ struct error *e = diag_last_error(diag_get());
+ assert(e != NULL);
+ error_log(e);
+ diag_clear(diag_get());
+ } else {
+ lsm->stat.upsert.applied++;
+ stmt = applied;
+ assert(vy_stmt_type(stmt) != 0);
+ lsm->stat.upsert.squashed++;
+ }
}
/* Allocate a MVCC container. */
diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c
index 7a6a20627..4b5b3e673 100644
--- a/src/box/vy_write_iterator.c
+++ b/src/box/vy_write_iterator.c
@@ -850,10 +850,22 @@ vy_read_view_merge(struct vy_write_iterator *stream, struct tuple *hint,
vy_stmt_type(hint) != IPROTO_UPSERT);
struct tuple *applied = vy_apply_upsert(h->tuple, hint,
stream->cmp_def, stream->format, false);
- if (applied == NULL)
- return -1;
- vy_stmt_unref_if_possible(h->tuple);
- h->tuple = applied;
+ if (applied == NULL) {
+ /*
+ * Current upsert can't be applied.
+ * Let's skip it and log error.
+ */
+ struct error *e = diag_last_error(diag_get());
+ assert(e != NULL);
+ say_info("Upsert %s is not applied due to the error: %s",
+ vy_stmt_str(h->tuple), e->errmsg);
+ diag_clear(diag_get());
+ vy_stmt_unref_if_possible(h->tuple);
+ h->tuple = hint;
+ } else {
+ vy_stmt_unref_if_possible(h->tuple);
+ h->tuple = applied;
+ }
}
/* Squash the rest of UPSERTs. */
struct vy_write_history *result = h;
@@ -864,10 +876,16 @@ vy_read_view_merge(struct vy_write_iterator *stream, struct tuple *hint,
assert(result->tuple != NULL);
struct tuple *applied = vy_apply_upsert(h->tuple, result->tuple,
stream->cmp_def, stream->format, false);
- if (applied == NULL)
- return -1;
- vy_stmt_unref_if_possible(result->tuple);
- result->tuple = applied;
+ if (applied == NULL) {
+ struct error *e = diag_last_error(diag_get());
+ assert(e != NULL);
+ say_info("Upsert %s is not applied due to the error: %s",
+ vy_stmt_str(h->tuple), e->errmsg);
+ diag_clear(diag_get());
+ } else {
+ vy_stmt_unref_if_possible(result->tuple);
+ result->tuple = applied;
+ }
vy_stmt_unref_if_possible(h->tuple);
/*
* Don't bother freeing 'h' since it's
diff --git a/test/vinyl/gh-1622-skip-invalid-upserts.result b/test/vinyl/gh-1622-skip-invalid-upserts.result
index 437ff3c51..b59ed53dc 100644
--- a/test/vinyl/gh-1622-skip-invalid-upserts.result
+++ b/test/vinyl/gh-1622-skip-invalid-upserts.result
@@ -14,11 +14,120 @@ s:replace{1, 1}
s:upsert({1, 1}, {{'=', 3, 5}})
| ---
| ...
--- Invalid upsert still appears during read.
+-- During read the incorrect upsert is ignored.
--
s:select{}
| ---
- | - error: Tuple field count 3 does not match space field count 2
+ | - - [1, 1]
+ | ...
+
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+ | ---
+ | ...
+s:replace{2, 2}
+ | ---
+ | - [2, 2]
+ | ...
+s:upsert({2, 2}, {{'=', 3, 2}})
+ | ---
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | - [2, 2]
+ | ...
+box.commit()
+ | ---
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | - [2, 2]
+ | ...
+
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | - [2, 2]
+ | ...
+s:upsert({2, 2}, {{'=', 3, 20}})
+ | ---
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | - [2, 2]
+ | ...
+
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:upsert({2, 2}, {{'=', 3, 30}})
+ | ---
+ | ...
+s:replace{2, 3}
+ | ---
+ | - [2, 3]
+ | ...
+s:select{}
+ | ---
+ | - - [1, 1]
+ | - [2, 3]
+ | ...
+
+s:upsert({1, 1}, {{'=', 3, 30}})
+ | ---
+ | ...
+s:delete{1}
+ | ---
+ | ...
+s:select{}
+ | ---
+ | - - [2, 3]
+ | ...
+
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:upsert({2, 2}, {{'+', 2, 5}})
+ | ---
+ | ...
+s:upsert({2, 2}, {{'=', 3, 40}})
+ | ---
+ | ...
+s:upsert({2, 2}, {{'+', 2, 5}})
+ | ---
+ | ...
+s:select{}
+ | ---
+ | - - [2, 13]
+ | ...
+box.snapshot()
+ | ---
+ | - ok
+ | ...
+s:select{}
+ | ---
+ | - - [2, 13]
| ...
s:drop()
diff --git a/test/vinyl/gh-1622-skip-invalid-upserts.test.lua b/test/vinyl/gh-1622-skip-invalid-upserts.test.lua
index 952d2bcde..eb93393be 100644
--- a/test/vinyl/gh-1622-skip-invalid-upserts.test.lua
+++ b/test/vinyl/gh-1622-skip-invalid-upserts.test.lua
@@ -4,8 +4,47 @@ s:replace{1, 1}
-- Error is logged, upsert is not applied.
--
s:upsert({1, 1}, {{'=', 3, 5}})
--- Invalid upsert still appears during read.
+-- During read the incorrect upsert is ignored.
--
s:select{}
+-- Try to set incorrect field_count in a transaction.
+--
+box.begin()
+s:replace{2, 2}
+s:upsert({2, 2}, {{'=', 3, 2}})
+s:select{}
+box.commit()
+s:select{}
+
+-- Read incorrect upsert from a run: it should be ignored.
+--
+box.snapshot()
+s:select{}
+s:upsert({2, 2}, {{'=', 3, 20}})
+box.snapshot()
+s:select{}
+
+-- Execute replace/delete after invalid upsert.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'=', 3, 30}})
+s:replace{2, 3}
+s:select{}
+
+s:upsert({1, 1}, {{'=', 3, 30}})
+s:delete{1}
+s:select{}
+
+-- Invalid upsert in a sequence of upserts is skipped meanwhile
+-- the rest are applied.
+--
+box.snapshot()
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:upsert({2, 2}, {{'=', 3, 40}})
+s:upsert({2, 2}, {{'+', 2, 5}})
+s:select{}
+box.snapshot()
+s:select{}
+
s:drop()
\ No newline at end of file
--
2.17.1
More information about the Tarantool-patches
mailing list