[PATCH v2 7/7] vinyl: eliminate disk read on REPLACE/DELETE

Vladimir Davydov vdavydov.dev at gmail.com
Wed Aug 22 20:08:33 MSK 2018


On Tue, Aug 21, 2018 at 07:13:50PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev at gmail.com> [18/08/21 15:19]:
> >  		/*
> > -		 * All indexes of a space must be consistent, i.e.
> > -		 * if a tuple is present in one index, it must be
> > -		 * present in all other indexes as well, so we can
> > -		 * get here only if there's a bug somewhere in vinyl.
> > -		 * Don't abort as core dump won't really help us in
> > -		 * this case. Just warn the user and proceed to the
> > -		 * next tuple.
> > +		 * Invalidate the cache entry so that we won't read
> > +		 * the overwritten tuple again from the cache.
> >  		 */
> > -		say_warn("%s: key %s missing in primary index",
> > -			 vy_lsm_name(lsm), vy_stmt_str(tuple));
> > +		vy_cache_on_write(&lsm->cache, tuple, NULL);
> 
> Please add a comment why you invalidate the cache.

Done.

> > +	/*
> > +	 * Extract space id, LSN of the deferred DELETE statement,
> > +	 * and the deleted tuple from the system space row.
> > +	 */
> > +	uint32_t space_id;
> > +	if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0)
> > +		diag_raise();
> > +	int64_t lsn;
> > +	if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0)
> > +		diag_raise();
> > +	const char *delete_data = tuple_field(stmt->new_tuple, 2);
> > +	if (delete_data == NULL) {
> > +		diag_set(ClientError, ER_NO_SUCH_FIELD, 2);
> > +		diag_raise();
> 
> Please use tuple iterator instead.

Done. Note that I had to add a couple of tuple_next helpers to do that -
see the branch.

> > +		diag_raise();
> > +	if (space->index_count <= 1)
> > +		return;
> 
> Please add a comment when this can be the case - e.g. the space
> was altered after we created a deferred delete. I can't imagine
> any other case.

Done.

> > +	struct tuple *delete = vy_stmt_new_surrogate_delete_raw(pk->mem_format,
> > +						delete_data, delete_data_end);
> > +	if (delete == NULL)
> > +		diag_raise();
> > +	vy_stmt_set_lsn(delete, lsn);
> 
> Please say a few words why you reset the statement lsn and how this works
> downstream (when processed by the write iterator).

Done.

> > +	/* Insert the deferred DELETE into secondary indexes. */
> > +	int rc = 0;
> > +	size_t mem_used_before = lsregion_used(&env->mem_env.allocator);
> > +	const struct tuple *region_stmt = NULL;
> > +	for (uint32_t i = 1; i < space->index_count; i++) {
> > +		struct vy_lsm *lsm = vy_lsm(space->index[i]);
> > +		if (vy_is_committed_one(env, lsm))
> > +			continue;
> > +		/*
> > +		 * As usual, rotate the active in-memory index if
> > +		 * schema was changed or dump was triggered. Do it
> > +		 * only if processing the first statement, because
> > +		 * dump may be triggered by one of the statements
> > +		 * of this transaction (see vy_quota_force_use()
> > +		 * below), in which case we must not do rotation
> > +		 * as we want all statements to land in the same
> > +		 * in-memory index. This is safe, as long as we
> > +		 * don't yield between statements.
> > +		 */
> > +		struct vy_mem *mem = lsm->mem;
> > +		if (is_first_statement &&
> > +		    (mem->space_cache_version != space_cache_version ||
> > +		     mem->generation != *lsm->env->p_generation)) {
> > +			rc = vy_lsm_rotate_mem(lsm);
> > +			if (rc != 0)
> > +				break;
> > +			mem = lsm->mem;
> > +		}
> > +		rc = vy_lsm_set(lsm, mem, delete, &region_stmt);
> > +		if (rc != 0)
> > +			break;
> > +		vy_lsm_commit_stmt(lsm, mem, region_stmt);
> 
> Can we share this code with vy_replace()?

I don't think so: vy_replace() inserts tuples into the tx write set.
I assume you meant vy_tx_write(). Sharing that code would be difficult,
because it is scattered between the prepare and commit phases.

> 
> > +			break;
> > +		}
> > +		vy_mem_pin(mem);
> > +		trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
> > +		txn_on_commit(txn, on_commit);
> 
> What about on_rollback? If you missed it, please add a test case
> :)

