[PATCH v2 7/7] vinyl: eliminate disk read on REPLACE/DELETE
Vladimir Davydov
vdavydov.dev at gmail.com
Wed Aug 22 20:08:33 MSK 2018
On Tue, Aug 21, 2018 at 07:13:50PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev at gmail.com> [18/08/21 15:19]:
> > /*
> > - * All indexes of a space must be consistent, i.e.
> > - * if a tuple is present in one index, it must be
> > - * present in all other indexes as well, so we can
> > - * get here only if there's a bug somewhere in vinyl.
> > - * Don't abort as core dump won't really help us in
> > - * this case. Just warn the user and proceed to the
> > - * next tuple.
> > + * Invalidate the cache entry so that we won't read
> > + * the overwritten tuple again from the cache.
> > */
> > - say_warn("%s: key %s missing in primary index",
> > - vy_lsm_name(lsm), vy_stmt_str(tuple));
> > + vy_cache_on_write(&lsm->cache, tuple, NULL);
>
> Please add a comment why you invalidate the cache.
Done.
> > + /*
> > + * Extract space id, LSN of the deferred DELETE statement,
> > + * and the deleted tuple from the system space row.
> > + */
> > + uint32_t space_id;
> > + if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0)
> > + diag_raise();
> > + int64_t lsn;
> > + if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0)
> > + diag_raise();
> > + const char *delete_data = tuple_field(stmt->new_tuple, 2);
> > + if (delete_data == NULL) {
> > + diag_set(ClientError, ER_NO_SUCH_FIELD, 2);
> > + diag_raise();
>
> Please use tuple iterator instead.
Done. Note that I had to add a couple of tuple_next helpers to do that -
see the branch; a rough sketch is below.
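For reference, the helpers look roughly like this (a simplified sketch -
overflow checks and exact error reporting are abridged here, see the
branch for the actual code):

static inline int
tuple_next_u32(struct tuple_iterator *it, uint32_t *out)
{
	/* Advance to the next field of the tuple being iterated. */
	const char *field = tuple_next(it);
	if (field == NULL || mp_typeof(*field) != MP_UINT) {
		diag_set(ClientError, ER_NO_SUCH_FIELD, it->fieldno);
		return -1;
	}
	*out = (uint32_t)mp_decode_uint(&field);
	return 0;
}

tuple_next_u64() and tuple_next_with_type() follow the same pattern.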
> > + diag_raise();
> > + if (space->index_count <= 1)
> > + return;
>
> Please add a comment when this can be the case - e.g. the space
> was altered after we created a deferred delete. I can't imagine
> any other case.
Done.
> > + struct tuple *delete = vy_stmt_new_surrogate_delete_raw(pk->mem_format,
> > + delete_data, delete_data_end);
> > + if (delete == NULL)
> > + diag_raise();
> > + vy_stmt_set_lsn(delete, lsn);
>
> Please say a few words why you reset the statement lsn and how this works
> downstream (when processed by the write iterator).
Done.
> > + /* Insert the deferred DELETE into secondary indexes. */
> > + int rc = 0;
> > + size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
> > + const struct tuple *region_stmt = NULL;
> > + for (uint32_t i = 1; i < space->index_count; i++) {
> > + struct vy_lsm *lsm = vy_lsm(space->index[i]);
> > + if (vy_is_committed_one(env, lsm))
> > + continue;
> > + /*
> > + * As usual, rotate the active in-memory index if
> > + * schema was changed or dump was triggered. Do it
> > + * only if processing the first statement, because
> > + * dump may be triggered by one of the statements
> > + * of this transaction (see vy_quota_force_use()
> > + * below), in which case we must not do rotation
> > + * as we want all statements to land in the same
> > + * in-memory index. This is safe, as long as we
> > + * don't yield between statements.
> > + */
> > + struct vy_mem *mem = lsm->mem;
> > + if (is_first_statement &&
> > + (mem->space_cache_version != space_cache_version ||
> > + mem->generation != *lsm->env->p_generation)) {
> > + rc = vy_lsm_rotate_mem(lsm);
> > + if (rc != 0)
> > + break;
> > + mem = lsm->mem;
> > + }
> > + rc = vy_lsm_set(lsm, mem, delete, &region_stmt);
> > + if (rc != 0)
> > + break;
> > + vy_lsm_commit_stmt(lsm, mem, region_stmt);
>
> Can we share this code with vy_replace()?
I don't think so: vy_replace() inserts tuples into the tx write set.
I assume you meant vy_tx_write(). That would be difficult, because the
relevant code there is scattered between the prepare and commit phases.
>
> > + break;
> > + }
> > + vy_mem_pin(mem);
> > + trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
> > + txn_on_commit(txn, on_commit);
>
> What about on_rollback? If you missed it, please add a test case
> :)
Fixed. Added a test too :)
> > +/**
> > + * Try to generate a deferred DELETE statement on tx commit.
> > + *
> > + * This function is supposed to be called for a primary index
> > + * statement which was executed without deletion of the overwritten
> > + * tuple from secondary indexes. It looks up the overwritten tuple
> > + * in memory and, if found, produces the deferred DELETEs and
> > + * inserts them into the transaction log.
> > + *
>
> Please mention it's not only an optimization, explain
> why we logically need it (we can get out of memory error when
> trying to insert during dump).
Done.
Branch updated. Incremental diff is below.
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1a45fac9..fb121402 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1292,8 +1292,10 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
*result = NULL;
}
/*
- * Invalidate the cache entry so that we won't read
- * the overwritten tuple again from the cache.
+ * We must purge stale tuples from the cache before
+ * storing the resulting interval in order to avoid
+ * chain intersections, which are not tolerated by
+ * the tuple cache implementation.
*/
vy_cache_on_write(&lsm->cache, tuple, NULL);
return 0;
@@ -4301,10 +4303,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
/* {{{ Deferred DELETE handling */
-/**
- * Callback invoked after a deferred DELETE statement has been
- * committed to _vinyl_deferred_delete system space.
- */
static void
vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
{
@@ -4316,6 +4314,16 @@ vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
*/
assert(mem->dump_lsn <= txn->signature);
mem->dump_lsn = txn->signature;
+ /* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
+ vy_mem_unpin(mem);
+}
+
+static void
+vy_deferred_delete_on_rollback(struct trigger *trigger, void *event)
+{
+ (void)event;
+ struct vy_mem *mem = trigger->data;
+ /* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
vy_mem_unpin(mem);
}
@@ -4358,17 +4366,17 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
* Extract space id, LSN of the deferred DELETE statement,
* and the deleted tuple from the system space row.
*/
+ struct tuple_iterator it;
+ tuple_rewind(&it, stmt->new_tuple);
uint32_t space_id;
- if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0)
+ if (tuple_next_u32(&it, &space_id) != 0)
diag_raise();
- int64_t lsn;
- if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0)
+ uint64_t lsn;
+ if (tuple_next_u64(&it, &lsn) != 0)
diag_raise();
- const char *delete_data = tuple_field(stmt->new_tuple, 2);
- if (delete_data == NULL) {
- diag_set(ClientError, ER_NO_SUCH_FIELD, 2);
+ const char *delete_data = tuple_next_with_type(&it, MP_ARRAY);
+ if (delete_data == NULL)
diag_raise();
- }
const char *delete_data_end = delete_data;
mp_next(&delete_data_end);
@@ -4376,6 +4384,11 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
struct space *space = space_cache_find(space_id);
if (space == NULL)
diag_raise();
+ /*
+ * All secondary indexes could have been dropped, in
+ * which case we don't need to generate deferred DELETE
+ * statements anymore.
+ */
if (space->index_count <= 1)
return;
/*
@@ -4392,14 +4405,18 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
delete_data, delete_data_end);
if (delete == NULL)
diag_raise();
- vy_stmt_set_lsn(delete, lsn);
/*
* A deferred DELETE may be generated after new statements
- * were committed for the deleted key while the read iterator
- * assumes that newer sources always store newer statements.
- * Mark deferred DELETEs with the VY_STMT_SKIP_READ flag so
- * as not to break the read iterator assumptions.
+ * were committed for the deleted key. So we must use the
+ * original LSN (not the one of the WAL row) when inserting
+ * a deferred DELETE into an index to make sure that it will
+ * purge the appropriate tuple on compaction. However, this
+ * also breaks the read iterator invariant that states that
+ * newer sources contain newer statements for the same key.
+ * So we mark deferred DELETEs with the VY_STMT_SKIP_READ
+ * flag, which makes the read iterator ignore them.
*/
+ vy_stmt_set_lsn(delete, lsn);
vy_stmt_set_flags(delete, VY_STMT_SKIP_READ);
/* Insert the deferred DELETE into secondary indexes. */
@@ -4451,9 +4468,19 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
rc = -1;
break;
}
+ struct trigger *on_rollback = region_alloc(&fiber()->gc,
+ sizeof(*on_rollback));
+ if (on_rollback == NULL) {
+ diag_set(OutOfMemory, sizeof(*on_rollback),
+ "region", "struct trigger");
+ rc = -1;
+ break;
+ }
vy_mem_pin(mem);
trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
+ trigger_create(on_rollback, vy_deferred_delete_on_rollback, mem, NULL);
txn_on_commit(txn, on_commit);
+ txn_on_rollback(txn, on_rollback);
}
size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
assert(mem_used_after >= mem_used_before);
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 27cd9bb7..590b4483 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -479,6 +479,12 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem,
* in memory and, if found, produces the deferred DELETEs and
* inserts them into the transaction log.
*
+ * Generating DELETEs before committing a transaction rather than
+ * postponing it to dump isn't just an optimization. The point is
+ * that we can't generate deferred DELETEs during dump, because
+ * if we run out of memory, we won't be able to schedule another
+ * dump to free some.
+ *
* Affects @tx->log, @v->stmt.
*
* Returns 0 on success, -1 on memory allocation error.
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 28271fc9..cdffa198 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1844,3 +1844,81 @@ s.index.sk:stat().memory.rows
s:drop()
---
...
+--
+-- Check that tarantool doesn't hang or crash if error
+-- occurs while writing a deferred DELETE to WAL.
+--
+fiber = require('fiber')
+---
+...
+errinj = box.error.injection
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+---
+...
+s:replace{1, 10}
+---
+- [1, 10]
+...
+box.snapshot()
+---
+- ok
+...
+s:replace{1, 20}
+---
+- [1, 20]
+...
+box.snapshot()
+---
+- ok
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+---
+- ok
+...
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+errors = box.stat.ERROR.total
+---
+...
+s.index.pk:compact()
+---
+...
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 0
+---
+- 0
+...
+errinj.set("ERRINJ_WAL_IO", false)
+---
+- ok
+...
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 1
+---
+- 1
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+---
+- ok
+...
+box.snapshot() -- ok
+---
+- ok
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 000067d3..c2332a69 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -736,3 +736,32 @@ s.index.sk:select()
s.index.sk:stat().memory.rows
s:drop()
+
+--
+-- Check that tarantool doesn't hang or crash if error
+-- occurs while writing a deferred DELETE to WAL.
+--
+fiber = require('fiber')
+errinj = box.error.injection
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {run_count_per_level = 10})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+s:replace{1, 10}
+box.snapshot()
+s:replace{1, 20}
+box.snapshot()
+
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+errinj.set("ERRINJ_WAL_IO", true)
+errors = box.stat.ERROR.total
+s.index.pk:compact()
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 0
+errinj.set("ERRINJ_WAL_IO", false)
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 1
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+
+box.snapshot() -- ok
+s:drop()