From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
From: Vladimir Davydov
Subject: [PATCH v2 7/7] vinyl: eliminate disk read on REPLACE/DELETE
Date: Tue, 21 Aug 2018 14:15:40 +0300
Message-Id: <0ab3db91f86e4321fd835bc474d300b91b970517.1534847663.git.vdavydov.dev@gmail.com>
In-Reply-To:
References:
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"
Content-Transfer-Encoding: 8bit
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
List-ID:

When executing a REPLACE or DELETE request for a vinyl space, we need
to delete the old tuple from secondary indexes, if any. E.g. if there's
a space with the primary index over field 1 and a secondary index over
field 2 and there's REPLACE{1, 10} in the space, then REPLACE{1, 20}
has to generate DELETE{10, 1} in order to overwrite REPLACE{10, 1}
before inserting REPLACE{20, 1} into the secondary index. Currently,
we generate DELETEs for secondary indexes immediately on request
execution, which makes REPLACE/DELETE operations disk-bound when the
space has secondary indexes, because in order to delete the old tuple
we have to look it up in the primary index first.

Actually, we can postpone DELETE generation and still yield correct
results. All we have to do is compare each tuple read from a secondary
index with the full tuple corresponding to it in the primary index: if
they match, the tuple is OK to return to the user; if they don't, the
tuple was overwritten in the primary index and we have to skip it. This
doesn't introduce any overhead, because we have to look up full tuples
in the primary index while reading a secondary index anyway. For
instance, consider the example given in the previous paragraph: if we
don't insert DELETE{10, 1} into the secondary index, we will encounter
REPLACE{10, 1} when reading it, but the tuple corresponding to it in
the primary index is REPLACE{1, 20} != REPLACE{10, 1}, so we skip it.
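For illustration, here is the scenario above in Lua (a minimal sketch;
the space and index names are arbitrary, and the comments describe the
expected behavior rather than output copied from a test):

    s = box.schema.space.create('example', {engine = 'vinyl'})
    pk = s:create_index('pk') -- primary index over field 1
    sk = s:create_index('sk', {parts = {2, 'unsigned'}, unique = false})
    s:replace{1, 10}
    s:replace{1, 20} -- with this patch, no DELETE{10, 1} is written
                     -- to the secondary index at this point
    sk:select()      -- returns only [1, 20]: sk still stores
                     -- REPLACE{10, 1}, but the full tuple fetched for
                     -- it from the primary index is [1, 20] != [1, 10],
                     -- so the stale statement is silently skipped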
This is the first thing that this patch does. However, skipping garbage
tuples isn't enough. We have to purge them sooner or later, otherwise
we risk iterating over thousands of stale tuples before encountering a
fresh one, which would adversely affect latency of SELECT requests over
a secondary index. So we mark each and every REPLACE and DELETE
statement that was inserted into the primary index without generating
DELETEs for secondary indexes with a special per-statement flag,
VY_STMT_DEFERRED_DELETE, and generate DELETEs for these statements when
the time comes.

The time comes when the primary index finally gets compacted. When
writing a compacted run, we iterate over all tuples in primary key
order, from newer to older tuples, so each statement marked with
VY_STMT_DEFERRED_DELETE will be followed by the tuple it overwrote,
provided enough runs have been compacted. We take these tuples and send
them to the tx thread over cbus (compaction is done in a worker thread,
remember), where deferred DELETEs are generated and inserted into
secondary indexes.

Well, it isn't that simple actually, but you should have got the basic
idea by now. The first problem here is that by the time we generate a
deferred DELETE, newer statements for the same key could have been
inserted into the index and dumped to disk, while the read iterator
assumes that the newer the source, the newer the statements it stores
for the same key. In order not to break this assumption by inserting
deferred DELETEs, we mark them with another special per-statement flag,
VY_STMT_SKIP_READ, which renders them invisible to the read iterator.
The flag doesn't affect the write iterator, though, so deferred DELETEs
will purge garbage statements when the secondary index eventually gets
compacted.

The second problem concerns the recovery procedure. Since we write
deferred DELETEs to the in-memory level, we need to recover them after
restart somehow in case they didn't get dumped. To do that, we write
them to WAL (along with the LSN and space id) with the aid of a special
system blackhole space, _vinyl_deferred_delete. The insertion of
deferred DELETEs into in-memory trees is actually done by an on_replace
trigger installed on the space, so deferred DELETEs are generated and
recovered by the same code. In order not to recover statements that
have been dumped, we account LSNs of WAL rows that generate deferred
DELETEs to vy_lsm::dump_lsn and filter dumped statements with
vy_is_committed(), just like normal statements.

Finally, we may run out of memory while generating deferred DELETEs.
This is manageable if it happens during compaction - we simply throttle
the compaction task until the memory level is dumped. However, we can't
do that while generating deferred DELETEs during index dump. Solution:
don't generate deferred DELETEs during dump. The thing is, we can
generate a deferred DELETE during dump only if the overwritten tuple is
stored in memory, but if it is, the lookup is nearly free and so we can
generate the DELETE when the transaction gets committed. So we
introduce a special version of point lookup, vy_point_lookup_mem(),
which looks up a tuple by the full key in the cache and in memory. When
a transaction is committed, we use this function to generate DELETEs.

This should outline the pivotal points of the algorithm. More details,
as usual, in the code.

Closes #2129
---
 src/box/vinyl.c                     | 227 ++++++++++--
 src/box/vy_lsm.h                    |   5 +
 src/box/vy_mem.h                    |   6 +
 src/box/vy_point_lookup.c           |  32 ++
 src/box/vy_point_lookup.h           |  18 +
 src/box/vy_scheduler.c              | 306 +++++++++++++++-
 src/box/vy_tx.c                     | 133 +++++++
 test/unit/vy_point_lookup.c         |   2 +
 test/vinyl/deferred_delete.result   | 677 ++++++++++++++++++++++++++++++++++++
 test/vinyl/deferred_delete.test.lua | 261 ++++++++++++++
 test/vinyl/info.result              |  18 +-
 test/vinyl/info.test.lua            |   9 +-
 test/vinyl/layout.result            |  46 ++-
 test/vinyl/quota.result             |   2 +-
 test/vinyl/tx_gap_lock.result       |  16 +-
 test/vinyl/tx_gap_lock.test.lua     |  10 +-
 test/vinyl/write_iterator.result    |   5 +
 test/vinyl/write_iterator.test.lua  |   3 +
 18 files changed, 1703 insertions(+), 73 deletions(-)
 create mode 100644 test/vinyl/deferred_delete.result
 create mode 100644 test/vinyl/deferred_delete.test.lua

diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 18aa1ba5..1a45fac9 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1273,25 +1273,43 @@ vy_get_by_secondary_tuple(struct vy_lsm *lsm, struct vy_tx *tx, struct tuple *tuple, struct tuple **result) { assert(lsm->index_id > 0); - /* - * No need in vy_tx_track() as the tuple must already be - * tracked in the secondary index LSM tree.
- */ + if (vy_point_lookup(lsm->pk, tx, rv, tuple, result) != 0) return -1; - if (*result == NULL) { + if (*result == NULL || + vy_tuple_compare(*result, tuple, lsm->key_def) != 0) { + /* + * If a tuple read from a secondary index doesn't + * match the tuple corresponding to it in the + * primary index, it must have been overwritten or + * deleted, but the DELETE statement hasn't been + * propagated to the secondary index yet. In this + * case silently skip this tuple. + */ + if (*result != NULL) { + tuple_unref(*result); + *result = NULL; + } /* - * All indexes of a space must be consistent, i.e. - * if a tuple is present in one index, it must be - * present in all other indexes as well, so we can - * get here only if there's a bug somewhere in vinyl. - * Don't abort as core dump won't really help us in - * this case. Just warn the user and proceed to the - * next tuple. + * Invalidate the cache entry so that we won't read + * the overwritten tuple again from the cache. */ - say_warn("%s: key %s missing in primary index", - vy_lsm_name(lsm), vy_stmt_str(tuple)); + vy_cache_on_write(&lsm->cache, tuple, NULL); + return 0; + } + + /* + * Even though the tuple is tracked in the secondary index + * read set, we still must track the full tuple read from + * the primary index, otherwise the transaction won't be + * aborted if this tuple is overwritten or deleted, because + * the DELETE statement is not written to secondary indexes + * immediately. + */ + if (tx != NULL && vy_tx_track_point(tx, lsm->pk, *result) != 0) { + tuple_unref(*result); + return -1; } if ((*rv)->vlsn == INT64_MAX) @@ -1604,7 +1622,6 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, struct vy_lsm *lsm = vy_lsm_find_unique(space, request->index_id); if (lsm == NULL) return -1; - bool has_secondary = space->index_count > 1; const char *key = request->key; uint32_t part_count = mp_decode_array(&key); if (vy_unique_key_validate(lsm, key, part_count)) @@ -1614,12 +1631,9 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, * before deletion. * - if the space has on_replace triggers and need to pass * to them the old tuple. - * - * - if the space has one or more secondary indexes, then - * we need to extract secondary keys from the old tuple - * and pass them to indexes for deletion. + * - if deletion is done by a secondary index. */ - if (has_secondary || !rlist_empty(&space->on_replace)) { + if (lsm->index_id > 0 || !rlist_empty(&space->on_replace)) { if (vy_get_by_raw_key(lsm, tx, vy_tx_read_view(tx), key, part_count, &stmt->old_tuple) != 0) return -1; @@ -1628,8 +1642,7 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, } int rc = 0; struct tuple *delete; - if (has_secondary) { - assert(stmt->old_tuple != NULL); + if (stmt->old_tuple != NULL) { delete = vy_stmt_new_surrogate_delete(pk->mem_format, stmt->old_tuple); if (delete == NULL) @@ -1642,12 +1655,14 @@ vy_delete(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, if (rc != 0) break; } - } else { /* Primary is the single index in the space. 
*/ + } else { assert(lsm->index_id == 0); delete = vy_stmt_new_surrogate_delete_from_key(request->key, pk->key_def, pk->mem_format); if (delete == NULL) return -1; + if (space->index_count > 1) + vy_stmt_set_flags(delete, VY_STMT_DEFERRED_DELETE); rc = vy_tx_set(tx, pk, delete); } tuple_unref(delete); @@ -2166,11 +2181,9 @@ vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, /* * Get the overwritten tuple from the primary index if * the space has on_replace triggers, in which case we - * need to pass the old tuple to trigger callbacks, or - * if the space has secondary indexes and so we need - * the old tuple to delete it from them. + * need to pass the old tuple to trigger callbacks. */ - if (space->index_count > 1 || !rlist_empty(&space->on_replace)) { + if (!rlist_empty(&space->on_replace)) { if (vy_get(pk, tx, vy_tx_read_view(tx), stmt->new_tuple, &stmt->old_tuple) != 0) return -1; @@ -2181,6 +2194,8 @@ vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt, */ vy_stmt_set_type(stmt->new_tuple, IPROTO_INSERT); } + } else if (space->index_count > 1) { + vy_stmt_set_flags(stmt->new_tuple, VY_STMT_DEFERRED_DELETE); } /* * Replace in the primary index without explicit deletion @@ -4286,11 +4301,167 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, /* {{{ Deferred DELETE handling */ +/** + * Callback invoked after a deferred DELETE statement has been + * committed to _vinyl_deferred_delete system space. + */ +static void +vy_deferred_delete_on_commit(struct trigger *trigger, void *event) +{ + struct txn *txn = event; + struct vy_mem *mem = trigger->data; + /* + * Update dump_lsn so that we can skip dumped deferred + * DELETE statements on WAL recovery. + */ + assert(mem->dump_lsn <= txn->signature); + mem->dump_lsn = txn->signature; + vy_mem_unpin(mem); +} + +/** + * Callback invoked when a deferred DELETE statement is written + * to _vinyl_deferred_delete system space. It extracts the + * deleted tuple, its LSN, and the target space id from the + * system space row, then generates a deferred DELETE statement + * and inserts it into secondary indexes of the target space. + * + * Note, this callback is also invoked during local WAL recovery + * to restore deferred DELETE statements that haven't been dumped + * to disk. To skip deferred DELETEs that have been dumped, we + * use the same technique we employ for normal WAL statements, + * i.e. we filter them by LSN, see vy_is_committed_one(). To do + * that, we need to account the LSN of a WAL row that generated + * a deferred DELETE statement to vy_lsm::dump_lsn, so we install + * an on_commit trigger that propagates the LSN of the WAL row to + * vy_mem::dump_lsn, which in turn will contribute to vy_lsm::dump_lsn + * when the in-memory tree is dumped, see vy_task_dump_new(). + * + * This implies that we don't yield between statements of the + * same transaction, because if we did, two deferred DELETEs with + * the same WAL LSN could land in different in-memory trees: if + * one of the trees got dumped while the other didn't, we would + * mistakenly skip both statements on recovery. + */ static void vy_deferred_delete_on_replace(struct trigger *trigger, void *event) { (void)trigger; - (void)event; + + struct txn *txn = event; + struct txn_stmt *stmt = txn_current_stmt(txn); + bool is_first_statement = txn_is_first_statement(txn); + + if (stmt->new_tuple == NULL) + return; + /* + * Extract space id, LSN of the deferred DELETE statement, + * and the deleted tuple from the system space row.
+ */ + uint32_t space_id; + if (tuple_field_u32(stmt->new_tuple, 0, &space_id) != 0) + diag_raise(); + int64_t lsn; + if (tuple_field_i64(stmt->new_tuple, 1, &lsn) != 0) + diag_raise(); + const char *delete_data = tuple_field(stmt->new_tuple, 2); + if (delete_data == NULL) { + diag_set(ClientError, ER_NO_SUCH_FIELD, 2); + diag_raise(); + } + const char *delete_data_end = delete_data; + mp_next(&delete_data_end); + + /* Look up the space. */ + struct space *space = space_cache_find(space_id); + if (space == NULL) + diag_raise(); + if (space->index_count <= 1) + return; + /* + * Wait for memory quota if necessary before starting to + * process the batch (we can't yield between statements). + */ + struct vy_env *env = vy_env(space->engine); + if (is_first_statement) + vy_quota_wait(&env->quota); + + /* Create the deferred DELETE statement. */ + struct vy_lsm *pk = vy_lsm(space->index[0]); + struct tuple *delete = vy_stmt_new_surrogate_delete_raw(pk->mem_format, + delete_data, delete_data_end); + if (delete == NULL) + diag_raise(); + vy_stmt_set_lsn(delete, lsn); + /* + * A deferred DELETE may be generated after new statements + * were committed for the deleted key while the read iterator + * assumes that newer sources always store newer statements. + * Mark deferred DELETEs with the VY_STMT_SKIP_READ flag so + * as not to break the read iterator assumptions. + */ + vy_stmt_set_flags(delete, VY_STMT_SKIP_READ); + + /* Insert the deferred DELETE into secondary indexes. */ + int rc = 0; + size_t mem_used_before = lsregion_used(&env->mem_env.allocator); + const struct tuple *region_stmt = NULL; + for (uint32_t i = 1; i < space->index_count; i++) { + struct vy_lsm *lsm = vy_lsm(space->index[i]); + if (vy_is_committed_one(env, lsm)) + continue; + /* + * As usual, rotate the active in-memory index if + * schema was changed or dump was triggered. Do it + * only if processing the first statement, because + * dump may be triggered by one of the statements + * of this transaction (see vy_quota_force_use() + * below), in which case we must not do rotation + * as we want all statements to land in the same + * in-memory index. This is safe, as long as we + * don't yield between statements. + */ + struct vy_mem *mem = lsm->mem; + if (is_first_statement && + (mem->space_cache_version != space_cache_version || + mem->generation != *lsm->env->p_generation)) { + rc = vy_lsm_rotate_mem(lsm); + if (rc != 0) + break; + mem = lsm->mem; + } + rc = vy_lsm_set(lsm, mem, delete, ®ion_stmt); + if (rc != 0) + break; + vy_lsm_commit_stmt(lsm, mem, region_stmt); + + if (!is_first_statement) + continue; + /* + * If this is the first statement of this + * transaction, install on_commit trigger + * which will propagate the WAL row LSN to + * the LSM tree. 
+ */ + struct trigger *on_commit = region_alloc(&fiber()->gc, + sizeof(*on_commit)); + if (on_commit == NULL) { + diag_set(OutOfMemory, sizeof(*on_commit), + "region", "struct trigger"); + rc = -1; + break; + } + vy_mem_pin(mem); + trigger_create(on_commit, vy_deferred_delete_on_commit, mem, NULL); + txn_on_commit(txn, on_commit); + } + size_t mem_used_after = lsregion_used(&env->mem_env.allocator); + assert(mem_used_after >= mem_used_before); + vy_quota_force_use(&env->quota, mem_used_after - mem_used_before); + + tuple_unref(delete); + if (rc != 0) + diag_raise(); } static struct trigger on_replace_vinyl_deferred_delete = { diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h index f0b7ec9c..d2aa0c43 100644 --- a/src/box/vy_lsm.h +++ b/src/box/vy_lsm.h @@ -50,6 +50,7 @@ extern "C" { #endif /* defined(__cplusplus) */ struct histogram; +struct index; struct tuple; struct tuple_format; struct vy_lsm; @@ -292,6 +293,10 @@ struct vy_lsm { vy_lsm_read_set_t read_set; }; +/** Extract vy_lsm from an index object. */ +struct vy_lsm * +vy_lsm(struct index *index); + /** Return LSM tree name. Used for logging. */ const char * vy_lsm_name(struct vy_lsm *lsm); diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h index 957f549f..29b60ac7 100644 --- a/src/box/vy_mem.h +++ b/src/box/vy_mem.h @@ -171,6 +171,12 @@ struct vy_mem { * * Once the tree is dumped to disk it will be used to update * vy_lsm::dump_lsn, see vy_task_dump_new(). + * + * Note, we account not only original LSN (vy_stmt_lsn()) + * in this variable, but also WAL LSN of deferred DELETE + * statements. This is needed to skip WAL recovery of both + * deferred and normal statements that have been dumped to + * disk. See vy_deferred_delete_on_replace() for more details. */ int64_t dump_lsn; /** diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c index 5e43340b..7b704b84 100644 --- a/src/box/vy_point_lookup.c +++ b/src/box/vy_point_lookup.c @@ -293,3 +293,35 @@ done: } return 0; } + +int +vy_point_lookup_mem(struct vy_lsm *lsm, const struct vy_read_view **rv, + struct tuple *key, struct tuple **ret) +{ + assert(tuple_field_count(key) >= lsm->cmp_def->part_count); + + int rc; + struct vy_history history; + vy_history_create(&history, &lsm->env->history_node_pool); + + rc = vy_point_lookup_scan_cache(lsm, rv, key, &history); + if (rc != 0 || vy_history_is_terminal(&history)) + goto done; + + rc = vy_point_lookup_scan_mems(lsm, rv, key, &history); + if (rc != 0 || vy_history_is_terminal(&history)) + goto done; + + *ret = NULL; + goto out; +done: + if (rc == 0) { + int upserts_applied; + rc = vy_history_apply(&history, lsm->cmp_def, lsm->mem_format, + true, &upserts_applied, ret); + lsm->stat.upsert.applied += upserts_applied; + } +out: + vy_history_cleanup(&history); + return rc; +} diff --git a/src/box/vy_point_lookup.h b/src/box/vy_point_lookup.h index 3b7c5a04..6d77ce9c 100644 --- a/src/box/vy_point_lookup.h +++ b/src/box/vy_point_lookup.h @@ -71,6 +71,24 @@ vy_point_lookup(struct vy_lsm *lsm, struct vy_tx *tx, const struct vy_read_view **rv, struct tuple *key, struct tuple **ret); +/** + * Look up a tuple by key in memory. + * + * This function works just like vy_point_lookup() except: + * + * - It only scans in-memory level and cache and hence doesn't yield. + * - It doesn't turn DELETE into NULL so it returns NULL if and only + * if no terminal statement matching the key is present in memory + * (there still may be statements stored on disk though). 
+ * - It doesn't account the lookup to LSM tree stats (as it never + * descends to lower levels). + * + * The function returns 0 on success, -1 on memory allocation error. + */ +int +vy_point_lookup_mem(struct vy_lsm *lsm, const struct vy_read_view **rv, + struct tuple *key, struct tuple **ret); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index 7d8961c4..64975f3a 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -49,6 +49,10 @@ #include "cbus.h" #include "salad/stailq.h" #include "say.h" +#include "txn.h" +#include "space.h" +#include "schema.h" +#include "xrow.h" #include "vy_lsm.h" #include "vy_log.h" #include "vy_mem.h" @@ -65,6 +69,8 @@ static int vy_worker_f(va_list); static int vy_scheduler_f(va_list); static void vy_task_execute_f(struct cmsg *); static void vy_task_complete_f(struct cmsg *); +static void vy_deferred_delete_batch_process_f(struct cmsg *); +static void vy_deferred_delete_batch_free_f(struct cmsg *); static const struct cmsg_hop vy_task_execute_route[] = { { vy_task_execute_f, NULL }, @@ -83,10 +89,42 @@ struct vy_worker { struct cpipe tx_pipe; /** Link in vy_scheduler::idle_workers. */ struct stailq_entry in_idle; + /** Route for sending deferred DELETEs back to tx. */ + struct cmsg_hop deferred_delete_route[2]; }; struct vy_task; +/** Max number of statements in a batch of deferred DELETEs. */ +enum { VY_DEFERRED_DELETE_BATCH_MAX = 100 }; + +/** Deferred DELETE statement. */ +struct vy_deferred_delete_stmt { + /** Overwritten tuple. */ + struct tuple *old_stmt; + /** Statement that overwrote @old_stmt. */ + struct tuple *new_stmt; +}; + +/** + * Batch of deferred DELETE statements generated during + * a primary index compaction. + */ +struct vy_deferred_delete_batch { + /** CBus messages for sending the batch to tx. */ + struct cmsg cmsg; + /** Task that generated this batch. */ + struct vy_task *task; + /** Set if the tx thread failed to process the batch. */ + bool is_failed; + /** In case of failure the error is stored here. */ + struct diag diag; + /** Number of elements actually stored in @stmt array. */ + int count; + /** Array of deferred DELETE statements. */ + struct vy_deferred_delete_stmt stmt[VY_DEFERRED_DELETE_BATCH_MAX]; +}; + struct vy_task_ops { /** * This function is called from a worker. It is supposed to do work @@ -159,10 +197,26 @@ struct vy_task { */ double bloom_fpr; int64_t page_size; + /** + * Deferred DELETE handler passed to the write iterator. + * It sends deferred DELETE statements generated during + * primary index compaction back to tx. + */ + struct vy_deferred_delete_handler deferred_delete_handler; + /** Batch of deferred deletes generated by this task. */ + struct vy_deferred_delete_batch *deferred_delete_batch; + /** + * Number of batches of deferred DELETEs sent to tx + * and not yet processed. + */ + int deferred_delete_in_progress; /** Link in vy_scheduler::processed_tasks. */ struct stailq_entry in_processed; }; +static const struct vy_deferred_delete_handler_iface +vy_task_deferred_delete_iface; + /** * Allocate a new task to be executed by a worker thread. 
* When preparing an asynchronous task, this function must @@ -197,6 +251,7 @@ vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, } vy_lsm_ref(lsm); diag_create(&task->diag); + task->deferred_delete_handler.iface = &vy_task_deferred_delete_iface; return task; } @@ -204,6 +259,8 @@ vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, static void vy_task_delete(struct vy_task *task) { + assert(task->deferred_delete_batch == NULL); + assert(task->deferred_delete_in_progress == 0); key_def_delete(task->cmp_def); key_def_delete(task->key_def); vy_lsm_unref(task->lsm); @@ -293,6 +350,12 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler) cpipe_create(&worker->worker_pipe, name); stailq_add_tail_entry(&scheduler->idle_workers, worker, in_idle); + + struct cmsg_hop *route = worker->deferred_delete_route; + route[0].f = vy_deferred_delete_batch_process_f; + route[0].pipe = &worker->worker_pipe; + route[1].f = vy_deferred_delete_batch_free_f; + route[1].pipe = NULL; } } @@ -652,6 +715,237 @@ vy_run_discard(struct vy_run *run) vy_log_tx_try_commit(); } +/** + * Encode and write a single deferred DELETE statement to + * _vinyl_deferred_delete system space. The rest will be + * done by the space trigger. + */ +static int +vy_deferred_delete_process_one(struct space *deferred_delete_space, + uint32_t space_id, struct tuple_format *format, + struct vy_deferred_delete_stmt *stmt) +{ + int64_t lsn = vy_stmt_lsn(stmt->new_stmt); + + struct tuple *delete; + delete = vy_stmt_new_surrogate_delete(format, stmt->old_stmt); + if (delete == NULL) + return -1; + + uint32_t delete_data_size; + const char *delete_data = tuple_data_range(delete, &delete_data_size); + + size_t buf_size = (mp_sizeof_array(3) + mp_sizeof_uint(space_id) + + mp_sizeof_uint(lsn) + delete_data_size); + char *data = region_alloc(&fiber()->gc, buf_size); + if (data == NULL) { + diag_set(OutOfMemory, buf_size, "region", "buf"); + tuple_unref(delete); + return -1; + } + + char *data_end = data; + data_end = mp_encode_array(data_end, 3); + data_end = mp_encode_uint(data_end, space_id); + data_end = mp_encode_uint(data_end, lsn); + memcpy(data_end, delete_data, delete_data_size); + data_end += delete_data_size; + assert(data_end <= data + buf_size); + + struct request request; + memset(&request, 0, sizeof(request)); + request.type = IPROTO_REPLACE; + request.space_id = BOX_VINYL_DEFERRED_DELETE_ID; + request.tuple = data; + request.tuple_end = data_end; + + tuple_unref(delete); + + struct txn *txn = txn_begin_stmt(deferred_delete_space); + if (txn == NULL) + return -1; + + struct tuple *unused; + if (space_execute_dml(deferred_delete_space, txn, + &request, &unused) != 0) { + txn_rollback_stmt(); + return -1; + } + return txn_commit_stmt(txn, &request); +} + +/** + * Callback invoked by the tx thread to process deferred DELETE + * statements generated during compaction. It writes deferred + * DELETEs to a special system space, _vinyl_deferred_delete. + * The system space has an on_replace trigger installed which + * propagates the DELETEs to secondary indexes. This way, even + * if a deferred DELETE isn't dumped to disk by vinyl, it still + * can be recovered from WAL. 
+ */ +static void +vy_deferred_delete_batch_process_f(struct cmsg *cmsg) +{ + struct vy_deferred_delete_batch *batch = container_of(cmsg, + struct vy_deferred_delete_batch, cmsg); + struct vy_task *task = batch->task; + struct vy_lsm *pk = task->lsm; + + assert(pk->index_id == 0); + /* + * A space can be dropped while a compaction task + * is in progress. + */ + if (pk->is_dropped) + return; + + struct space *deferred_delete_space; + deferred_delete_space = space_by_id(BOX_VINYL_DEFERRED_DELETE_ID); + assert(deferred_delete_space != NULL); + + struct txn *txn = txn_begin(false); + if (txn == NULL) + goto fail; + + for (int i = 0; i < batch->count; i++) { + if (vy_deferred_delete_process_one(deferred_delete_space, + pk->space_id, pk->mem_format, + &batch->stmt[i]) != 0) + goto fail; + } + + if (txn_commit(txn) != 0) + goto fail; + + return; +fail: + batch->is_failed = true; + diag_move(diag_get(), &batch->diag); + txn_rollback(); +} + +/** + * Callback invoked by a worker thread to free processed deferred + * DELETE statements. It must be done on behalf of the worker thread + * that generated those DELETEs, because a vinyl statement cannot + * be allocated and freed in different threads. + */ +static void +vy_deferred_delete_batch_free_f(struct cmsg *cmsg) +{ + struct vy_deferred_delete_batch *batch = container_of(cmsg, + struct vy_deferred_delete_batch, cmsg); + struct vy_task *task = batch->task; + for (int i = 0; i < batch->count; i++) { + struct vy_deferred_delete_stmt *stmt = &batch->stmt[i]; + vy_stmt_unref_if_possible(stmt->old_stmt); + vy_stmt_unref_if_possible(stmt->new_stmt); + } + /* + * Abort the task if the tx thread failed to process + * the batch unless it has already been aborted. + */ + if (batch->is_failed && !task->is_failed) { + assert(!diag_is_empty(&batch->diag)); + diag_move(&batch->diag, &task->diag); + task->is_failed = true; + fiber_cancel(task->fiber); + } + diag_destroy(&batch->diag); + free(batch); + /* Notify the caller if this is the last batch. */ + assert(task->deferred_delete_in_progress > 0); + if (--task->deferred_delete_in_progress == 0) + fiber_wakeup(task->fiber); +} + +/** + * Send all deferred DELETEs accumulated by a vinyl task to + * the tx thread where they will be processed. + */ +static void +vy_task_deferred_delete_flush(struct vy_task *task) +{ + struct vy_worker *worker = task->worker; + struct vy_deferred_delete_batch *batch = task->deferred_delete_batch; + + if (batch == NULL) + return; + + task->deferred_delete_batch = NULL; + task->deferred_delete_in_progress++; + + cmsg_init(&batch->cmsg, worker->deferred_delete_route); + cpipe_push(&worker->tx_pipe, &batch->cmsg); +} + +/** + * Add a deferred DELETE to a batch. Once the batch gets full, + * submit it to tx where it will get processed. + */ +static int +vy_task_deferred_delete_process(struct vy_deferred_delete_handler *handler, + struct tuple *old_stmt, struct tuple *new_stmt) +{ + enum { MAX_IN_PROGRESS = 10 }; + + struct vy_task *task = container_of(handler, struct vy_task, + deferred_delete_handler); + struct vy_deferred_delete_batch *batch = task->deferred_delete_batch; + + /* + * Throttle the compaction task if there are too many batches + * being processed so as to limit memory consumption. + */ + while (task->deferred_delete_in_progress >= MAX_IN_PROGRESS) + fiber_sleep(TIMEOUT_INFINITY); + + /* Allocate a new batch on demand.
*/ + if (batch == NULL) { + batch = malloc(sizeof(*batch)); + if (batch == NULL) { + diag_set(OutOfMemory, sizeof(*batch), "malloc", + "struct vy_deferred_delete_batch"); + return -1; + } + memset(batch, 0, sizeof(*batch)); + batch->task = task; + diag_create(&batch->diag); + task->deferred_delete_batch = batch; + } + + assert(batch->count < VY_DEFERRED_DELETE_BATCH_MAX); + struct vy_deferred_delete_stmt *stmt = &batch->stmt[batch->count++]; + stmt->old_stmt = old_stmt; + vy_stmt_ref_if_possible(old_stmt); + stmt->new_stmt = new_stmt; + vy_stmt_ref_if_possible(new_stmt); + + if (batch->count == VY_DEFERRED_DELETE_BATCH_MAX) + vy_task_deferred_delete_flush(task); + return 0; +} + +/** + * Wait until all pending deferred DELETE statements have been + * processed by tx. Called when the write iterator stops. + */ +static void +vy_task_deferred_delete_destroy(struct vy_deferred_delete_handler *handler) +{ + struct vy_task *task = container_of(handler, struct vy_task, + deferred_delete_handler); + vy_task_deferred_delete_flush(task); + while (task->deferred_delete_in_progress > 0) + fiber_sleep(TIMEOUT_INFINITY); +} + +static const struct vy_deferred_delete_handler_iface +vy_task_deferred_delete_iface = { + .process = vy_task_deferred_delete_process, + .destroy = vy_task_deferred_delete_destroy, +}; + static int vy_task_write_run(struct vy_task *task) { @@ -1002,6 +1296,12 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, new_run->dump_lsn = dump_lsn; + /* + * Note, since deferred DELETEs are generated on tx commit + * in case the overwritten tuple is found in memory, no + * deferred DELETE statement should be generated during + * dump, so we don't pass a deferred DELETE handler. + */ struct vy_stmt_stream *wi; bool is_last_level = (lsm->run_count == 0); wi = vy_write_iterator_new(task->cmp_def, lsm->disk_format, @@ -1273,7 +1573,9 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm, bool is_last_level = (range->compact_priority == range->slice_count); wi = vy_write_iterator_new(task->cmp_def, lsm->disk_format, lsm->index_id == 0, is_last_level, - scheduler->read_views, NULL); + scheduler->read_views, + lsm->index_id > 0 ? NULL : + &task->deferred_delete_handler); if (wi == NULL) goto err_wi; @@ -1336,7 +1638,7 @@ static int vy_task_f(va_list va) { struct vy_task *task = va_arg(va, struct vy_task *); - if (task->ops->execute(task) != 0) { + if (task->ops->execute(task) != 0 && !task->is_failed) { struct diag *diag = diag_get(); assert(!diag_is_empty(diag)); task->is_failed = true; diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c index c7a6d9ff..27cd9bb7 100644 --- a/src/box/vy_tx.c +++ b/src/box/vy_tx.c @@ -46,6 +46,7 @@ #include "iterator_type.h" #include "salad/stailq.h" #include "schema.h" /* space_cache_version */ +#include "space.h" #include "trigger.h" #include "trivia/util.h" #include "tuple.h" @@ -58,6 +59,7 @@ #include "vy_history.h" #include "vy_read_set.h" #include "vy_read_view.h" +#include "vy_point_lookup.h" int write_set_cmp(struct txv *a, struct txv *b) @@ -468,6 +470,106 @@ vy_tx_write(struct vy_lsm *lsm, struct vy_mem *mem, return vy_lsm_set(lsm, mem, stmt, region_stmt); } +/** + * Try to generate a deferred DELETE statement on tx commit. + * + * This function is supposed to be called for a primary index + * statement which was executed without deletion of the overwritten + * tuple from secondary indexes.
It looks up the overwritten tuple + * in memory and, if found, produces the deferred DELETEs and + * inserts them into the transaction log. + * + * Affects @tx->log, @v->stmt. + * + * Returns 0 on success, -1 on memory allocation error. + */ +static int +vy_tx_handle_deferred_delete(struct vy_tx *tx, struct txv *v) +{ + struct vy_lsm *pk = v->lsm; + struct tuple *stmt = v->stmt; + uint8_t flags = vy_stmt_flags(stmt); + + assert(pk->index_id == 0); + assert(flags & VY_STMT_DEFERRED_DELETE); + + struct space *space = space_cache_find(pk->space_id); + if (space == NULL) { + /* + * Space was dropped while transaction was + * in progress. Nothing to do. + */ + return 0; + } + + /* Look up the tuple overwritten by this statement. */ + struct tuple *tuple; + if (vy_point_lookup_mem(pk, &tx->xm->p_global_read_view, + stmt, &tuple) != 0) + return -1; + + if (tuple == NULL) { + /* + * Nothing's found, but there still may be + * matching statements stored on disk so we + * have to defer generation of DELETE until + * compaction. + */ + return 0; + } + + /* + * If a terminal statement is found, we can produce + * DELETE right away so clear the flag now. + */ + vy_stmt_set_flags(stmt, flags & ~VY_STMT_DEFERRED_DELETE); + + if (vy_stmt_type(tuple) == IPROTO_DELETE) { + /* The tuple's already deleted, nothing to do. */ + tuple_unref(tuple); + return 0; + } + + struct tuple *delete_stmt; + delete_stmt = vy_stmt_new_surrogate_delete(pk->mem_format, tuple); + tuple_unref(tuple); + if (delete_stmt == NULL) + return -1; + + if (vy_stmt_type(stmt) == IPROTO_DELETE) { + /* + * Since primary and secondary indexes of the + * same space share in-memory statements, we + * need to use the new DELETE in the primary + * index, because the original DELETE doesn't + * contain secondary key parts. + */ + vy_stmt_counter_acct_tuple(&pk->stat.txw.count, delete_stmt); + vy_stmt_counter_unacct_tuple(&pk->stat.txw.count, stmt); + v->stmt = delete_stmt; + tuple_ref(delete_stmt); + tuple_unref(stmt); + } + + /* + * Make DELETE statements for secondary indexes and + * insert them into the transaction log. + */ + int rc = 0; + for (uint32_t i = 1; i < space->index_count; i++) { + struct vy_lsm *lsm = vy_lsm(space->index[i]); + struct txv *delete_txv = txv_new(tx, lsm, delete_stmt); + if (delete_txv == NULL) { + rc = -1; + break; + } + stailq_insert_entry(&tx->log, delete_txv, v, next_in_log); + vy_stmt_counter_acct_tuple(&lsm->stat.txw.count, delete_stmt); + } + tuple_unref(delete_stmt); + return rc; +} + int vy_tx_prepare(struct vy_tx *tx) { @@ -521,6 +623,22 @@ vy_tx_prepare(struct vy_tx *tx) if (v->is_overwritten) continue; + if (lsm->index_id > 0 && repsert == NULL && delete == NULL) { + /* + * This statement is for a secondary index, + * and the statement corresponding to it in + * the primary index was overwritten. This + * can only happen if insertion of DELETE + * into secondary indexes was postponed until + * primary index compaction. In this case + * the DELETE will not be generated, because + * the corresponding statement never made it + * to the primary index LSM tree. So we must + * skip it for secondary indexes as well. + */ + continue; + } + enum iproto_type type = vy_stmt_type(v->stmt); /* Optimize out INSERT + DELETE for the same key. */ @@ -535,6 +653,16 @@ vy_tx_prepare(struct vy_tx *tx) */ type = IPROTO_INSERT; vy_stmt_set_type(v->stmt, type); + /* + * In case of INSERT, no statement was actually + * overwritten so no need to generate a deferred + * DELETE for secondary indexes. 
+ */ + uint8_t flags = vy_stmt_flags(v->stmt); + if (flags & VY_STMT_DEFERRED_DELETE) { + vy_stmt_set_flags(v->stmt, flags & + ~VY_STMT_DEFERRED_DELETE); + } } if (!v->is_first_insert && type == IPROTO_INSERT) { @@ -550,6 +678,11 @@ vy_tx_prepare(struct vy_tx *tx) return -1; assert(v->mem != NULL); + if (lsm->index_id == 0 && + vy_stmt_flags(v->stmt) & VY_STMT_DEFERRED_DELETE && + vy_tx_handle_deferred_delete(tx, v) != 0) + return -1; + /* In secondary indexes only REPLACE/DELETE can be written. */ vy_stmt_set_lsn(v->stmt, MAX_LSN + tx->psn); const struct tuple **region_stmt = diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c index 87f26900..eee25274 100644 --- a/test/unit/vy_point_lookup.c +++ b/test/unit/vy_point_lookup.c @@ -13,6 +13,8 @@ uint32_t schema_version; uint32_t space_cache_version; +struct space *space_by_id(uint32_t id) { return NULL; } +struct vy_lsm *vy_lsm(struct index *index) { return NULL; } static int write_run(struct vy_run *run, const char *dir_name, diff --git a/test/vinyl/deferred_delete.result b/test/vinyl/deferred_delete.result new file mode 100644 index 00000000..9811b6bc --- /dev/null +++ b/test/vinyl/deferred_delete.result @@ -0,0 +1,677 @@ +test_run = require('test_run').new() +--- +... +fiber = require('fiber') +--- +... +-- +-- Create a space with secondary indexes and check that REPLACE and +-- DELETE requests do not look up the old tuple in the primary index +-- to generate the DELETE statements for secondary indexes. Instead +-- DELETEs are generated when the primary index is compacted (gh-2129). +-- The optimization should work for both non-unique and unique indexes +-- so mark one of the indexes unique. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +pk = s:create_index('pk', {run_count_per_level = 10}) +--- +... +i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) +--- +... +i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true}) +--- +... +for i = 1, 10 do s:replace{i, i, i} end +--- +... +box.snapshot() +--- +- ok +... +for i = 1, 10, 2 do s:delete{i} end +--- +... +for i = 2, 10, 2 do s:replace{i, i * 10, i * 100} end +--- +... +-- DELETE/REPLACE does not look up the old tuple in the primary index. +pk:stat().lookup -- 0 +--- +- 0 +... +-- DELETEs are not written to secondary indexes. +pk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 5 DELETEs +--- +- 20 +... +i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs +--- +- 15 +... +i2:stat().rows -- ditto +--- +- 15 +... +-- Although there are only 5 tuples in the space, we have to look up +-- overwritten tuples in the primary index hence 15 lookups per SELECT +-- in a secondary index. +i1:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i1:stat().get.rows -- 15 +--- +- 15 +... +pk:stat().lookup -- 15 +--- +- 15 +... +i2:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i2:stat().get.rows -- 15 +--- +- 15 +... +pk:stat().lookup -- 30 +--- +- 30 +... +-- Overwritten/deleted tuples are not stored in the cache so calling +-- SELECT for a second time does only 5 lookups. +box.stat.reset() +--- +... +i1:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i1:stat().get.rows -- 5 +--- +- 5 +... +pk:stat().lookup -- 5 +--- +- 5 +... 
+i2:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i2:stat().get.rows -- 5 +--- +- 5 +... +pk:stat().lookup -- 10 +--- +- 10 +... +-- Cleanup the cache. +vinyl_cache = box.cfg.vinyl_cache +--- +... +box.cfg{vinyl_cache = 0} +--- +... +box.cfg{vinyl_cache = vinyl_cache} +--- +... +-- Compact the primary index to generate deferred DELETEs. +box.snapshot() +--- +- ok +... +pk:compact() +--- +... +while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +pk:stat().rows -- 5 new REPLACEs +--- +- 5 +... +i1:stat().rows -- 10 old REPLACE + 5 new REPLACEs + 10 deferred DELETEs +--- +- 25 +... +i2:stat().rows -- ditto +--- +- 25 +... +-- Deferred DELETEs must be ignored by the read iterator, because +-- they may break the read iterator invariant, so they don't reduce +-- the number of lookups. +box.stat.reset() +--- +... +i1:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i1:stat().get.rows -- 15 +--- +- 15 +... +pk:stat().lookup -- 15 +--- +- 15 +... +i2:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i2:stat().get.rows -- 15 +--- +- 15 +... +pk:stat().lookup -- 30 +--- +- 30 +... +-- Check that deferred DELETEs are not lost after restart. +test_run:cmd("restart server default") +fiber = require('fiber') +--- +... +s = box.space.test +--- +... +pk = s.index.pk +--- +... +i1 = s.index.i1 +--- +... +i2 = s.index.i2 +--- +... +i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 10 deferred DELETEs +--- +- 25 +... +i2:stat().rows -- ditto +--- +- 25 +... +-- Dump deferred DELETEs to disk and compact them. +-- Check that they cleanup garbage statements. +box.snapshot() +--- +- ok +... +i1:compact() +--- +... +while i1:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +i2:compact() +--- +... +while i2:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +i1:stat().rows -- 5 new REPLACEs +--- +- 5 +... +i2:stat().rows -- ditto +--- +- 5 +... +box.stat.reset() +--- +... +i1:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i1:stat().get.rows -- 5 +--- +- 5 +... +pk:stat().lookup -- 5 +--- +- 5 +... +i2:select() +--- +- - [2, 20, 200] + - [4, 40, 400] + - [6, 60, 600] + - [8, 80, 800] + - [10, 100, 1000] +... +i2:stat().get.rows -- 5 +--- +- 5 +... +pk:stat().lookup -- 10 +--- +- 10 +... +s:drop() +--- +... +-- +-- Check that if the old tuple is found in cache or in memory, then +-- the DELETE for secondary indexes is generated when the statement +-- is committed. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +pk = s:create_index('pk', {run_count_per_level = 10}) +--- +... +sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) +--- +... +for i = 1, 10 do s:replace{i, i} end +--- +... +box.snapshot() +--- +- ok +... +s:count() -- add tuples to the cache +--- +- 10 +... +box.stat.reset() +--- +... +for i = 1, 10, 2 do s:delete{i} end +--- +... +for i = 2, 10, 2 do s:replace{i, i * 10} end +--- +... +pk:stat().lookup -- 0 +--- +- 0 +... +pk:stat().cache.lookup -- 10 +--- +- 10 +... +pk:stat().cache.get.rows -- 10 +--- +- 10 +... +pk:stat().memory.iterator.lookup -- 0 +--- +- 0 +... +sk:stat().rows -- 10 old REPLACEs + 10 DELETEs + 5 new REPLACEs +--- +- 25 +... +box.stat.reset() +--- +... +for i = 1, 10 do s:replace{i, i * 100} end +--- +... 
+pk:stat().lookup -- 0 +--- +- 0 +... +pk:stat().cache.lookup -- 10 +--- +- 10 +... +pk:stat().cache.get.rows -- 0 +--- +- 0 +... +pk:stat().memory.iterator.lookup -- 10 +--- +- 10 +... +pk:stat().memory.iterator.get.rows -- 10 +--- +- 10 +... +sk:stat().rows -- 15 old REPLACEs + 15 DELETEs + 10 new REPLACEs +--- +- 40 +... +box.stat.reset() +--- +... +for i = 1, 10 do s:delete{i} end +--- +... +pk:stat().lookup -- 0 +--- +- 0 +... +pk:stat().cache.lookup -- 10 +--- +- 10 +... +pk:stat().cache.get.rows -- 0 +--- +- 0 +... +pk:stat().memory.iterator.lookup -- 10 +--- +- 10 +... +pk:stat().memory.iterator.get.rows -- 10 +--- +- 10 +... +sk:stat().rows -- 25 old REPLACEs + 25 DELETEs +--- +- 50 +... +sk:select() +--- +- [] +... +pk:stat().lookup -- 0 +--- +- 0 +... +box.snapshot() +--- +- ok +... +sk:compact() +--- +... +while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +sk:stat().run_count -- 0 +--- +- 0 +... +s:drop() +--- +... +-- +-- Check that a transaction is aborted if it read a tuple from +-- a secondary index that was overwritten in the primary index. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +pk = s:create_index('pk') +--- +... +sk = s:create_index('sk', {parts = {2, 'unsigned'}, unique = false}) +--- +... +s:replace{1, 1} +--- +- [1, 1] +... +box.snapshot() +--- +- ok +... +box.begin() +--- +... +sk:select{1} +--- +- - [1, 1] +... +c = fiber.channel(1) +--- +... +_ = fiber.create(function() s:replace{1, 10} c:put(true) end) +--- +... +c:get() +--- +- true +... +sk:select{1} +--- +- - [1, 1] +... +s:replace{10, 10} +--- +- [10, 10] +... +box.commit() -- error +--- +- error: Transaction has been aborted by conflict +... +s:drop() +--- +... +-- +-- Check that if a tuple was overwritten in the transaction write set, +-- it won't be committed to secondary indexes. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +pk = s:create_index('pk', {run_count_per_level = 10}) +--- +... +sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) +--- +... +for i = 1, 10 do s:replace{i, i} end +--- +... +box.snapshot() +--- +- ok +... +box.begin() +--- +... +for i = 1, 10 do s:replace{i, i * 10} end +--- +... +for i = 1, 10, 2 do s:delete{i} end +--- +... +for i = 2, 10, 2 do s:replace{i, i * 100} end +--- +... +box.commit() +--- +... +sk:select() +--- +- - [2, 200] + - [4, 400] + - [6, 600] + - [8, 800] + - [10, 1000] +... +pk:stat().rows -- 10 old REPLACEs + 5 DELETEs + 5 new REPLACEs +--- +- 20 +... +sk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs +--- +- 15 +... +-- Compact the primary index to generate deferred DELETEs. +box.snapshot() +--- +- ok +... +pk:compact() +--- +... +while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +-- Compact the secondary index to cleanup garbage. +box.snapshot() +--- +- ok +... +sk:compact() +--- +... +while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +sk:select() +--- +- - [2, 200] + - [4, 400] + - [6, 600] + - [8, 800] + - [10, 1000] +... +pk:stat().rows -- 5 new REPLACEs +--- +- 5 +... +sk:stat().rows -- ditto +--- +- 5 +... +s:drop() +--- +... +-- +-- Check that on recovery we do not apply deferred DELETEs that +-- have been dumped to disk. +-- +test_run:cmd("create server test with script='vinyl/low_quota.lua'") +--- +- true +... +test_run:cmd("start server test with args='1048576'") +--- +- true +... +test_run:cmd("switch test") +--- +- true +... +fiber = require('fiber') +--- +... 
+s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +pk = s:create_index('pk', {run_count_per_level = 10}) +--- +... +sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned', 3, 'string'}, unique = false}) +--- +... +pad = string.rep('x', 10 * 1024) +--- +... +for i = 1, 120 do s:replace{i, i, pad} end +--- +... +box.snapshot() +--- +- ok +... +pad = string.rep('y', 10 * 1024) +--- +... +for i = 1, 120 do s:replace{i, i, pad} end +--- +... +box.snapshot() +--- +- ok +... +sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs +--- +- 240 +... +box.stat.reset() +--- +... +-- Compact the primary index to generate deferred DELETEs. +-- Deferred DELETEs won't fit in memory and trigger dump +-- of the secondary index. +pk:compact() +--- +... +while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +--- +... +sk:stat().disk.dump.count -- 1 +--- +- 1 +... +sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs + 120 deferred DELETEs +--- +- 360 +... +test_run:cmd("restart server test with args='1048576'") +s = box.space.test +--- +... +pk = s.index.pk +--- +... +sk = s.index.sk +--- +... +-- Should be 360, the same amount of statements as before restart. +-- If we applied all deferred DELETEs, including the dumped ones, +-- then there would be more. +sk:stat().rows +--- +- 360 +... +s:drop() +--- +... +test_run:cmd("switch default") +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +test_run:cmd("cleanup server test") +--- +- true +... diff --git a/test/vinyl/deferred_delete.test.lua b/test/vinyl/deferred_delete.test.lua new file mode 100644 index 00000000..d18361a0 --- /dev/null +++ b/test/vinyl/deferred_delete.test.lua @@ -0,0 +1,261 @@ +test_run = require('test_run').new() +fiber = require('fiber') + +-- +-- Create a space with secondary indexes and check that REPLACE and +-- DELETE requests do not look up the old tuple in the primary index +-- to generate the DELETE statements for secondary indexes. Instead +-- DELETEs are generated when the primary index is compacted (gh-2129). +-- The optimization should work for both non-unique and unique indexes +-- so mark one of the indexes unique. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +pk = s:create_index('pk', {run_count_per_level = 10}) +i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) +i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true}) +for i = 1, 10 do s:replace{i, i, i} end +box.snapshot() +for i = 1, 10, 2 do s:delete{i} end +for i = 2, 10, 2 do s:replace{i, i * 10, i * 100} end + +-- DELETE/REPLACE does not look up the old tuple in the primary index. +pk:stat().lookup -- 0 + +-- DELETEs are not written to secondary indexes. +pk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 5 DELETEs +i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs +i2:stat().rows -- ditto + +-- Although there are only 5 tuples in the space, we have to look up +-- overwritten tuples in the primary index hence 15 lookups per SELECT +-- in a secondary index. +i1:select() +i1:stat().get.rows -- 15 +pk:stat().lookup -- 15 +i2:select() +i2:stat().get.rows -- 15 +pk:stat().lookup -- 30 + +-- Overwritten/deleted tuples are not stored in the cache so calling +-- SELECT for a second time does only 5 lookups. +box.stat.reset() +i1:select() +i1:stat().get.rows -- 5 +pk:stat().lookup -- 5 +i2:select() +i2:stat().get.rows -- 5 +pk:stat().lookup -- 10 + +-- Cleanup the cache. 
+vinyl_cache = box.cfg.vinyl_cache +box.cfg{vinyl_cache = 0} +box.cfg{vinyl_cache = vinyl_cache} + +-- Compact the primary index to generate deferred DELETEs. +box.snapshot() +pk:compact() +while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +pk:stat().rows -- 5 new REPLACEs +i1:stat().rows -- 10 old REPLACE + 5 new REPLACEs + 10 deferred DELETEs +i2:stat().rows -- ditto + +-- Deferred DELETEs must be ignored by the read iterator, because +-- they may break the read iterator invariant, so they don't reduce +-- the number of lookups. +box.stat.reset() +i1:select() +i1:stat().get.rows -- 15 +pk:stat().lookup -- 15 +i2:select() +i2:stat().get.rows -- 15 +pk:stat().lookup -- 30 + +-- Check that deferred DELETEs are not lost after restart. +test_run:cmd("restart server default") +fiber = require('fiber') +s = box.space.test +pk = s.index.pk +i1 = s.index.i1 +i2 = s.index.i2 +i1:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + 10 deferred DELETEs +i2:stat().rows -- ditto + +-- Dump deferred DELETEs to disk and compact them. +-- Check that they cleanup garbage statements. +box.snapshot() +i1:compact() +while i1:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +i2:compact() +while i2:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +i1:stat().rows -- 5 new REPLACEs +i2:stat().rows -- ditto +box.stat.reset() +i1:select() +i1:stat().get.rows -- 5 +pk:stat().lookup -- 5 +i2:select() +i2:stat().get.rows -- 5 +pk:stat().lookup -- 10 + +s:drop() + +-- +-- Check that if the old tuple is found in cache or in memory, then +-- the DELETE for secondary indexes is generated when the statement +-- is committed. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +pk = s:create_index('pk', {run_count_per_level = 10}) +sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) + +for i = 1, 10 do s:replace{i, i} end +box.snapshot() +s:count() -- add tuples to the cache + +box.stat.reset() +for i = 1, 10, 2 do s:delete{i} end +for i = 2, 10, 2 do s:replace{i, i * 10} end +pk:stat().lookup -- 0 +pk:stat().cache.lookup -- 10 +pk:stat().cache.get.rows -- 10 +pk:stat().memory.iterator.lookup -- 0 +sk:stat().rows -- 10 old REPLACEs + 10 DELETEs + 5 new REPLACEs + +box.stat.reset() +for i = 1, 10 do s:replace{i, i * 100} end +pk:stat().lookup -- 0 +pk:stat().cache.lookup -- 10 +pk:stat().cache.get.rows -- 0 +pk:stat().memory.iterator.lookup -- 10 +pk:stat().memory.iterator.get.rows -- 10 +sk:stat().rows -- 15 old REPLACEs + 15 DELETEs + 10 new REPLACEs + +box.stat.reset() +for i = 1, 10 do s:delete{i} end +pk:stat().lookup -- 0 +pk:stat().cache.lookup -- 10 +pk:stat().cache.get.rows -- 0 +pk:stat().memory.iterator.lookup -- 10 +pk:stat().memory.iterator.get.rows -- 10 +sk:stat().rows -- 25 old REPLACEs + 25 DELETEs + +sk:select() +pk:stat().lookup -- 0 + +box.snapshot() +sk:compact() +while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end +sk:stat().run_count -- 0 + +s:drop() + +-- +-- Check that a transaction is aborted if it read a tuple from +-- a secondary index that was overwritten in the primary index. 
+-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +pk = s:create_index('pk') +sk = s:create_index('sk', {parts = {2, 'unsigned'}, unique = false}) +s:replace{1, 1} +box.snapshot() + +box.begin() +sk:select{1} +c = fiber.channel(1) +_ = fiber.create(function() s:replace{1, 10} c:put(true) end) +c:get() +sk:select{1} +s:replace{10, 10} +box.commit() -- error + +s:drop() + +-- +-- Check that if a tuple was overwritten in the transaction write set, +-- it won't be committed to secondary indexes. +-- +s = box.schema.space.create('test', {engine = 'vinyl'}) +pk = s:create_index('pk', {run_count_per_level = 10}) +sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) +for i = 1, 10 do s:replace{i, i} end +box.snapshot() + +box.begin() +for i = 1, 10 do s:replace{i, i * 10} end +for i = 1, 10, 2 do s:delete{i} end +for i = 2, 10, 2 do s:replace{i, i * 100} end +box.commit() + +sk:select() + +pk:stat().rows -- 10 old REPLACEs + 5 DELETEs + 5 new REPLACEs +sk:stat().rows -- 10 old REPLACEs + 5 new REPLACEs + +-- Compact the primary index to generate deferred DELETEs. +box.snapshot() +pk:compact() +while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end + +-- Compact the secondary index to cleanup garbage. +box.snapshot() +sk:compact() +while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end + +sk:select() + +pk:stat().rows -- 5 new REPLACEs +sk:stat().rows -- ditto + +s:drop() + +-- +-- Check that on recovery we do not apply deferred DELETEs that +-- have been dumped to disk. +-- +test_run:cmd("create server test with script='vinyl/low_quota.lua'") +test_run:cmd("start server test with args='1048576'") +test_run:cmd("switch test") + +fiber = require('fiber') + +s = box.schema.space.create('test', {engine = 'vinyl'}) +pk = s:create_index('pk', {run_count_per_level = 10}) +sk = s:create_index('sk', {run_count_per_level = 10, parts = {2, 'unsigned', 3, 'string'}, unique = false}) + +pad = string.rep('x', 10 * 1024) +for i = 1, 120 do s:replace{i, i, pad} end +box.snapshot() + +pad = string.rep('y', 10 * 1024) +for i = 1, 120 do s:replace{i, i, pad} end +box.snapshot() + +sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs + +box.stat.reset() + +-- Compact the primary index to generate deferred DELETEs. +-- Deferred DELETEs won't fit in memory and trigger dump +-- of the secondary index. +pk:compact() +while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end + +sk:stat().disk.dump.count -- 1 + +sk:stat().rows -- 120 old REPLACEs + 120 new REPLACEs + 120 deferred DELETEs + +test_run:cmd("restart server test with args='1048576'") +s = box.space.test +pk = s.index.pk +sk = s.index.sk + +-- Should be 360, the same amount of statements as before restart. +-- If we applied all deferred DELETEs, including the dumped ones, +-- then there would be more. +sk:stat().rows + +s:drop() + +test_run:cmd("switch default") +test_run:cmd("stop server test") +test_run:cmd("cleanup server test") diff --git a/test/vinyl/info.result b/test/vinyl/info.result index 112ba85e..95e8cc60 100644 --- a/test/vinyl/info.result +++ b/test/vinyl/info.result @@ -1036,10 +1036,10 @@ s:bsize() --- - 0 ... -i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 1}) +i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 10}) --- ... -i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 1}) +i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 10}) --- ... 
 s:bsize()
@@ -1162,7 +1162,7 @@ s:bsize()
 i1:len(), i2:len()
 ---
 - 150
-- 150
+- 100
 ...
 i1:bsize(), i2:bsize()
 ---
@@ -1189,13 +1189,25 @@ i2:bsize() == st2.memory.index_size + st2.disk.index_size + st2.disk.bloom_size
 ---
 - true
 ...
+-- Compact the primary index first to generate deferred DELETEs.
+-- Then dump them and compact the secondary index.
 box.snapshot()
 ---
 - ok
 ...
+i1:compact()
+---
+...
 wait(function() return i1:stat() end, st1, 'disk.compact.count', 1)
 ---
 ...
+box.snapshot()
+---
+- ok
+...
+i2:compact()
+---
+...
 wait(function() return i2:stat() end, st2, 'disk.compact.count', 1)
 ---
 ...
diff --git a/test/vinyl/info.test.lua b/test/vinyl/info.test.lua
index 863a8793..5aebd0a8 100644
--- a/test/vinyl/info.test.lua
+++ b/test/vinyl/info.test.lua
@@ -322,8 +322,8 @@ s:drop()
 s = box.schema.space.create('test', {engine = 'vinyl'})
 s:bsize()
-i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 1})
-i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 1})
+i1 = s:create_index('i1', {parts = {1, 'unsigned'}, run_count_per_level = 10})
+i2 = s:create_index('i2', {parts = {2, 'unsigned'}, run_count_per_level = 10})
 s:bsize()
 i1:len(), i2:len()
 i1:bsize(), i2:bsize()
@@ -365,8 +365,13 @@ i2:len() == st2.memory.rows + st2.disk.rows
 i1:bsize() == st1.memory.index_size + st1.disk.index_size + st1.disk.bloom_size
 i2:bsize() == st2.memory.index_size + st2.disk.index_size + st2.disk.bloom_size + st2.disk.bytes
+-- Compact the primary index first to generate deferred DELETEs.
+-- Then dump them and compact the secondary index.
 box.snapshot()
+i1:compact()
 wait(function() return i1:stat() end, st1, 'disk.compact.count', 1)
+box.snapshot()
+i2:compact()
 wait(function() return i2:stat() end, st2, 'disk.compact.count', 1)
 st1 = i1:stat()
 st2 = i2:stat()
diff --git a/test/vinyl/layout.result b/test/vinyl/layout.result
index 49826302..ee14cd51 100644
--- a/test/vinyl/layout.result
+++ b/test/vinyl/layout.result
@@ -253,17 +253,17 @@ result
   - - 00000000000000000008.run
     - - HEADER:
           lsn: 10
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ёёё', null]
       - HEADER:
           lsn: 9
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['эээ', null]
       - HEADER:
           lsn: 8
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ЭЭЭ', null]
       - HEADER:
@@ -285,8 +285,8 @@ result
         BODY:
           row_index_offset:
           offset:
-          size: 90
-          unpacked_size: 71
+          size: 102
+          unpacked_size: 83
           row_count: 3
           min_key: ['ёёё']
   - - 00000000000000000012.run
@@ -295,20 +295,23 @@ result
           type: REPLACE
         BODY:
           tuple: ['ёёё', 123]
+          tuple_meta: {1: 1}
       - HEADER:
           lsn: 13
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ююю', 789]
+          tuple_meta: {1: 1}
       - HEADER:
           lsn: 12
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: ['ЮЮЮ', 456]
+          tuple_meta: {1: 1}
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
+          row_index: "\0\0\0\0\0\0\0\x14\0\0\0*"
   - - 00000000000000000006.index
     - - HEADER:
           type: RUNINFO
         BODY:
@@ -331,17 +334,17 @@ result
   - - 00000000000000000006.run
     - - HEADER:
           lsn: 10
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [null, 'ёёё']
       - HEADER:
           lsn: 9
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [null, 'эээ']
       - HEADER:
           lsn: 8
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [null, 'ЭЭЭ']
       - HEADER:
@@ -357,41 +360,36 @@ result
           page_count: 1
           bloom_filter:
           max_lsn: 13
-          min_key: [null, 'ёёё']
+          min_key: [123, 'ёёё']
       - HEADER:
           type: PAGEINFO
         BODY:
           row_index_offset:
           offset:
-          size: 110
-          unpacked_size: 91
-          row_count: 4
-          min_key: [null, 'ёёё']
+          size: 90
+          unpacked_size: 71
+          row_count: 3
+          min_key: [123, 'ёёё']
   - - 00000000000000000010.run
     - - HEADER:
           lsn: 11
-          type: DELETE
-        BODY:
-          key: [null, 'ёёё']
-      - HEADER:
-          lsn: 11
           type: REPLACE
         BODY:
           tuple: [123, 'ёёё']
       - HEADER:
           lsn: 12
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [456, 'ЮЮЮ']
       - HEADER:
           lsn: 13
-          type: INSERT
+          type: REPLACE
         BODY:
           tuple: [789, 'ююю']
       - HEADER:
           type: ROWINDEX
         BODY:
-          row_index: "\0\0\0\0\0\0\0\x10\0\0\0 \0\0\02"
+          row_index: "\0\0\0\0\0\0\0\x10\0\0\0\""
 ...
 test_run:cmd("clear filter")
 ---
diff --git a/test/vinyl/quota.result b/test/vinyl/quota.result
index e323bc4e..48042185 100644
--- a/test/vinyl/quota.result
+++ b/test/vinyl/quota.result
@@ -89,7 +89,7 @@ _ = space:replace{1, 1, string.rep('a', 1024 * 1024 * 5)}
 ...
 box.stat.vinyl().quota.used
 ---
-- 5341228
+- 5341267
 ...
 space:drop()
 ---
diff --git a/test/vinyl/tx_gap_lock.result b/test/vinyl/tx_gap_lock.result
index 150826cb..a456c017 100644
--- a/test/vinyl/tx_gap_lock.result
+++ b/test/vinyl/tx_gap_lock.result
@@ -1194,8 +1194,8 @@ s:drop()
 ---
 ...
 ----------------------------------------------------------------
--- gh-2534: Iterator over a secondary index doesn't double track
--- results in the primary index.
+-- Iterator over a secondary index tracks all results in the
+-- primary index. Needed for gh-2129.
 ----------------------------------------------------------------
 s = box.schema.space.create('test', {engine = 'vinyl'})
 ---
@@ -1219,23 +1219,23 @@ gap_lock_count() -- 0
 _ = s.index.sk:select({}, {limit = 50})
 ---
 ...
-gap_lock_count() -- 1
+gap_lock_count() -- 51
 ---
-- 1
+- 51
 ...
 for i = 1, 100 do s.index.sk:get(i) end
 ---
 ...
-gap_lock_count() -- 51
+gap_lock_count() -- 151
 ---
-- 51
+- 151
 ...
 _ = s.index.sk:select()
 ---
 ...
-gap_lock_count() -- 1
+gap_lock_count() -- 101
 ---
-- 1
+- 101
 ...
 box.commit()
 ---
diff --git a/test/vinyl/tx_gap_lock.test.lua b/test/vinyl/tx_gap_lock.test.lua
index 4d8d21d8..4ad55860 100644
--- a/test/vinyl/tx_gap_lock.test.lua
+++ b/test/vinyl/tx_gap_lock.test.lua
@@ -380,8 +380,8 @@ c4:commit()
 s:drop()
 ----------------------------------------------------------------
--- gh-2534: Iterator over a secondary index doesn't double track
--- results in the primary index.
+-- Iterator over a secondary index tracks all results in the
+-- primary index. Needed for gh-2129.
 ----------------------------------------------------------------
 s = box.schema.space.create('test', {engine = 'vinyl'})
 _ = s:create_index('pk', {parts = {1, 'unsigned'}})
@@ -390,11 +390,11 @@ for i = 1, 100 do s:insert{i, i} end
 box.begin()
 gap_lock_count() -- 0
 _ = s.index.sk:select({}, {limit = 50})
-gap_lock_count() -- 1
-for i = 1, 100 do s.index.sk:get(i) end
 gap_lock_count() -- 51
+for i = 1, 100 do s.index.sk:get(i) end
+gap_lock_count() -- 151
 _ = s.index.sk:select()
-gap_lock_count() -- 1
+gap_lock_count() -- 101
 box.commit()
 gap_lock_count() -- 0
 s:drop()
diff --git a/test/vinyl/write_iterator.result b/test/vinyl/write_iterator.result
index c38de5d3..cf1e426c 100644
--- a/test/vinyl/write_iterator.result
+++ b/test/vinyl/write_iterator.result
@@ -741,6 +741,11 @@ space:drop()
 s = box.schema.space.create('test', {engine = 'vinyl'})
 ---
 ...
+-- Install on_replace trigger to disable DELETE optimization
+-- in the secondary index (gh-2129).
+_ = s:on_replace(function() end)
+---
+...
 pk = s:create_index('primary', {run_count_per_level = 1})
 ---
 ...
diff --git a/test/vinyl/write_iterator.test.lua b/test/vinyl/write_iterator.test.lua
index 73c90c42..a1de240f 100644
--- a/test/vinyl/write_iterator.test.lua
+++ b/test/vinyl/write_iterator.test.lua
@@ -317,6 +317,9 @@ space:drop()
 
 -- gh-2875 INSERT+DELETE pairs are annihilated on compaction
 s = box.schema.space.create('test', {engine = 'vinyl'})
+-- Install on_replace trigger to disable DELETE optimization
+-- in the secondary index (gh-2129).
+_ = s:on_replace(function() end)
 pk = s:create_index('primary', {run_count_per_level = 1})
 sk = s:create_index('secondary', {run_count_per_level = 1, parts = {2, 'unsigned'}})
 PAD1 = 100
-- 
2.11.0
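
P.S. For anyone who wants to poke at the patched engine from a console,
here is a minimal sketch of the deferred DELETE lifecycle. It is an
illustration only, not part of the patch: the space name and literal
values are made up, but every call it uses (box.snapshot, index:compact,
index:stat) already appears in the tests above.

fiber = require('fiber')

s = box.schema.space.create('example', {engine = 'vinyl'})
pk = s:create_index('pk')
sk = s:create_index('sk', {parts = {2, 'unsigned'}, unique = false})

s:replace{1, 100} -- REPLACE{100, 1} goes into sk
box.snapshot()    -- dump it, so the old tuple is neither in memory
                  -- nor in cache when it gets overwritten
s:replace{1, 200} -- no DELETE{100, 1} is inserted into sk here; the
                  -- statement is only marked for deferred DELETE
                  -- generation in the primary index

sk:select()       -- returns only {1, 200}: the stale REPLACE{100, 1}
                  -- read from sk is skipped, because the full tuple
                  -- fetched from pk (REPLACE{1, 200}) doesn't match it

-- Compacting pk makes the marked statement meet the tuple it
-- overwrote, which produces the deferred DELETE{100, 1} for sk.
box.snapshot()
pk:compact()
while pk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end

-- Once sk is dumped and compacted in turn, the deferred DELETE
-- purges the garbage REPLACE{100, 1}.
box.snapshot()
sk:compact()
while sk:stat().disk.compact.count == 0 do fiber.sleep(0.001) end
sk:stat().rows    -- expected to drop to 1: only REPLACE{200, 1} left

s:drop()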