Fixed. Added a test too :)

> > +/**
> > + * Try to generate a deferred DELETE statement on tx commit.
> > + *
> > + * This function is supposed to be called for a primary index
> > + * statement which was executed without deletion of the overwritten
> > + * tuple from secondary indexes. It looks up the overwritten tuple
> > + * in memory and, if found, produces the deferred DELETEs and
> > + * inserts them into the transaction log.
> > + *
> 
> Please mention it's not only an optimization, explain 
> why we logically need it (we can get out of memory error when
> trying to insert during dump).

Done.

Branch updated. Incremental diff is below.

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 1a45fac9..fb121402 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1292,8 +1292,10 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx,
 			*result = NULL;
 		}
 		/*
-		 * Invalidate the cache entry so that we won't read
-		 * the overwritten tuple again from the cache.
+		 * We must purge stale tuples from the cache before
+		 * storing the resulting interval in order to avoid
+		 * chain intersections, which are not tolerated by
+		 * the tuple cache implementation.
 		 */
 		vy_cache_on_write(&lsm->cache, tuple, NULL);
 		return 0;
@@ -4301,10 +4303,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 
 /* {{{ Deferred DELETE handling */
 
-/**
- * Callback invoked after a deferred DELETE statement has been
- * committed to _vinyl_deferred_delete system space.
- */
 static void
 vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
 {
@@ -4316,6 +4314,16 @@ vy_deferred_delete_on_commit(struct trigger *trigger, void *event)
 	 */
 	assert(mem->dump_lsn <= txn->signature);
 	mem->dump_lsn = txn->signature;
+	/* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
+	vy_mem_unpin(mem);
+}
+
+static void
+vy_deferred_delete_on_rollback(struct trigger *trigger, void *event)
+{
+	(void)event;
+	struct vy_mem *mem = trigger->data;
+	/* Unpin the mem pinned in vy_deferred_delete_on_replace(). */
 	vy_mem_unpin(mem);
 }
 
@@ -4358,17 +4366,17 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	 * Extract space id, LSN of the deferred DELETE statement,
 	 * and the deleted tuple from the system space row.
 	 */
+	struct tuple_iterator it;
+	tuple_rewind(&it, stmt->new_tuple);
 	uint32_t space_id;
-	if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0)
+	if (tuple_next_u32(&it, &space_id) != 0)
 		diag_raise();
-	int64_t lsn;
-	if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0)
+	uint64_t lsn;
+	if (tuple_next_u64(&it, &lsn) != 0)
 		diag_raise();
-	const char *delete_data = tuple_field(stmt->new_tuple, 2);
-	if (delete_data == NULL) {
-		diag_set(ClientError, ER_NO_SUCH_FIELD, 2);
+	const char *delete_data = tuple_next_with_type(&it, MP_ARRAY);
+	if (delete_data == NULL)
 		diag_raise();
-	}
 	const char *delete_data_end = delete_data;
 	mp_next(&delete_data_end);
 
@@ -4376,6 +4384,11 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 	struct space *space = space_cache_find(space_id);
 	if (space == NULL)
 		diag_raise();
+	/*
+	 * All secondary indexes could have been dropped, in
+	 * which case we don't need to generate deferred DELETE
+	 * statements anymore.
+	 */
 	if (space->index_count <= 1)
 		return;
 	/*
@@ -4392,14 +4405,18 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 						delete_data, delete_data_end);
 	if (delete == NULL)
 		diag_raise();
-	vy_stmt_set_lsn(delete, lsn);
 	/*
 	 * A deferred DELETE may be generated after new statements
-	 * were committed for the deleted key while the read iterator
-	 * assumes that newer sources always store newer statements.
-	 * Mark deferred DELETEs with the VY_STMT_SKIP_READ flag so
-	 * as not to break the read iterator assumptions.
+	 * were committed for the deleted key. So we must use the
+	 * original LSN (not the one of the WAL row) when inserting
+	 * a deferred DELETE into an index to make sure that it will
+	 * purge the appropriate tuple on compaction. However, this
+	 * also breaks the read iterator invariant that states that
+	 * newer sources contain newer statements for the same key.
+	 * So we mark deferred DELETEs with the VY_STMT_SKIP_READ
+	 * flag, which makes the read iterator ignore them.
 	 */
