From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path: 
From: Vladimir Davydov
Subject: [PATCH 11/12] vinyl: make read iterator always return newest tuple version
Date: Sun, 15 Apr 2018 22:55:24 +0300
Message-Id: 
In-Reply-To: 
References: 
In-Reply-To: 
References: 
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
List-ID: 

vy_read_iterator_next() first scans all sources to find the newest
statement for the next key. If the statement turns out to be an UPSERT,
it retrieves older statements until it finds a terminal statement.
While retrieving older statements, it may yield, which opens a window
for other fibers to insert newer statements for the same key. Those
statements will be ignored by the read iterator even if they are
visible from the iterator's read view.

To be able to use the read iterator for building secondary indexes
(i.e. to iterate over tuples stored in the primary index (pk) and insert
them into the new index), we need to lift that limitation; otherwise,
there is a good chance that, while building a new index, we will
overwrite statements inserted by concurrent transactions.

So this patch reworks the read iterator to guarantee that it always
returns the newest tuple version. To achieve that, we now retrieve the
whole key history, instead of only the newest statement, the first time
we scan a source. Accordingly, vy_read_src now stores a vy_history
instead of a tuple, and all source iterators now return a vy_history
too. As a result, we no longer need to iterate to the next LSN (and
possibly yield) to apply UPSERTs: we simply splice all histories
corresponding to the next key and call vy_history_apply() to compute
the resultant tuple.

