From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
Date: Wed, 22 Aug 2018 20:08:33 +0300
From: Vladimir Davydov
Subject: Re: [PATCH v2 7/7] vinyl: eliminate disk read on REPLACE/DELETE
Message-ID: <20180822170833.museekqcvyev7ka5@esperanza>
References: <0ab3db91f86e4321fd835bc474d300b91b970517.1534847663.git.vdavydov.dev@gmail.com> <20180821161350.GG28159@chai>
MIME-Version: 1.0
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
In-Reply-To: <20180821161350.GG28159@chai>
To: Konstantin Osipov
Cc: tarantool-patches@freelists.org
List-ID: 

On Tue, Aug 21, 2018 at 07:13:50PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov [18/08/21 15:19]:
> >  	/*
> > -	 * All indexes of a space must be consistent, i.e.
> > -	 * if a tuple is present in one index, it must be
> > -	 * present in all other indexes as well, so we can
> > -	 * get here only if there's a bug somewhere in vinyl.
> > -	 * Don't abort as core dump won't really help us in
> > -	 * this case. Just warn the user and proceed to the
> > -	 * next tuple.
> > +	 * Invalidate the cache entry so that we won't read
> > +	 * the overwritten tuple again from the cache.
> >  	 */
> > -	say_warn("%s: key %s missing in primary index",
> > -		 vy_lsm_name(lsm), vy_stmt_str(tuple));
> > +	vy_cache_on_write(&lsm->cache, tuple, NULL);
> 
> Please add a comment why you invalidate the cache.

Done.

> > +	/*
> > +	 * Extract space id, LSN of the deferred DELETE statement,
> > +	 * and the deleted tuple from the system space row.
> > +	 */
> > +	uint32_t space_id;
> > +	if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0)
> > +		diag_raise();
> > +	int64_t lsn;
> > +	if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0)
> > +		diag_raise();
> > +	const char *delete_data = tuple_field(stmt->new_tuple, 2);
> > +	if (delete_data == NULL) {
> > +		diag_set(ClientError, ER_NO_SUCH_FIELD, 2);
> > +		diag_raise();
> 
> Please use tuple iterator instead.

Done. Note, I had to add a couple of tuple_next helpers to do that -
see the branch.
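
For reference, a rough sketch of what one of those helpers might look
like. This is hypothetical code, not the actual branch contents; the
diag reporting is elided:

	/*
	 * Hypothetical sketch of a tuple_next helper: fetch the next
	 * field from the iterator and decode it as a 64-bit unsigned
	 * integer. A real helper would set a proper diag on error.
	 */
	static inline int
	tuple_next_u64(struct tuple_iterator *it, uint64_t *out)
	{
		const char *field = tuple_next(it);
		if (field == NULL)
			return -1; /* no more fields in the tuple */
		if (mp_typeof(*field) != MP_UINT)
			return -1; /* wrong field type */
		*out = mp_decode_uint(&field);
		return 0;
	}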
> > +		diag_raise();
> > +	if (space->index_count <= 1)
> > +		return;
> 
> Please add a comment when this can be the case - e.g. the space
> was altered after we created a deferred delete. I can't imagine
> any other case.

Done.

> > +	struct tuple *delete = vy_stmt_new_surrogate_delete_raw(pk->mem_format,
> > +					delete_data, delete_data_end);
> > +	if (delete == NULL)
> > +		diag_raise();
> > +	vy_stmt_set_lsn(delete, lsn);
> 
> Please say a few words why you reset the statement lsn and how this works
> downstream (when processed by the write iterator).

Done.
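
To make the LSN reset concrete, here is a worked example with made-up
LSNs (the full explanation is in the comment added by the diff below):

	/*
	 * Suppose a secondary index holds:
	 *
	 *   REPLACE{1, 'a'}, LSN = 10  -- the old tuple
	 *   REPLACE{1, 'b'}, LSN = 20  -- overwrites it in the pk
	 *
	 * The deferred DELETE for {1, 'a'} may be written much later,
	 * in a WAL row with, say, LSN = 100, but it must be inserted
	 * into the secondary index with LSN = 20 so that on compaction
	 * it purges exactly the tuple it was generated for. Since a
	 * statement with LSN = 20 sitting in a newer in-memory tree
	 * violates the read iterator's assumption that newer sources
	 * hold newer statements, the DELETE is also marked with
	 * VY_STMT_SKIP_READ.
	 */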

> > +	/* Insert the deferred DELETE into secondary indexes. */
> > +	int rc = 0;
> > +	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
> > +	const struct tuple *region_stmt = NULL;
> > +	for (uint32_t i = 1; i < space->index_count; i++) {
> > +		struct vy_lsm *lsm = vy_lsm(space->index[i]);
> > +		if (vy_is_committed_one(env, lsm))
> > +			continue;
> > +		/*
> > +		 * As usual, rotate the active in-memory index if
> > +		 * schema was changed or dump was triggered. Do it
> > +		 * only if processing the first statement, because
> > +		 * dump may be triggered by one of the statements
> > +		 * of this transaction (see vy_quota_force_use()
> > +		 * below), in which case we must not do rotation
> > +		 * as we want all statements to land in the same
> > +		 * in-memory index. This is safe, as long as we
> > +		 * don't yield between statements.
> > +		 */
> > +		struct vy_mem *mem = lsm->mem;
> > +		if (is_first_statement &&
> > +		    (mem->space_cache_version != space_cache_version ||
> > +		     mem->generation != *lsm->env->p_generation)) {
> > +			rc = vy_lsm_rotate_mem(lsm);
> > +			if (rc != 0)
> > +				break;
> > +			mem = lsm->mem;
> > +		}
> > +		rc = vy_lsm_set(lsm, mem, delete, &region_stmt);
> > +		if (rc != 0)
> > +			break;
> > +		vy_lsm_commit_stmt(lsm, mem, region_stmt);
> 
> Can we share this code with vy_replace()?

I don't think so, vy_replace() inserts tuples into the tx write set.
I assume you meant vy_tx_write(). Well, that would be difficult,
because the code is scattered there between prepare and commit phases.

> > +			break;
> > +		}
> > +		vy_mem_pin(mem);
> > +		trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
> > +		txn_on_commit(txn, on_commit);
> 
> What about on_rollback? If you missed it, please add a test case
> :)

Fixed. Added a test too :)

> > +/**
> > + * Try to generate a deferred DELETE statement on tx commit.
> > + *
> > + * This function is supposed to be called for a primary index
> > + * statement which was executed without deletion of the overwritten
> > + * tuple from secondary indexes. It looks up the overwritten tuple
> > + * in memory and, if found, produces the deferred DELETEs and
> > + * inserts them into the transaction log.
> > + *
> 
> Please mention it's not only an optimization, explain
> why we logically need it (we can get out of memory error when
> trying to insert during dump).

Done.

Branch updated. Incremental diff is below.

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1a45fac9..fb121402 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1292,8 +1292,10 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 		*result = NULL;
 	}
 	/*
-	 * Invalidate the cache entry so that we won't read
-	 * the overwritten tuple again from the cache.
+	 * We must purge stale tuples from the cache before
+	 * storing the resulting interval in order to avoid
+	 * chain intersections, which are not tolerated by
+	 * the tuple cache implementation.
 	 */
 	vy_cache_on_write(&lsm->cache, tuple, NULL);
 	return 0;
@@ -4301,10 +4303,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 
 /* {{{ Deferred DELETE handling */
 
-/**
- * Callback invoked after a deferred DELETE statement has been
- * committed to _vinyl_deferred_delete system space.
- */
 static void
 vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
 {
@@ -4316,6 +4314,16 @@ vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
 	 */
 	assert(mem->dump_lsn <= txn->signature);
 	mem->dump_lsn = txn->signature;
+	/* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
+	vy_mem_unpin(mem);
+}
+
+static void
+vy_deferred_delete_on_rollback(struct trigger *trigger, void *event)
+{
+	(void)event;
+	struct vy_mem *mem = trigger->data;
+	/* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
 	vy_mem_unpin(mem);
 }
 
@@ -4358,17 +4366,17 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	 * Extract space id, LSN of the deferred DELETE statement,
 	 * and the deleted tuple from the system space row.
 	 */
+	struct tuple_iterator it;
+	tuple_rewind(&it, stmt->new_tuple);
 	uint32_t space_id;
-	if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0)
+	if (tuple_next_u32(&it, &space_id) != 0)
 		diag_raise();
-	int64_t lsn;
-	if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0)
+	uint64_t lsn;
+	if (tuple_next_u64(&it, &lsn) != 0)
 		diag_raise();
-	const char *delete_data = tuple_field(stmt->new_tuple, 2);
-	if (delete_data == NULL) {
-		diag_set(ClientError, ER_NO_SUCH_FIELD, 2);
+	const char *delete_data = tuple_next_with_type(&it, MP_ARRAY);
+	if (delete_data == NULL)
 		diag_raise();
-	}
 	const char *delete_data_end = delete_data;
 	mp_next(&delete_data_end);
 
@@ -4376,6 +4384,11 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	struct space *space = space_cache_find(space_id);
 	if (space == NULL)
 		diag_raise();
+	/*
+	 * All secondary indexes could have been dropped, in
+	 * which case we don't need to generate deferred DELETE
+	 * statements anymore.
+	 */
 	if (space->index_count <= 1)
 		return;
 	/*
@@ -4392,14 +4405,18 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 			delete_data, delete_data_end);
 	if (delete == NULL)
 		diag_raise();
-	vy_stmt_set_lsn(delete, lsn);
 	/*
 	 * A deferred DELETE may be generated after new statements
-	 * were committed for the deleted key while the read iterator
-	 * assumes that newer sources always store newer statements.
-	 * Mark deferred DELETEs with the VY_STMT_SKIP_READ flag so
-	 * as not to break the read iterator assumptions.
+	 * were committed for the deleted key. So we must use the
+	 * original LSN (not the one of the WAL row) when inserting
+	 * a deferred DELETE into an index to make sure that it will
+	 * purge the appropriate tuple on compaction. However, this
+	 * also breaks the read iterator invariant that states that
+	 * newer sources contain newer statements for the same key.
+	 * So we mark deferred DELETEs with the VY_STMT_SKIP_READ
+	 * flag, which makes the read iterator ignore them.
 	 */
+	vy_stmt_set_lsn(delete, lsn);
 	vy_stmt_set_flags(delete, VY_STMT_SKIP_READ);
 
 	/* Insert the deferred DELETE into secondary indexes. */
@@ -4451,9 +4468,19 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 			rc = -1;
 			break;
 		}
+		struct trigger *on_rollback = region_alloc(&fiber()->gc,
+							   sizeof(*on_rollback));
+		if (on_rollback == NULL) {
+			diag_set(OutOfMemory, sizeof(*on_rollback),
+				 "region", "struct trigger");
+			rc = -1;
+			break;
+		}
 		vy_mem_pin(mem);
 		trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
+		trigger_create(on_rollback, vy_deferred_delete_on_rollback, mem, NULL);
 		txn_on_commit(txn, on_commit);
+		txn_on_rollback(txn, on_rollback);
 	}
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 27cd9bb7..590b4483 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -479,6 +479,12 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem,
  * in memory and, if found, produces the deferred DELETEs and
  * inserts them into the transaction log.
  *
+ * Generating DELETEs before committing a transaction rather than
+ * postponing it to dump isn't just an optimization. The point is
+ * that we can't generate deferred DELETEs during dump, because
+ * if we run out of memory, we won't be able to schedule another
+ * dump to free some.
+ *
  * Affects @tx->log, @v->stmt.
  *
  * Returns 0 on success, -1 on memory allocation error.
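
To spell out what the new error-injection test below exercises: the
WAL write of a deferred DELETE may now fail, and the mem pinned in
vy_deferred_delete_on_replace() must be unpinned on either outcome.
The pairing, as it appears in the vinyl.c hunk above, boils down to:

	/* Keep the mem alive while the WAL write is in flight. */
	vy_mem_pin(mem);
	trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
	trigger_create(on_rollback, vy_deferred_delete_on_rollback, mem, NULL);
	txn_on_commit(txn, on_commit);
	txn_on_rollback(txn, on_rollback);

so a WAL error can no longer leave the mem pinned forever.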
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 28271fc9..cdffa198 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1844,3 +1844,81 @@ s.index.sk:stat().memory.rows
 s:drop()
 ---
 ...
+--
+-- Check that tarantool doesn't hang or crash if an error
+-- occurs while writing a deferred DELETE to WAL.
+--
+fiber = require('fiber')
+---
+...
+errinj = box.error.injection
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+---
+...
+s:replace{1, 10}
+---
+- [1, 10]
+...
+box.snapshot()
+---
+- ok
+...
+s:replace{1, 20}
+---
+- [1, 20]
+...
+box.snapshot()
+---
+- ok
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+---
+- ok
+...
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+errors = box.stat.ERROR.total
+---
+...
+s.index.pk:compact()
+---
+...
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 0
+---
+- 0
+...
+errinj.set("ERRINJ_WAL_IO", false)
+---
+- ok
+...
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 1
+---
+- 1
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+---
+- ok
+...
+box.snapshot() -- ok
+---
+- ok
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 000067d3..c2332a69 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -736,3 +736,32 @@ s.index.sk:select()
 s.index.sk:stat().memory.rows
 s:drop()
+
+--
+-- Check that tarantool doesn't hang or crash if an error
+-- occurs while writing a deferred DELETE to WAL.
+--
+fiber = require('fiber')
+errinj = box.error.injection
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {run_count_per_level = 10})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+s:replace{1, 10}
+box.snapshot()
+s:replace{1, 20}
+box.snapshot()
+
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+errinj.set("ERRINJ_WAL_IO", true)
+errors = box.stat.ERROR.total
+s.index.pk:compact()
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 0
+errinj.set("ERRINJ_WAL_IO", false)
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 1
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+
+box.snapshot() -- ok
+s:drop()