+	vy_stmt_set_lsn(delete, lsn);
 	vy_stmt_set_flags(delete, VY_STMT_SKIP_READ);
 
 	/* Insert the deferred DELETE into secondary indexes. */
@@ -4451,9 +4468,19 @@ vy_deferred_delete_on_replace(struct trigger *trigger, void *event)
 			rc = -1;
 			break;
 		}
+		struct trigger *on_rollback = region_alloc(&fiber()->gc,
+							   sizeof(*on_rollback));
+		if (on_rollback == NULL) {
+			diag_set(OutOfMemory, sizeof(*on_rollback),
+				 "region", "struct trigger");
+			rc = -1;
+			break;
+		}
 		vy_mem_pin(mem);
 		trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL);
+		trigger_create(on_rollback, vy_deferred_delete_on_rollback, mem, NULL);
 		txn_on_commit(txn, on_commit);
+		txn_on_rollback(txn, on_rollback);
 	}
 	size_t mem_used_after = lsregion_used(&env->mem_env.allocator);
 	assert(mem_used_after >= mem_used_before);
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c
index 27cd9bb7..590b4483 100644
--- a/src/box/vy_tx.c
+++ b/src/box/vy_tx.c
@@ -479,6 +479,12 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem,
  * in memory and, if found, produces the deferred DELETEs and
  * inserts them into the transaction log.
  *
+ * Generating DELETEs before committing a transaction rather than
+ * postponing them until dump isn't just an optimization. The point is
+ * that we can't generate deferred DELETEs during dump, because
+ * if we run out of memory, we won't be able to schedule another
+ * dump to free some.
+ *
  * Affects @tx->log, @v->stmt.
  *
  * Returns 0 on success, -1 on memory allocation error.
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index 28271fc9..cdffa198 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -1844,3 +1844,81 @@ s.index.sk:stat().memory.rows
 s:drop()
 ---
 ...
+--
+-- Check that tarantool doesn't hang or crash if error
+-- occurs while writing a deferred DELETE to WAL.
+--
+fiber = require('fiber')
+---
+...
+errinj = box.error.injection
+---
+...
+s = box.schema.space.create('test', {engine = 'vinyl'})
+---
+...
+_ = s:create_index('pk', {run_count_per_level = 10})
+---
+...
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+---
+...
+s:replace{1, 10}
+---
+- [1, 10]
+...
+box.snapshot()
+---
+- ok
+...
+s:replace{1, 20}
+---
+- [1, 20]
+...
+box.snapshot()
+---
+- ok
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+---
+- ok
+...
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+errors = box.stat.ERROR.total
+---
+...
+s.index.pk:compact()
+---
+...
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 0
+---
+- 0
+...
+errinj.set("ERRINJ_WAL_IO", false)
+---
+- ok
+...
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+---
+...
+s.index.pk:stat().disk.compact.count -- 1
+---
+- 1
+...
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+---
+- ok
+...
+box.snapshot() -- ok
+---
+- ok
+...
+s:drop()
+---
+...
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 000067d3..c2332a69 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -736,3 +736,32 @@ s.index.sk:select()
 s.index.sk:stat().memory.rows
 
 s:drop()
+
+--
+-- Check that tarantool doesn't hang or crash if error
+-- occurs while writing a deferred DELETE to WAL.
+--
+fiber = require('fiber')
+errinj = box.error.injection
+
+s = box.schema.space.create('test', {engine = 'vinyl'})
+_ = s:create_index('pk', {run_count_per_level = 10})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
+s:replace{1, 10}
+box.snapshot()
+s:replace{1, 20}
+box.snapshot()
+
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0.001)
+errinj.set("ERRINJ_WAL_IO", true)
+errors = box.stat.ERROR.total
+s.index.pk:compact()
+while box.stat.ERROR.total - errors == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 0
+errinj.set("ERRINJ_WAL_IO", false)
+while s.index.pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
+s.index.pk:stat().disk.compact.count -- 1
+errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0)
+
+box.snapshot() -- ok
+s:drop()



More information about the Tarantool-patches mailing list