Note that this patch changes the output of vinyl/upsert.test.lua. This
is expected, because UPSERTs are now applied in the reverse order, from
the oldest statement to the newest.
--- src/box/vy_cache.c | 32 +++-- src/box/vy_cache.h | 31 +++-- src/box/vy_history.c | 11 +- src/box/vy_history.h | 31 ++++- src/box/vy_mem.c | 81 ++++++------ src/box/vy_mem.h | 33 ++--- src/box/vy_point_lookup.c | 29 ++--- src/box/vy_read_iterator.c | 306 +++++++++++++++------------------------------ src/box/vy_read_iterator.h | 2 - src/box/vy_run.c | 55 ++++++-- src/box/vy_run.h | 23 ++-- src/box/vy_tx.c | 43 ++++--- src/box/vy_tx.h | 29 +++-- test/unit/CMakeLists.txt | 2 + test/unit/vy_cache.c | 16 ++- test/unit/vy_mem.c | 24 +++- test/vinyl/upsert.result | 8 +- test/vinyl/upsert.test.lua | 6 +- 18 files changed, 378 insertions(+), 384 deletions(-) diff --git a/src/box/vy_cache.c b/src/box/vy_cache.c index d4cdcdff..2b9e40fa 100644 --- a/src/box/vy_cache.c +++ b/src/box/vy_cache.c @@ -32,6 +32,7 @@ #include "diag.h" #include "fiber.h" #include "schema_def.h" +#include "vy_history.h" #ifndef CT_ASSERT_G #define CT_ASSERT_G(e) typedef char CONCAT(__ct_assert_, __LINE__)[(e) ? 1 :-1] @@ -658,12 +659,12 @@ vy_cache_iterator_seek(struct vy_cache_iterator *itr, *entry = *vy_cache_tree_iterator_get_elem(tree, &itr->curr_pos); } -void +NODISCARD int vy_cache_iterator_next(struct vy_cache_iterator *itr, - struct tuple **ret, bool *stop) + struct vy_history *history, bool *stop) { - *ret = NULL; *stop = false; + vy_history_cleanup(history); if (!itr->search_started) { assert(itr->curr_stmt == NULL); @@ -673,33 +674,34 @@ vy_cache_iterator_next(struct vy_cache_iterator *itr, vy_cache_iterator_seek(itr, itr->iterator_type, itr->key, &entry); if (entry == NULL) - return; + return 0; itr->curr_stmt = entry->stmt; *stop = vy_cache_iterator_is_stop(itr, entry); } else { assert(itr->version == itr->cache->version); if (itr->curr_stmt == NULL) - return; + return 0; tuple_unref(itr->curr_stmt); *stop = vy_cache_iterator_step(itr, &itr->curr_stmt); } vy_cache_iterator_skip_to_read_view(itr, stop); if (itr->curr_stmt != NULL) { - *ret = itr->curr_stmt; tuple_ref(itr->curr_stmt); 
vy_stmt_counter_acct_tuple(&itr->cache->stat.get, itr->curr_stmt); + return vy_history_append_stmt(history, itr->curr_stmt); } + return 0; } -void +NODISCARD int vy_cache_iterator_skip(struct vy_cache_iterator *itr, const struct tuple *last_stmt, - struct tuple **ret, bool *stop) + struct vy_history *history, bool *stop) { - *ret = NULL; *stop = false; + vy_history_cleanup(history); assert(!itr->search_started || itr->version == itr->cache->version); @@ -732,17 +734,18 @@ vy_cache_iterator_skip(struct vy_cache_iterator *itr, vy_cache_iterator_skip_to_read_view(itr, stop); if (itr->curr_stmt != NULL) { - *ret = itr->curr_stmt; tuple_ref(itr->curr_stmt); vy_stmt_counter_acct_tuple(&itr->cache->stat.get, itr->curr_stmt); + return vy_history_append_stmt(history, itr->curr_stmt); } + return 0; } -int +NODISCARD int vy_cache_iterator_restore(struct vy_cache_iterator *itr, const struct tuple *last_stmt, - struct tuple **ret, bool *stop) + struct vy_history *history, bool *stop) { struct key_def *def = itr->cache->cmp_def; int dir = iterator_direction(itr->iterator_type); @@ -818,11 +821,14 @@ vy_cache_iterator_restore(struct vy_cache_iterator *itr, break; } } - *ret = itr->curr_stmt; + + vy_history_cleanup(history); if (itr->curr_stmt != NULL) { tuple_ref(itr->curr_stmt); vy_stmt_counter_acct_tuple(&itr->cache->stat.get, itr->curr_stmt); + if (vy_history_append_stmt(history, itr->curr_stmt) != 0) + return -1; return prev_stmt != itr->curr_stmt; } return 0; diff --git a/src/box/vy_cache.h b/src/box/vy_cache.h index 54fd2892..d2545152 100644 --- a/src/box/vy_cache.h +++ b/src/box/vy_cache.h @@ -46,6 +46,8 @@ extern "C" { #endif /* defined(__cplusplus) */ +struct vy_history; + /** * A record in tuple cache */ @@ -263,34 +265,37 @@ vy_cache_iterator_open(struct vy_cache_iterator *itr, struct vy_cache *cache, const struct tuple *key, const struct vy_read_view **rv); /** - * Advance a cache iterator to the next statement. 
- * The next statement is returned in @ret (NULL if EOF). + * Advance a cache iterator to the next key. + * The key history is returned in @history (empty if EOF). * @stop flag is set if a chain was found in the cache * and so there shouldn't be statements preceding the - * returned statement in memory or on disk. + * returned statement in memory or on disk. The function + * returns 0 on success, -1 on memory allocation error. */ -void +NODISCARD int vy_cache_iterator_next(struct vy_cache_iterator *itr, - struct tuple **ret, bool *stop); + struct vy_history *history, bool *stop); /** - * Advance a cache iterator to the statement following @last_stmt. - * The statement is returned in @ret (NULL if EOF). + * Advance a cache iterator to the key following @last_stmt. + * The key history is returned in @history (empty if EOF). + * Returns 0 on success, -1 on memory allocation error. */ -void +NODISCARD int vy_cache_iterator_skip(struct vy_cache_iterator *itr, const struct tuple *last_stmt, - struct tuple **ret, bool *stop); + struct vy_history *history, bool *stop); /** * Check if a cache iterator was invalidated and needs to be restored. - * If it does, set the iterator position to the statement following - * @last_stmt and return 1, otherwise return 0. + * If it does, set the iterator position to the first key following + * @last_stmt and return 1, otherwise return 0. Returns -1 on memory + * allocation error. */ -int +NODISCARD int vy_cache_iterator_restore(struct vy_cache_iterator *itr, const struct tuple *last_stmt, - struct tuple **ret, bool *stop); + struct vy_history *history, bool *stop); /** * Close a cache iterator. 
diff --git a/src/box/vy_history.c b/src/box/vy_history.c index c1fe6c6a..79d80653 100644 --- a/src/box/vy_history.c +++ b/src/box/vy_history.c @@ -74,8 +74,8 @@ vy_history_cleanup(struct vy_history *history) int vy_history_apply(struct vy_history *history, const struct key_def *cmp_def, - struct tuple_format *format, int *upserts_applied, - struct tuple **ret) + struct tuple_format *format, bool keep_delete, + int *upserts_applied, struct tuple **ret) { *ret = NULL; *upserts_applied = 0; @@ -86,8 +86,11 @@ vy_history_apply(struct vy_history *history, const struct key_def *cmp_def, struct vy_history_node *node = rlist_last_entry(&history->stmts, struct vy_history_node, link); if (vy_history_is_terminal(history)) { - if (vy_stmt_type(node->stmt) == IPROTO_DELETE) { - /* Ignore terminal delete */ + if (!keep_delete && vy_stmt_type(node->stmt) == IPROTO_DELETE) { + /* + * Ignore terminal delete unless the caller + * explicitly asked to keep it. + */ } else if (!node->is_refable) { curr_stmt = vy_stmt_dup(node->stmt); } else { diff --git a/src/box/vy_history.h b/src/box/vy_history.h index bb4ed28e..1f8bb591 100644 --- a/src/box/vy_history.h +++ b/src/box/vy_history.h @@ -110,6 +110,31 @@ vy_history_is_terminal(struct vy_history *history) } /** + * Return the last (newest, having max LSN) statement of the given + * key history or NULL if the history is empty. + */ +static inline struct tuple * +vy_history_last_stmt(struct vy_history *history) +{ + if (rlist_empty(&history->stmts)) + return NULL; + /* Newest statement is at the head of the list. */ + struct vy_history_node *node = rlist_first_entry(&history->stmts, + struct vy_history_node, link); + return node->stmt; +} + +/** + * Append all statements of history @src to history @dst. + */ +static inline void +vy_history_splice(struct vy_history *dst, struct vy_history *src) +{ + assert(dst->pool == src->pool); + rlist_splice_tail(&dst->stmts, &src->stmts); +} + +/** * Append an (older) statement to a history list. 
* Returns 0 on success, -1 on memory allocation error. */ @@ -125,11 +150,13 @@ vy_history_cleanup(struct vy_history *history); /** * Get a resultant statement from collected history. + * If the resultant statement is a DELETE, the function + * will return NULL unless @keep_delete flag is set. */ int vy_history_apply(struct vy_history *history, const struct key_def *cmp_def, - struct tuple_format *format, int *upserts_applied, - struct tuple **ret); + struct tuple_format *format, bool keep_delete, + int *upserts_applied, struct tuple **ret); #if defined(__cplusplus) } /* extern "C" */ diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c index 9d40f76e..65e31ea5 100644 --- a/src/box/vy_mem.c +++ b/src/box/vy_mem.c @@ -39,6 +39,7 @@ #include "diag.h" #include "tuple.h" +#include "vy_history.h" /** {{{ vy_mem_env */ @@ -269,26 +270,6 @@ vy_mem_rollback_stmt(struct vy_mem *mem, const struct tuple *stmt) /* {{{ vy_mem_iterator support functions */ /** - * Copy current statement into the out parameter. It is necessary - * because vy_mem stores its tuples in the lsregion allocated - * area, and lsregion tuples can't be referenced or unreferenced. 
- */ -static int -vy_mem_iterator_copy_to(struct vy_mem_iterator *itr, struct tuple **ret) -{ - assert(itr->curr_stmt != NULL); - if (itr->last_stmt) - tuple_unref(itr->last_stmt); - itr->last_stmt = vy_stmt_dup(itr->curr_stmt); - *ret = itr->last_stmt; - if (itr->last_stmt != NULL) { - vy_stmt_counter_acct_tuple(&itr->stat->get, *ret); - return 0; - } - return -1; -} - -/** * Get a stmt by current position */ static const struct tuple * @@ -450,7 +431,6 @@ vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *s itr->curr_pos = vy_mem_tree_invalid_iterator(); itr->curr_stmt = NULL; - itr->last_stmt = NULL; itr->search_started = false; } @@ -461,7 +441,7 @@ vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *s * @retval 1 Not found */ static NODISCARD int -vy_mem_iterator_next_key_impl(struct vy_mem_iterator *itr) +vy_mem_iterator_next_key(struct vy_mem_iterator *itr) { if (!itr->search_started) return vy_mem_iterator_start(itr); @@ -488,22 +468,13 @@ vy_mem_iterator_next_key_impl(struct vy_mem_iterator *itr) return vy_mem_iterator_find_lsn(itr, itr->iterator_type, itr->key); } -NODISCARD int -vy_mem_iterator_next_key(struct vy_mem_iterator *itr, struct tuple **ret) -{ - *ret = NULL; - if (vy_mem_iterator_next_key_impl(itr) == 0) - return vy_mem_iterator_copy_to(itr, ret); - return 0; -} - /* * Find next (lower, older) record with the same key as current * @retval 0 Found * @retval 1 Not found */ static NODISCARD int -vy_mem_iterator_next_lsn_impl(struct vy_mem_iterator *itr) +vy_mem_iterator_next_lsn(struct vy_mem_iterator *itr) { assert(itr->search_started); if (!itr->curr_stmt) /* End of search. */ @@ -528,22 +499,45 @@ vy_mem_iterator_next_lsn_impl(struct vy_mem_iterator *itr) return 1; } +/** + * Append statements for the current key to a statement history + * until a terminal statement is found. Returns 0 on success, -1 + * on memory allocation error. 
+ */ +static NODISCARD int +vy_mem_iterator_get_history(struct vy_mem_iterator *itr, + struct vy_history *history) +{ + do { + struct tuple *stmt = (struct tuple *)itr->curr_stmt; + vy_stmt_counter_acct_tuple(&itr->stat->get, stmt); + if (vy_history_append_stmt(history, stmt) != 0) + return -1; + if (vy_history_is_terminal(history)) + break; + } while (vy_mem_iterator_next_lsn(itr) == 0); + return 0; +} + NODISCARD int -vy_mem_iterator_next_lsn(struct vy_mem_iterator *itr, struct tuple **ret) +vy_mem_iterator_next(struct vy_mem_iterator *itr, + struct vy_history *history) { - *ret = NULL; - if (vy_mem_iterator_next_lsn_impl(itr) == 0) - return vy_mem_iterator_copy_to(itr, ret); + vy_history_cleanup(history); + if (vy_mem_iterator_next_key(itr) == 0) + return vy_mem_iterator_get_history(itr, history); return 0; } NODISCARD int vy_mem_iterator_skip(struct vy_mem_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret) + const struct tuple *last_stmt, + struct vy_history *history) { - *ret = NULL; assert(!itr->search_started || itr->version == itr->mem->version); + vy_history_cleanup(history); + const struct tuple *key = itr->key; enum iterator_type iterator_type = itr->iterator_type; if (last_stmt != NULL) { @@ -561,13 +555,14 @@ vy_mem_iterator_skip(struct vy_mem_iterator *itr, itr->curr_stmt = NULL; if (itr->curr_stmt != NULL) - return vy_mem_iterator_copy_to(itr, ret); + return vy_mem_iterator_get_history(itr, history); return 0; } NODISCARD int vy_mem_iterator_restore(struct vy_mem_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret) + const struct tuple *last_stmt, + struct vy_history *history) { if (!itr->search_started || itr->version == itr->mem->version) return 0; @@ -590,9 +585,9 @@ vy_mem_iterator_restore(struct vy_mem_iterator *itr, if (prev_stmt == itr->curr_stmt) return 0; - *ret = NULL; + vy_history_cleanup(history); if (itr->curr_stmt != NULL && - vy_mem_iterator_copy_to(itr, ret) < 0) + vy_mem_iterator_get_history(itr, 
history) != 0) return -1; return 1; } @@ -600,8 +595,6 @@ vy_mem_iterator_restore(struct vy_mem_iterator *itr, void vy_mem_iterator_close(struct vy_mem_iterator *itr) { - if (itr->last_stmt != NULL) - tuple_unref(itr->last_stmt); TRASH(itr); } diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h index c2917efb..a35b140f 100644 --- a/src/box/vy_mem.h +++ b/src/box/vy_mem.h @@ -50,6 +50,8 @@ extern "C" { #endif /* defined(__cplusplus) */ +struct vy_history; + /** Vinyl memory environment. */ struct vy_mem_env { struct lsregion allocator; @@ -349,12 +351,6 @@ struct vy_mem_iterator { * valid statement. */ const struct tuple *curr_stmt; - /* - * Copy of the statement returned from one of public methods - * (restore/next_lsn/next_key). Need to store the copy, because can't - * return region allocated curr_stmt. - */ - struct tuple *last_stmt; /* data version from vy_mem */ uint32_t version; @@ -371,29 +367,23 @@ vy_mem_iterator_open(struct vy_mem_iterator *itr, struct vy_mem_iterator_stat *s const struct tuple *key, const struct vy_read_view **rv); /** - * Advance a mem iterator to the newest statement for the next key. - * The statement is returned in @ret (NULL if EOF). - * Returns 0 on success, -1 on memory allocation error. - */ -NODISCARD int -vy_mem_iterator_next_key(struct vy_mem_iterator *itr, struct tuple **ret); - -/** - * Advance a mem iterator to the older statement for the same key. - * The statement is returned in @ret (NULL if EOF). + * Advance a mem iterator to the next key. + * The key history is returned in @history (empty if EOF). * Returns 0 on success, -1 on memory allocation error. */ NODISCARD int -vy_mem_iterator_next_lsn(struct vy_mem_iterator *itr, struct tuple **ret); +vy_mem_iterator_next(struct vy_mem_iterator *itr, + struct vy_history *history); /** - * Advance a mem iterator to the newest statement for the first key - * following @last_stmt. The statement is returned in @ret (NULL if EOF). 
+ * Advance a mem iterator to the key following @last_stmt. + * The key history is returned in @history (empty if EOF). * Returns 0 on success, -1 on memory allocation error. */ NODISCARD int vy_mem_iterator_skip(struct vy_mem_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret); + const struct tuple *last_stmt, + struct vy_history *history); /** * Check if a mem iterator was invalidated and needs to be restored. @@ -403,7 +393,8 @@ vy_mem_iterator_skip(struct vy_mem_iterator *itr, */ NODISCARD int vy_mem_iterator_restore(struct vy_mem_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret); + const struct tuple *last_stmt, + struct vy_history *history); /** * Close a mem iterator. diff --git a/src/box/vy_point_lookup.c b/src/box/vy_point_lookup.c index b48a2332..91dc1cca 100644 --- a/src/box/vy_point_lookup.c +++ b/src/box/vy_point_lookup.c @@ -159,15 +159,12 @@ vy_point_lookup_scan_mems(struct vy_lsm *lsm, const struct vy_read_view **rv, /** * Scan one particular slice. * Add found statements to the history list up to terminal statement. - * Set *terminal_found to true if the terminal statement (DELETE or REPLACE) - * was found. 
*/ static int vy_point_lookup_scan_slice(struct vy_lsm *lsm, struct vy_slice *slice, const struct vy_read_view **rv, struct tuple *key, - struct vy_history *history, bool *terminal_found) + struct vy_history *history) { - int rc = 0; /* * The format of the statement must be exactly the space * format with the same identifier to fully match the @@ -177,19 +174,10 @@ vy_point_lookup_scan_slice(struct vy_lsm *lsm, struct vy_slice *slice, vy_run_iterator_open(&run_itr, &lsm->stat.disk.iterator, slice, ITER_EQ, key, rv, lsm->cmp_def, lsm->key_def, lsm->disk_format, lsm->index_id == 0); - struct tuple *stmt; - rc = vy_run_iterator_next_key(&run_itr, &stmt); - while (rc == 0 && stmt != NULL) { - if (vy_history_append_stmt(history, stmt) != 0) { - rc = -1; - break; - } - if (vy_stmt_type(stmt) != IPROTO_UPSERT) { - *terminal_found = true; - break; - } - rc = vy_run_iterator_next_lsn(&run_itr, &stmt); - } + struct vy_history slice_history; + vy_history_create(&slice_history, &lsm->env->history_node_pool); + int rc = vy_run_iterator_next(&run_itr, &slice_history); + vy_history_splice(history, &slice_history); vy_run_iterator_close(&run_itr); return rc; } @@ -223,11 +211,10 @@ vy_point_lookup_scan_slices(struct vy_lsm *lsm, const struct vy_read_view **rv, } assert(i == slice_count); int rc = 0; - bool terminal_found = false; for (i = 0; i < slice_count; i++) { - if (rc == 0 && !terminal_found) + if (rc == 0 && !vy_history_is_terminal(history)) rc = vy_point_lookup_scan_slice(lsm, slices[i], - rv, key, history, &terminal_found); + rv, key, history); vy_slice_unpin(slices[i]); } return rc; @@ -291,7 +278,7 @@ done: if (rc == 0) { int upserts_applied; rc = vy_history_apply(&history, lsm->cmp_def, lsm->mem_format, - &upserts_applied, ret); + false, &upserts_applied, ret); lsm->stat.upsert.applied += upserts_applied; } vy_history_cleanup(&history); diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c index 0695eedb..b171aa60 100644 --- a/src/box/vy_read_iterator.c 
+++ b/src/box/vy_read_iterator.c @@ -34,7 +34,7 @@ #include "vy_cache.h" #include "vy_tx.h" #include "fiber.h" -#include "vy_upsert.h" +#include "vy_history.h" #include "vy_lsm.h" #include "vy_stat.h" @@ -54,8 +54,8 @@ struct vy_read_src { bool is_started; /** See vy_read_iterator->front_id. */ uint32_t front_id; - /** Statement the iterator is at. */ - struct tuple *stmt; + /** History of the key the iterator is positioned at. */ + struct vy_history history; }; /** @@ -74,10 +74,13 @@ vy_read_iterator_reserve(struct vy_read_iterator *itr, uint32_t capacity) "calloc", "new_src"); return -1; } - if (itr->src_count > 0) { - memcpy(new_src, itr->src, itr->src_count * sizeof(*new_src)); - free(itr->src); + memcpy(new_src, itr->src, itr->src_count * sizeof(*new_src)); + for (uint32_t i = 0; i < itr->src_count; i++) { + vy_history_create(&new_src[i].history, + &itr->lsm->env->history_node_pool); + vy_history_splice(&new_src[i].history, &itr->src[i].history); } + free(itr->src); itr->src = new_src; itr->src_capacity = capacity; return 0; @@ -94,9 +97,9 @@ vy_read_iterator_add_src(struct vy_read_iterator *itr) if (vy_read_iterator_reserve(itr, itr->src_count + 1) != 0) return NULL; } - itr->src[itr->src_count].front_id = 0; struct vy_read_src *src = &itr->src[itr->src_count++]; memset(src, 0, sizeof(*src)); + vy_history_create(&src->history, &itr->lsm->env->history_node_pool); return src; } @@ -221,14 +224,11 @@ vy_read_iterator_evaluate_src(struct vy_read_iterator *itr, struct vy_read_src *src, bool *stop) { uint32_t src_id = src - itr->src; - int cmp = vy_read_iterator_cmp_stmt(itr, src->stmt, itr->curr_stmt); + struct tuple *stmt = vy_history_last_stmt(&src->history); + int cmp = vy_read_iterator_cmp_stmt(itr, stmt, itr->curr_stmt); if (cmp < 0) { - assert(src->stmt != NULL); - tuple_ref(src->stmt); - if (itr->curr_stmt != NULL) - tuple_unref(itr->curr_stmt); - itr->curr_stmt = src->stmt; - itr->curr_src = src_id; + assert(stmt != NULL); + itr->curr_stmt = stmt; 
itr->front_id++; } if (cmp <= 0) @@ -236,7 +236,8 @@ vy_read_iterator_evaluate_src(struct vy_read_iterator *itr, itr->skipped_src = MAX(itr->skipped_src, src_id + 1); - if (cmp < 0 && vy_read_iterator_is_exact_match(itr, src->stmt)) { + if (cmp < 0 && vy_history_is_terminal(&src->history) && + vy_read_iterator_is_exact_match(itr, stmt)) { itr->skipped_src = src_id + 1; *stop = true; } @@ -254,14 +255,15 @@ vy_read_src_is_behind(struct vy_read_iterator *itr, struct vy_read_src *src) return true; if (src_id < itr->skipped_src) return false; - if (vy_read_iterator_cmp_stmt(itr, src->stmt, itr->last_stmt) > 0) + struct tuple *stmt = vy_history_last_stmt(&src->history); + if (vy_read_iterator_cmp_stmt(itr, stmt, itr->last_stmt) > 0) return false; return true; } /* * Each of the functions from the vy_read_iterator_scan_* family - * is used by vy_read_iterator_next_key() to: + * is used by vy_read_iterator_advance() to: * * 1. Update the position of a read source, which implies: * @@ -283,31 +285,36 @@ vy_read_src_is_behind(struct vy_read_iterator *itr, struct vy_read_src *src) * See also vy_read_iterator_evaluate_src(). 
*/ -static void +static NODISCARD int vy_read_iterator_scan_txw(struct vy_read_iterator *itr, bool *stop) { struct vy_read_src *src = &itr->src[itr->txw_src]; struct vy_txw_iterator *src_itr = &src->txw_iterator; if (itr->tx == NULL) - return; + return 0; assert(itr->txw_src < itr->skipped_src); - int rc = vy_txw_iterator_restore(src_itr, itr->last_stmt, &src->stmt); + int rc = vy_txw_iterator_restore(src_itr, itr->last_stmt, + &src->history); if (rc == 0) { if (!src->is_started) { - vy_txw_iterator_skip(src_itr, itr->last_stmt, - &src->stmt); + rc = vy_txw_iterator_skip(src_itr, itr->last_stmt, + &src->history); } else if (src->front_id == itr->prev_front_id) { - vy_txw_iterator_next(src_itr, &src->stmt); + rc = vy_txw_iterator_next(src_itr, &src->history); } src->is_started = true; } + if (rc < 0) + return -1; + vy_read_iterator_evaluate_src(itr, src, stop); + return 0; } -static void +static NODISCARD int vy_read_iterator_scan_cache(struct vy_read_iterator *itr, bool *stop) { bool is_interval = false; @@ -315,23 +322,26 @@ vy_read_iterator_scan_cache(struct vy_read_iterator *itr, bool *stop) struct vy_cache_iterator *src_itr = &src->cache_iterator; int rc = vy_cache_iterator_restore(src_itr, itr->last_stmt, - &src->stmt, &is_interval); + &src->history, &is_interval); if (rc == 0) { if (vy_read_src_is_behind(itr, src)) { - vy_cache_iterator_skip(src_itr, itr->last_stmt, - &src->stmt, &is_interval); + rc = vy_cache_iterator_skip(src_itr, itr->last_stmt, + &src->history, &is_interval); } else if (src->front_id == itr->prev_front_id) { - vy_cache_iterator_next(src_itr, &src->stmt, - &is_interval); + rc = vy_cache_iterator_next(src_itr, &src->history, + &is_interval); } src->is_started = true; } - vy_read_iterator_evaluate_src(itr, src, stop); + if (rc < 0) + return -1; + vy_read_iterator_evaluate_src(itr, src, stop); if (is_interval) { itr->skipped_src = itr->cache_src + 1; *stop = true; } + return 0; } static NODISCARD int @@ -344,13 +354,13 @@ 
vy_read_iterator_scan_mem(struct vy_read_iterator *itr, assert(mem_src >= itr->mem_src && mem_src < itr->disk_src); - rc = vy_mem_iterator_restore(src_itr, itr->last_stmt, &src->stmt); + rc = vy_mem_iterator_restore(src_itr, itr->last_stmt, &src->history); if (rc == 0) { if (vy_read_src_is_behind(itr, src)) { rc = vy_mem_iterator_skip(src_itr, itr->last_stmt, - &src->stmt); + &src->history); } else if (src->front_id == itr->prev_front_id) { - rc = vy_mem_iterator_next_key(src_itr, &src->stmt); + rc = vy_mem_iterator_next(src_itr, &src->history); } src->is_started = true; } @@ -372,9 +382,10 @@ vy_read_iterator_scan_disk(struct vy_read_iterator *itr, assert(disk_src >= itr->disk_src && disk_src < itr->src_count); if (vy_read_src_is_behind(itr, src)) - rc = vy_run_iterator_skip(src_itr, itr->last_stmt, &src->stmt); + rc = vy_run_iterator_skip(src_itr, itr->last_stmt, + &src->history); else if (src->front_id == itr->prev_front_id) - rc = vy_run_iterator_next_key(src_itr, &src->stmt); + rc = vy_run_iterator_next(src_itr, &src->history); src->is_started = true; if (rc < 0) @@ -397,13 +408,14 @@ vy_read_iterator_restore_mem(struct vy_read_iterator *itr) struct vy_read_src *src = &itr->src[itr->mem_src]; rc = vy_mem_iterator_restore(&src->mem_iterator, - itr->last_stmt, &src->stmt); + itr->last_stmt, &src->history); if (rc < 0) return -1; /* memory allocation error */ if (rc == 0) return 0; /* nothing changed */ - cmp = vy_read_iterator_cmp_stmt(itr, src->stmt, itr->curr_stmt); + struct tuple *stmt = vy_history_last_stmt(&src->history); + cmp = vy_read_iterator_cmp_stmt(itr, stmt, itr->curr_stmt); if (cmp > 0) { /* * Memory trees are append-only so if the @@ -413,26 +425,21 @@ vy_read_iterator_restore_mem(struct vy_read_iterator *itr) assert(src->front_id < itr->front_id); return 0; } - if (cmp < 0 || itr->curr_src != itr->txw_src) { + if (cmp < 0) { /* * The new statement precedes the current - * candidate for the next key or it is a - * newer version of the same key. 
+ * candidate for the next key. */ - tuple_ref(src->stmt); - if (itr->curr_stmt != NULL) - tuple_unref(itr->curr_stmt); - itr->curr_stmt = src->stmt; - itr->curr_src = itr->mem_src; + itr->curr_stmt = stmt; + itr->front_id++; } else { /* + * The new statement updates the next key. * Make sure we don't read the old value * from the cache while applying UPSERTs. */ itr->src[itr->cache_src].front_id = 0; } - if (cmp < 0) - itr->front_id++; src->front_id = itr->front_id; return 0; } @@ -444,12 +451,11 @@ static void vy_read_iterator_next_range(struct vy_read_iterator *itr); /** - * Iterate to the next key - * @retval 0 success or EOF (*ret == NULL) - * @retval -1 read error + * Advance the iterator to the next key. + * Returns 0 on success, -1 on error. */ static NODISCARD int -vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret) +vy_read_iterator_advance(struct vy_read_iterator *itr) { if (itr->last_stmt != NULL && (itr->iterator_type == ITER_EQ || itr->iterator_type == ITER_REQ) && @@ -458,7 +464,7 @@ vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret) * There may be one statement at max satisfying * EQ with a full key. */ - *ret = NULL; + itr->curr_stmt = NULL; return 0; } /* @@ -472,10 +478,7 @@ vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret) vy_read_iterator_restore(itr); } restart: - if (itr->curr_stmt != NULL) - tuple_unref(itr->curr_stmt); itr->curr_stmt = NULL; - itr->curr_src = UINT32_MAX; itr->prev_front_id = itr->front_id; /* @@ -483,10 +486,12 @@ restart: * from the one that stores newest data. 
*/ bool stop = false; - vy_read_iterator_scan_txw(itr, &stop); + if (vy_read_iterator_scan_txw(itr, &stop) != 0) + return -1; if (stop) goto done; - vy_read_iterator_scan_cache(itr, &stop); + if (vy_read_iterator_scan_cache(itr, &stop) != 0) + return -1; if (stop) goto done; @@ -562,138 +567,6 @@ done: vy_stmt_compare(itr->curr_stmt, itr->key, itr->lsm->cmp_def) != 0) itr->curr_stmt = NULL; - - *ret = itr->curr_stmt; - return 0; -} - -/** - * Iterate to the next (elder) version of the same key - * @retval 0 success or EOF (*ret == NULL) - * @retval -1 read error - */ -static NODISCARD int -vy_read_iterator_next_lsn(struct vy_read_iterator *itr, struct tuple **ret) -{ - uint32_t i; - bool unused; - struct vy_read_src *src; - - assert(itr->curr_stmt != NULL); - assert(itr->curr_src < itr->skipped_src); - - /* Cache stores only terminal statements. */ - assert(itr->curr_src != itr->cache_src); - - if (itr->curr_src == itr->txw_src) { - /* - * Write set does not store statement history. - * Look up the older statement in the cache and - * if it isn't there proceed to mems and runs. - */ - src = &itr->src[itr->cache_src]; - if (itr->cache_src >= itr->skipped_src) - vy_read_iterator_scan_cache(itr, &unused); - if (src->front_id == itr->front_id) - goto found; - } - - /* Look up the older statement in in-memory trees. */ - for (i = MAX(itr->curr_src, itr->mem_src); i < itr->disk_src; i++) { - src = &itr->src[i]; - if (i >= itr->skipped_src && - vy_read_iterator_scan_mem(itr, i, &unused) != 0) - return -1; - if (src->front_id != itr->front_id) - continue; - if (i == itr->curr_src && - vy_mem_iterator_next_lsn(&src->mem_iterator, - &src->stmt) != 0) - return -1; - if (src->stmt != NULL) - goto found; - } - - /* - * Look up the older statement in on-disk runs. 
- * - * Note, we don't need to check the LSM tree version after the yield - * caused by the disk read, because once we've come to this point, - * we won't read any source except run slices, which are pinned - * and hence cannot be removed during the yield. - */ - vy_read_iterator_pin_slices(itr); - for (i = MAX(itr->curr_src, itr->disk_src); i < itr->src_count; i++) { - src = &itr->src[i]; - if (i >= itr->skipped_src && - vy_read_iterator_scan_disk(itr, i, &unused) != 0) - goto err_disk; - if (src->front_id != itr->front_id) - continue; - if (i == itr->curr_src && - vy_run_iterator_next_lsn(&src->run_iterator, - &src->stmt) != 0) - goto err_disk; - if (src->stmt != NULL) - break; - } - vy_read_iterator_unpin_slices(itr); - - if (i < itr->src_count) - goto found; - - /* Searched everywhere, found nothing. */ - *ret = NULL; - return 0; -found: - tuple_ref(src->stmt); - if (itr->curr_stmt != NULL) - tuple_unref(itr->curr_stmt); - itr->curr_stmt = src->stmt; - itr->curr_src = src - itr->src; - *ret = itr->curr_stmt; - return 0; - -err_disk: - vy_read_iterator_unpin_slices(itr); - return -1; -} - -/** - * Squash in a single REPLACE all UPSERTs for the current key. - * - * @retval 0 success - * @retval -1 error - */ -static NODISCARD int -vy_read_iterator_squash_upsert(struct vy_read_iterator *itr, - struct tuple **ret) -{ - *ret = NULL; - struct vy_lsm *lsm = itr->lsm; - struct tuple *t = itr->curr_stmt; - - /* Upserts enabled only in the primary index LSM tree. 
*/ - assert(vy_stmt_type(t) != IPROTO_UPSERT || lsm->index_id == 0); - tuple_ref(t); - while (vy_stmt_type(t) == IPROTO_UPSERT) { - struct tuple *next; - int rc = vy_read_iterator_next_lsn(itr, &next); - if (rc != 0) { - tuple_unref(t); - return rc; - } - struct tuple *applied = vy_apply_upsert(t, next, - lsm->cmp_def, lsm->mem_format, true); - lsm->stat.upsert.applied++; - tuple_unref(t); - if (applied == NULL) - return -1; - t = applied; - if (next == NULL) - break; - } - *ret = t; return 0; } @@ -792,25 +665,26 @@ vy_read_iterator_cleanup(struct vy_read_iterator *itr) if (itr->txw_src < itr->src_count) { src = &itr->src[itr->txw_src]; + vy_history_cleanup(&src->history); vy_txw_iterator_close(&src->txw_iterator); } if (itr->cache_src < itr->src_count) { src = &itr->src[itr->cache_src]; + vy_history_cleanup(&src->history); vy_cache_iterator_close(&src->cache_iterator); } for (i = itr->mem_src; i < itr->disk_src; i++) { src = &itr->src[i]; + vy_history_cleanup(&src->history); vy_mem_iterator_close(&src->mem_iterator); } for (i = itr->disk_src; i < itr->src_count; i++) { src = &itr->src[i]; + vy_history_cleanup(&src->history); vy_run_iterator_close(&src->run_iterator); } - if (itr->curr_stmt != NULL) - tuple_unref(itr->curr_stmt); itr->curr_stmt = NULL; - itr->curr_src = UINT32_MAX; itr->txw_src = UINT32_MAX; itr->cache_src = UINT32_MAX; itr->mem_src = UINT32_MAX; @@ -937,6 +811,41 @@ vy_read_iterator_next_range(struct vy_read_iterator *itr) } /** + * Get a resultant statement for the current key. + * Returns 0 on success, -1 on error. 
+ */ +static NODISCARD int +vy_read_iterator_apply_history(struct vy_read_iterator *itr, + struct tuple **ret) +{ + if (itr->curr_stmt == NULL) { + *ret = NULL; + return 0; + } + + struct vy_lsm *lsm = itr->lsm; + struct vy_history history; + vy_history_create(&history, &lsm->env->history_node_pool); + + for (uint32_t i = 0; i < itr->src_count; i++) { + struct vy_read_src *src = &itr->src[i]; + if (src->front_id == itr->front_id) { + vy_history_splice(&history, &src->history); + if (vy_history_is_terminal(&history)) + break; + } + } + + int upserts_applied = 0; + int rc = vy_history_apply(&history, lsm->cmp_def, lsm->mem_format, + true, &upserts_applied, ret); + + lsm->stat.upsert.applied += upserts_applied; + vy_history_cleanup(&history); + return rc; +} + +/** * Track a read in the conflict manager. */ static int @@ -981,18 +890,11 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) else /* first iteration */ lsm->stat.lookup++; next_key: - if (vy_read_iterator_next_key(itr, &stmt) != 0) + if (vy_read_iterator_advance(itr) != 0) goto err; - - /* - * Fetching an older statement of the current key may yield - * so we must track the read before applying UPSERTs. - */ - if (vy_read_iterator_track_read(itr, stmt) != 0) + if (vy_read_iterator_apply_history(itr, &stmt) != 0) goto err; - - if (stmt != NULL && - vy_read_iterator_squash_upsert(itr, &stmt) != 0) + if (vy_read_iterator_track_read(itr, stmt) != 0) goto err; if (itr->last_stmt != NULL) diff --git a/src/box/vy_read_iterator.h b/src/box/vy_read_iterator.h index 6aa84540..07c6ef8f 100644 --- a/src/box/vy_read_iterator.h +++ b/src/box/vy_read_iterator.h @@ -92,8 +92,6 @@ struct vy_read_iterator { uint32_t src_count; /** Maximal capacity of the src array. */ uint32_t src_capacity; - /** Offset of the current merge source. */ - uint32_t curr_src; /** Statement returned by the current merge source. */ struct tuple *curr_stmt; /** Offset of the transaction write set source. 
*/ diff --git a/src/box/vy_run.c b/src/box/vy_run.c index fa5670d3..587cb002 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -44,6 +44,7 @@ #include "tuple_compare.h" #include "xlog.h" #include "xrow.h" +#include "vy_history.h" static const uint64_t vy_page_info_key_map = (1 << VY_PAGE_INFO_OFFSET) | (1 << VY_PAGE_INFO_SIZE) | @@ -1401,7 +1402,12 @@ vy_run_iterator_open(struct vy_run_iterator *itr, itr->search_ended = false; } -NODISCARD int +/** + * Advance a run iterator to the newest statement for the next key. + * The statement is returned in @ret (NULL if EOF). + * Returns 0 on success, -1 on memory allocation or IO error. + */ +static NODISCARD int vy_run_iterator_next_key(struct vy_run_iterator *itr, struct tuple **ret) { *ret = NULL; @@ -1441,7 +1447,12 @@ vy_run_iterator_next_key(struct vy_run_iterator *itr, struct tuple **ret) return vy_run_iterator_find_lsn(itr, itr->iterator_type, itr->key, ret); } -NODISCARD int +/** + * Advance a run iterator to the older statement for the same key. + * The statement is returned in @ret (NULL if there is none). + * Returns 0 on success, -1 on memory allocation or IO error.
+ */ +static NODISCARD int vy_run_iterator_next_lsn(struct vy_run_iterator *itr, struct tuple **ret) { *ret = NULL; @@ -1478,10 +1489,30 @@ vy_run_iterator_next_lsn(struct vy_run_iterator *itr, struct tuple **ret) } NODISCARD int +vy_run_iterator_next(struct vy_run_iterator *itr, + struct vy_history *history) +{ + vy_history_cleanup(history); + struct tuple *stmt; + if (vy_run_iterator_next_key(itr, &stmt) != 0) + return -1; + while (stmt != NULL) { + if (vy_history_append_stmt(history, stmt) != 0) + return -1; + if (vy_history_is_terminal(history)) + break; + if (vy_run_iterator_next_lsn(itr, &stmt) != 0) + return -1; + } + return 0; +} + +NODISCARD int vy_run_iterator_skip(struct vy_run_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret) + const struct tuple *last_stmt, + struct vy_history *history) { - *ret = NULL; + vy_history_cleanup(history); if (itr->search_ended) return 0; @@ -1494,14 +1525,24 @@ vy_run_iterator_skip(struct vy_run_iterator *itr, } itr->search_started = true; - if (vy_run_iterator_seek(itr, iterator_type, key, ret) != 0) + struct tuple *stmt; + if (vy_run_iterator_seek(itr, iterator_type, key, &stmt) != 0) return -1; if (itr->iterator_type == ITER_EQ && last_stmt != NULL && - *ret != NULL && vy_stmt_compare(itr->key, *ret, + stmt != NULL && vy_stmt_compare(itr->key, stmt, itr->cmp_def) != 0) { vy_run_iterator_stop(itr); - *ret = NULL; + return 0; + } + + while (stmt != NULL) { + if (vy_history_append_stmt(history, stmt) != 0) + return -1; + if (vy_history_is_terminal(history)) + break; + if (vy_run_iterator_next_lsn(itr, &stmt) != 0) + return -1; } return 0; } diff --git a/src/box/vy_run.h b/src/box/vy_run.h index 85bb66a7..6551191b 100644 --- a/src/box/vy_run.h +++ b/src/box/vy_run.h @@ -49,6 +49,7 @@ extern "C" { #endif /* defined(__cplusplus) */ +struct vy_history; struct vy_run_reader; /** Part of vinyl environment for run read/write */ @@ -496,29 +497,23 @@ vy_run_iterator_open(struct vy_run_iterator *itr, struct 
tuple_format *format, bool is_primary); /** - * Advance a run iterator to the newest statement for the next key. - * The statement is returned in @ret (NULL if EOF). + * Advance a run iterator to the next key. + * The key history is returned in @history (empty if EOF). * Returns 0 on success, -1 on memory allocation or IO error. */ NODISCARD int -vy_run_iterator_next_key(struct vy_run_iterator *itr, struct tuple **ret); +vy_run_iterator_next(struct vy_run_iterator *itr, + struct vy_history *history); /** - * Advance a run iterator to the older statement for the same key. - * The statement is returned in @ret (NULL if EOF). - * Returns 0 on success, -1 on memory allocation or IO error. - */ -NODISCARD int -vy_run_iterator_next_lsn(struct vy_run_iterator *itr, struct tuple **ret); - -/** - * Advance a run iterator to the newest statement for the first key - * following @last_stmt. The statement is returned in @ret (NULL if EOF). + * Advance a run iterator to the key following @last_stmt. + * The key history is returned in @history (empty if EOF). * Returns 0 on success, -1 on memory allocation or IO error. */ NODISCARD int vy_run_iterator_skip(struct vy_run_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret); + const struct tuple *last_stmt, + struct vy_history *history); /** * Close a run iterator. 
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c index 4812b794..04792ce7 100644 --- a/src/box/vy_tx.c +++ b/src/box/vy_tx.c @@ -55,6 +55,7 @@ #include "vy_stat.h" #include "vy_stmt.h" #include "vy_upsert.h" +#include "vy_history.h" #include "vy_read_set.h" #include "vy_read_view.h" @@ -942,10 +943,11 @@ vy_txw_iterator_seek(struct vy_txw_iterator *itr, itr->curr_txv = txv; } -void -vy_txw_iterator_next(struct vy_txw_iterator *itr, struct tuple **ret) +NODISCARD int +vy_txw_iterator_next(struct vy_txw_iterator *itr, + struct vy_history *history) { - *ret = NULL; + vy_history_cleanup(history); if (!itr->search_started) { itr->search_started = true; vy_txw_iterator_seek(itr, itr->iterator_type, itr->key); @@ -953,7 +955,7 @@ vy_txw_iterator_next(struct vy_txw_iterator *itr, struct tuple **ret) } assert(itr->version == itr->tx->write_set_version); if (itr->curr_txv == NULL) - return; + return 0; if (itr->iterator_type == ITER_LE || itr->iterator_type == ITER_LT) itr->curr_txv = write_set_prev(&itr->tx->write_set, itr->curr_txv); else @@ -966,19 +968,23 @@ vy_txw_iterator_next(struct vy_txw_iterator *itr, struct tuple **ret) itr->curr_txv = NULL; out: if (itr->curr_txv != NULL) { - *ret = itr->curr_txv->stmt; - vy_stmt_counter_acct_tuple(&itr->stat->get, *ret); + vy_stmt_counter_acct_tuple(&itr->stat->get, + itr->curr_txv->stmt); + return vy_history_append_stmt(history, itr->curr_txv->stmt); } + return 0; } -void +NODISCARD int vy_txw_iterator_skip(struct vy_txw_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret) + const struct tuple *last_stmt, + struct vy_history *history) { - *ret = NULL; assert(!itr->search_started || itr->version == itr->tx->write_set_version); + vy_history_cleanup(history); + const struct tuple *key = itr->key; enum iterator_type iterator_type = itr->iterator_type; if (last_stmt != NULL) { @@ -996,14 +1002,17 @@ vy_txw_iterator_skip(struct vy_txw_iterator *itr, itr->curr_txv = NULL; if (itr->curr_txv != NULL) { - *ret = 
itr->curr_txv->stmt; - vy_stmt_counter_acct_tuple(&itr->stat->get, *ret); + vy_stmt_counter_acct_tuple(&itr->stat->get, + itr->curr_txv->stmt); + return vy_history_append_stmt(history, itr->curr_txv->stmt); } + return 0; } -int +NODISCARD int vy_txw_iterator_restore(struct vy_txw_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret) + const struct tuple *last_stmt, + struct vy_history *history) { if (!itr->search_started || itr->version == itr->tx->write_set_version) return 0; @@ -1027,10 +1036,12 @@ vy_txw_iterator_restore(struct vy_txw_iterator *itr, if (prev_txv == itr->curr_txv) return 0; - *ret = NULL; + vy_history_cleanup(history); if (itr->curr_txv != NULL) { - *ret = itr->curr_txv->stmt; - vy_stmt_counter_acct_tuple(&itr->stat->get, *ret); + vy_stmt_counter_acct_tuple(&itr->stat->get, + itr->curr_txv->stmt); + if (vy_history_append_stmt(history, itr->curr_txv->stmt) != 0) + return -1; } return 1; } diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h index a59145be..9c35e2bd 100644 --- a/src/box/vy_tx.h +++ b/src/box/vy_tx.h @@ -55,6 +55,7 @@ struct tuple; struct tx_manager; struct vy_mem; struct vy_tx; +struct vy_history; /** Transaction state. */ enum tx_state { @@ -403,28 +404,34 @@ vy_txw_iterator_open(struct vy_txw_iterator *itr, const struct tuple *key); /** - * Advance a txw iterator to the next statement. - * The next statement is returned in @ret (NULL if EOF). + * Advance a txw iterator to the next key. + * The key history is returned in @history (empty if EOF). + * Returns 0 on success, -1 on memory allocation error. */ -void -vy_txw_iterator_next(struct vy_txw_iterator *itr, struct tuple **ret); +NODISCARD int +vy_txw_iterator_next(struct vy_txw_iterator *itr, + struct vy_history *history); /** - * Advance a txw iterator to the statement following @last_stmt. - * The statement is returned in @ret (NULL if EOF). + * Advance a txw iterator to the key following @last_stmt. + * The key history is returned in @history (empty if EOF). 
+ * Returns 0 on success, -1 on memory allocation error. */ -void +NODISCARD int vy_txw_iterator_skip(struct vy_txw_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret); + const struct tuple *last_stmt, + struct vy_history *history); /** * Check if a txw iterator was invalidated and needs to be restored. - * If it does, set the iterator position to the statement following - * @last_stmt and return 1, otherwise return 0. + * If it does, set the iterator position to the first key following + * @last_stmt and return 1, otherwise return 0. Returns -1 on memory + * allocation error. */ int vy_txw_iterator_restore(struct vy_txw_iterator *itr, - const struct tuple *last_stmt, struct tuple **ret); + const struct tuple *last_stmt, + struct vy_history *history); /** * Close a txw iterator. diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index db270de6..9fc645a1 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -140,6 +140,8 @@ target_link_libraries(say.test core unit) set(ITERATOR_TEST_SOURCES vy_iterators_helper.c ${PROJECT_SOURCE_DIR}/src/box/vy_stmt.c + ${PROJECT_SOURCE_DIR}/src/box/vy_upsert.c + ${PROJECT_SOURCE_DIR}/src/box/vy_history.c ${PROJECT_SOURCE_DIR}/src/box/vy_mem.c ${PROJECT_SOURCE_DIR}/src/box/vy_cache.c) set(ITERATOR_TEST_LIBS core tuple xrow unit) diff --git a/test/unit/vy_cache.c b/test/unit/vy_cache.c index 6b543d85..5d296aa6 100644 --- a/test/unit/vy_cache.c +++ b/test/unit/vy_cache.c @@ -1,5 +1,7 @@ #include "trivia/util.h" #include "vy_iterators_helper.h" +#include "vy_history.h" +#include "fiber.h" #include "unit.h" const struct vy_stmt_template key_template = STMT_TEMPLATE(0, SELECT, vyend); @@ -19,6 +21,10 @@ test_basic() struct tuple *select_all = vy_new_simple_stmt(format, NULL, &key_template); + struct mempool history_node_pool; + mempool_create(&history_node_pool, cord_slab_cache(), + sizeof(struct vy_history_node)); + /* * Fill the cache with 3 chains. 
*/ @@ -85,8 +91,11 @@ test_basic() /* Start iterator and make several steps. */ struct tuple *ret; bool unused; + struct vy_history history; + vy_history_create(&history, &history_node_pool); for (int i = 0; i < 4; ++i) - vy_cache_iterator_next(&itr, &ret, &unused); + vy_cache_iterator_next(&itr, &history, &unused); + ret = vy_history_last_stmt(&history); ok(vy_stmt_are_same(ret, &chain1[3], format, NULL), "next_key * 4"); @@ -107,14 +116,17 @@ test_basic() * must be chain1[1]. */ struct tuple *last_stmt = vy_new_simple_stmt(format, NULL, &chain1[0]); - ok(vy_cache_iterator_restore(&itr, last_stmt, &ret, &unused) >= 0, + ok(vy_cache_iterator_restore(&itr, last_stmt, &history, &unused) >= 0, "restore"); + ret = vy_history_last_stmt(&history); ok(vy_stmt_are_same(ret, &chain1[1], format, NULL), "restore on position after last"); tuple_unref(last_stmt); + vy_history_cleanup(&history); vy_cache_iterator_close(&itr); + mempool_destroy(&history_node_pool); tuple_unref(select_all); destroy_test_cache(&cache, key_def, format); check_plan(); diff --git a/test/unit/vy_mem.c b/test/unit/vy_mem.c index 60fcf084..6641dc0c 100644 --- a/test/unit/vy_mem.c +++ b/test/unit/vy_mem.c @@ -1,6 +1,7 @@ #include #include "memory.h" #include "fiber.h" +#include "vy_history.h" #include "vy_iterators_helper.h" static void @@ -91,6 +92,10 @@ test_iterator_restore_after_insertion() struct tuple *select_key = vy_stmt_new_select(format, "", 0); + struct mempool history_node_pool; + mempool_create(&history_node_pool, cord_slab_cache(), + sizeof(struct vy_history_node)); + uint64_t restore_on_value = 20; uint64_t restore_on_value_reverse = 60; char data[16]; @@ -189,7 +194,10 @@ test_iterator_restore_after_insertion() direct ? 
ITER_GE : ITER_LE, select_key, &prv); struct tuple *t; - int rc = vy_mem_iterator_next_key(&itr, &t); + struct vy_history history; + vy_history_create(&history, &history_node_pool); + int rc = vy_mem_iterator_next(&itr, &history); + t = vy_history_last_stmt(&history); assert(rc == 0); size_t j = 0; while (t != NULL) { @@ -209,7 +217,8 @@ test_iterator_restore_after_insertion() break; else if(!direct && val <= middle_value) break; - int rc = vy_mem_iterator_next_key(&itr, &t); + int rc = vy_mem_iterator_next(&itr, &history); + t = vy_history_last_stmt(&history); assert(rc == 0); } if (t == NULL && j != expected_count) @@ -261,9 +270,10 @@ test_iterator_restore_after_insertion() } if (direct) - rc = vy_mem_iterator_restore(&itr, restore_on_key, &t); + rc = vy_mem_iterator_restore(&itr, restore_on_key, &history); else - rc = vy_mem_iterator_restore(&itr, restore_on_key_reverse, &t); + rc = vy_mem_iterator_restore(&itr, restore_on_key_reverse, &history); + t = vy_history_last_stmt(&history); j = 0; while (t != NULL) { @@ -279,7 +289,8 @@ test_iterator_restore_after_insertion() break; } j++; - int rc = vy_mem_iterator_next_key(&itr, &t); + int rc = vy_mem_iterator_next(&itr, &history); + t = vy_history_last_stmt(&history); assert(rc == 0); } if (j != expected_count) @@ -289,6 +300,7 @@ test_iterator_restore_after_insertion() break; } + vy_history_cleanup(&history); vy_mem_delete(mem); lsregion_gc(&lsregion, 2); } @@ -296,6 +308,8 @@ test_iterator_restore_after_insertion() ok(!wrong_output, "check wrong_output %d", i_fail); /* Clean up */ + mempool_destroy(&history_node_pool); + tuple_unref(select_key); tuple_unref(restore_on_key); tuple_unref(restore_on_key_reverse); diff --git a/test/vinyl/upsert.result b/test/vinyl/upsert.result index cd6070a6..76728024 100644 --- a/test/vinyl/upsert.result +++ b/test/vinyl/upsert.result @@ -684,15 +684,15 @@ box.snapshot() --- - ok ... 
-s:upsert({1, 1}, {{'+', 1, 1}}) +s:upsert({1, 1}, {{'+', 1, 1}}) -- ignored due to primary key changed --- ... -s:upsert({1, 1}, {{'+', 2, 1}}) +s:upsert({1, 1}, {{'+', 2, 1}}) -- applied to the previous statement --- ... -s:select() --both upserts are ignored due to primary key change +s:select() --- -- - [1, 1] +- - [1, 2] ... -- -- gh-2520 use cache as a hint when applying upserts. diff --git a/test/vinyl/upsert.test.lua b/test/vinyl/upsert.test.lua index 50315ed3..a16d2cf2 100644 --- a/test/vinyl/upsert.test.lua +++ b/test/vinyl/upsert.test.lua @@ -276,9 +276,9 @@ i = s:create_index('test', { run_count_per_level = 20 }) s:replace({1, 1}) box.snapshot() -s:upsert({1, 1}, {{'+', 1, 1}}) -s:upsert({1, 1}, {{'+', 2, 1}}) -s:select() --both upserts are ignored due to primary key change +s:upsert({1, 1}, {{'+', 1, 1}}) -- ignored due to primary key changed +s:upsert({1, 1}, {{'+', 2, 1}}) -- applied to the previous statement +s:select() -- -- gh-2520 use cache as a hint when applying upserts. -- 2.11.0