From: Vladimir Davydov <vdavydov.dev@gmail.com> To: kostja@tarantool.org Cc: tarantool-patches@freelists.org Subject: [PATCH] vinyl: flush transactions before setting trigger on altered space Date: Wed, 1 Aug 2018 13:44:10 +0300 [thread overview] Message-ID: <44e0fcd4d14ae056258c8a55a2b68caafac4f356.1533119343.git.vdavydov.dev@gmail.com> (raw) When building a new index or checking a space format, we propagate changes done to the space during the procedure with the aid of an on_replace trigger. The problem is there may be transactions with a non-empty write set when we install the trigger. Changes done by those transactions will not be seen by the trigger and so they won't make it to the new index, resulting in an inconsistency between the primary and secondary indexes. To fix this issue, let's flush all active transactions after installing the trigger. If a transaction fails to commit or rollback within box.cfg.vinyl_timeout seconds, DDL simply aborts it. Closes #3458 --- src/box/vinyl.c | 28 ++++--- src/box/vy_lsm.h | 11 +++ src/box/vy_tx.c | 76 +++++++++++++++++++ src/box/vy_tx.h | 36 +++++++++ test/vinyl/ddl.result | 1 - test/vinyl/ddl.test.lua | 1 - test/vinyl/errinj.result | 181 +++++++++++++++++++++++++++++++++++++++++++++ test/vinyl/errinj.test.lua | 82 ++++++++++++++++++++ test/vinyl/misc.result | 5 ++ test/vinyl/misc.test.lua | 4 + 10 files changed, 414 insertions(+), 11 deletions(-) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 5da1c4bc..ce211c47 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1084,9 +1084,16 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format) trigger_create(&on_replace, vy_check_format_on_replace, &ctx, NULL); trigger_add(&space->on_replace, &on_replace); + /* + * Wait for all transactions that began before we installed + * the on_replace trigger so that we don't miss their write + * set while checking the space format. 
+ */ + tx_manager_flush(env->xm, env->timeout); + struct vy_read_iterator itr; vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key, - &env->xm->p_global_read_view); + &env->xm->p_committed_read_view); int rc; int loops = 0; struct tuple *tuple; @@ -4158,10 +4165,8 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, { struct vy_env *env = vy_env(src_space->engine); struct vy_lsm *pk = vy_lsm(src_space->index[0]); - bool is_empty = (pk->stat.disk.count.rows == 0 && - pk->stat.memory.count.rows == 0); - if (new_index->def->iid == 0 && !is_empty) { + if (new_index->def->iid == 0 && !vy_lsm_is_empty(pk)) { diag_set(ClientError, ER_UNSUPPORTED, "Vinyl", "rebuilding the primary index of a non-empty space"); return -1; @@ -4178,9 +4183,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, env->status == VINYL_FINAL_RECOVERY_LOCAL) return vy_build_recover(env, new_lsm, pk); - if (is_empty) - return 0; - /* * Iterate over all tuples stored in the space and insert * each of them into the new LSM tree. Since read iterator @@ -4203,6 +4205,13 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL); trigger_add(&src_space->on_replace, &on_replace); + /* + * Wait for all transactions that began before we installed + * the on_replace trigger so that we don't miss their write + * set while building the new index. + */ + tx_manager_flush(env->xm, env->timeout); + struct vy_read_iterator itr; vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key, &env->xm->p_committed_read_view); @@ -4253,9 +4262,10 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, /* * Dump the new index upon build completion so that we don't - * have to rebuild it on recovery. + * have to rebuild it on recovery. No need to trigger dump if + * the space happens to be empty. 
*/ - if (rc == 0) + if (rc == 0 && !vy_lsm_is_empty(new_lsm)) rc = vy_scheduler_dump(&env->scheduler); if (rc == 0 && ctx.is_failed) { diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h index f0b7ec9c..08d30530 100644 --- a/src/box/vy_lsm.h +++ b/src/box/vy_lsm.h @@ -311,6 +311,17 @@ void vy_lsm_delete(struct vy_lsm *lsm); /** + * Return true if the LSM tree has no statements, neither on disk + * nor in memory. + */ +static inline bool +vy_lsm_is_empty(struct vy_lsm *lsm) +{ + return (lsm->stat.disk.count.rows == 0 && + lsm->stat.memory.count.rows == 0); +} + +/** * Increment the reference counter of an LSM tree. * An LSM tree cannot be deleted if its reference * counter is elevated. diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c index 5c186b87..9ff03d94 100644 --- a/src/box/vy_tx.c +++ b/src/box/vy_tx.c @@ -42,6 +42,7 @@ #include "diag.h" #include "errcode.h" #include "fiber.h" +#include "fiber_cond.h" #include "iproto_constants.h" #include "iterator_type.h" #include "salad/stailq.h" @@ -106,6 +107,7 @@ tx_manager_new(void) return NULL; } + rlist_create(&xm->write_list); rlist_create(&xm->read_views); vy_global_read_view_create((struct vy_read_view *)&xm->global_read_view, INT64_MAX); @@ -121,6 +123,8 @@ tx_manager_new(void) sizeof(struct vy_read_interval)); mempool_create(&xm->read_view_mempool, slab_cache, sizeof(struct vy_read_view)); + + fiber_cond_create(&xm->cond); return xm; } @@ -134,6 +138,59 @@ tx_manager_delete(struct tx_manager *xm) free(xm); } +/** + * Wait until all transactions with the write identifier equal + * to the given one or older are completed or the timeout occurs. 
+ */ +static int +tx_manager_wait_write(struct tx_manager *xm, + int64_t write_id, double timeout) +{ + double deadline = ev_monotonic_now(loop()) + timeout; + do { + if (rlist_empty(&xm->write_list)) + return 0; + struct vy_tx *tx = rlist_first_entry(&xm->write_list, + struct vy_tx, in_write_list); + if (tx->write_id > write_id) + return 0; + } while (fiber_cond_wait_deadline(&xm->cond, deadline) == 0); + return -1; +} + +void +tx_manager_flush(struct tx_manager *xm, double timeout) +{ + /* + * Wait for transactions started before this point + * to complete within the specified timeout. + */ + int64_t write_id = xm->last_write_id; + if (tx_manager_wait_write(xm, write_id, timeout) == 0) + return; /* all transactions have completed */ + + /* + * Abort transactions that failed to complete within + * the specified timeout. Note, transactions that have + * been submitted to WAL can't be aborted. + */ + struct vy_tx *tx, *next_tx; + rlist_foreach_entry_safe(tx, &xm->write_list, in_write_list, next_tx) { + if (tx->write_id > write_id) + break; + if (tx->state == VINYL_TX_COMMIT) + continue; + assert(tx->state == VINYL_TX_READY); + tx->state = VINYL_TX_ABORT; + rlist_del_entry(tx, in_write_list); + } + fiber_cond_broadcast(&xm->cond); + + /* Wait for transactions awaiting WAL write. */ + if (tx_manager_wait_write(xm, write_id, TIMEOUT_INFINITY) != 0) + unreachable(); +} + /** Create or reuse an instance of a read view. 
*/ static struct vy_read_view * tx_manager_read_view(struct tx_manager *xm) @@ -288,11 +345,15 @@ vy_tx_create(struct tx_manager *xm, struct vy_tx *tx) vy_tx_read_set_new(&tx->read_set); tx->psn = 0; rlist_create(&tx->on_destroy); + tx->write_id = -1; + rlist_create(&tx->in_write_list); } void vy_tx_destroy(struct vy_tx *tx) { + assert(rlist_empty(&tx->in_write_list)); + trigger_run(&tx->on_destroy, NULL); trigger_destroy(&tx->on_destroy); @@ -346,7 +407,9 @@ vy_tx_send_to_read_view(struct vy_tx *tx, struct txv *v) if (rv == NULL) return -1; abort->read_view = rv; + rlist_del_entry(abort, in_write_list); } + fiber_cond_broadcast(&tx->xm->cond); return 0; } @@ -368,7 +431,9 @@ vy_tx_abort_readers(struct vy_tx *tx, struct txv *v) if (abort->state != VINYL_TX_READY) continue; abort->state = VINYL_TX_ABORT; + rlist_del_entry(abort, in_write_list); } + fiber_cond_broadcast(&tx->xm->cond); } struct vy_tx * @@ -594,6 +659,9 @@ vy_tx_commit(struct vy_tx *tx, int64_t lsn) if (tx->read_view != &xm->global_read_view) tx->read_view->vlsn = lsn; out: + rlist_del_entry(tx, in_write_list); + fiber_cond_broadcast(&xm->cond); + vy_tx_destroy(tx); mempool_free(&xm->tx_mempool, tx); } @@ -656,6 +724,9 @@ vy_tx_rollback(struct vy_tx *tx) if (tx->state == VINYL_TX_COMMIT) vy_tx_rollback_after_prepare(tx); + rlist_del_entry(tx, in_write_list); + fiber_cond_broadcast(&xm->cond); + vy_tx_destroy(tx); mempool_free(&xm->tx_mempool, tx); } @@ -852,6 +923,11 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt) tx->write_size += tuple_size(stmt); vy_stmt_counter_acct_tuple(&lsm->stat.txw.count, stmt); stailq_add_tail_entry(&tx->log, v, next_in_log); + + if (tx->write_id < 0) { + tx->write_id = ++tx->xm->last_write_id; + rlist_add_tail_entry(&tx->xm->write_list, tx, in_write_list); + } return 0; } diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h index dcf6a739..1a157299 100644 --- a/src/box/vy_tx.h +++ b/src/box/vy_tx.h @@ -39,6 +39,7 @@ #include <small/rb.h> #include 
<small/rlist.h> +#include "fiber_cond.h" #include "iterator_type.h" #include "salad/stailq.h" #include "trivia/util.h" @@ -136,6 +137,14 @@ struct vy_tx { /** Transaction manager. */ struct tx_manager *xm; /** + * Unique, monotonically growing identifier assigned + * to a transaction when it performs its first write + * operation. See also tx_manager::last_write_id. + */ + int64_t write_id; + /** Link in tx_manager::write_list. */ + struct rlist in_write_list; + /** * In memory transaction log. Contains both reads * and writes. */ @@ -209,6 +218,19 @@ struct tx_manager { */ struct vy_tx *last_prepared_tx; /** + * Write identifier assigned to the most recent read-write + * transaction. See also vy_tx::write_id. + */ + int64_t last_write_id; + /** + * List of all transactions that have a non-empty write set + * and may actually commit (i.e. haven't been sent to read + * view or aborted). Newer transactions (with greater write + * identifiers) are closer to the tail of the list. + * Linked by vy_tx::in_write_list. + */ + struct rlist write_list; + /** * The list of TXs with a read view in order of vlsn. */ struct rlist read_views; @@ -238,6 +260,12 @@ struct tx_manager { * transaction to use in such places. */ const struct vy_read_view *p_committed_read_view; + /** + * Condition variable signaled whenever a transaction + * is committed, rolled back, aborted, or sent to read + * view. + */ + struct fiber_cond cond; /** Transaction statistics. */ struct vy_tx_stat stat; /** Sum size of statements pinned by the write set. */ @@ -262,6 +290,14 @@ tx_manager_new(void); void tx_manager_delete(struct tx_manager *xm); +/** + * Wait until all currently active transactions have been + * committed or rolled back. Abort transactions that have + * not completed within the specified timeout. + */ +void +tx_manager_flush(struct tx_manager *xm, double timeout); + /** Initialize a tx object.
*/ void vy_tx_create(struct tx_manager *xm, struct vy_tx *tx); diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result index 3e65e232..bb4fc984 100644 --- a/test/vinyl/ddl.result +++ b/test/vinyl/ddl.result @@ -656,7 +656,6 @@ last_val = 1000; --- ... function gen_load() - fiber.sleep(0.001) local s = box.space.test for i = 1, 200 do local op = math.random(4) diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua index 45c5cf8e..f7bf03bb 100644 --- a/test/vinyl/ddl.test.lua +++ b/test/vinyl/ddl.test.lua @@ -253,7 +253,6 @@ box.commit(); last_val = 1000; function gen_load() - fiber.sleep(0.001) local s = box.space.test for i = 1, 200 do local op = math.random(4) diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result index 28271fc9..fe3af69f 100644 --- a/test/vinyl/errinj.result +++ b/test/vinyl/errinj.result @@ -1844,3 +1844,184 @@ s.index.sk:stat().memory.rows s:drop() --- ... +-- +-- gh-3458: the write set of a transaction started before DDL and +-- committed during DDL is taken into account by space format check +-- and index build. +-- +fiber = require('fiber') +--- +... +errinj = box.error.injection +--- +... +vinyl_cache = box.cfg.vinyl_cache +--- +... +box.cfg{vinyl_cache = 0} +--- +... +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +_ = s:create_index('pk', {page_size = 16}) +--- +... +pad = string.rep('x', 16) +--- +... +for i = 101, 200 do s:replace{i, i, pad} end +--- +... +box.snapshot() +--- +- ok +... +ch = fiber.channel(1) +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function async_replace(tuple, timeout) + fiber.create(function() + box.begin() + s:replace(tuple) + fiber.sleep(timeout) + local status = pcall(box.commit) + ch:put(status) + end) +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +async_replace({1}, 0.01) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +--- +- ok +... 
+s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail +--- +- error: Tuple field count 1 is less than required by space format or defined indexes + (expected at least 2) +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) +--- +- ok +... +ch:get() +--- +- true +... +s:get{1} ~= nil +--- +- true +... +s:delete{1} +--- +... +s:format{} +--- +... +async_replace({1, 1}, 0.01) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +--- +- ok +... +_ = s:create_index('sk', {parts = {2, 'unsigned'}}) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) +--- +- ok +... +ch:get() +--- +- true +... +s:get{1} ~= nil +--- +- true +... +s.index.pk:count() == s.index.sk:count() +--- +- true +... +s.index.sk:drop() +--- +... +-- Transactions that failed to complete within timeout are aborted. +vinyl_timeout = box.cfg.vinyl_timeout +--- +... +box.cfg{vinyl_timeout = 0.001} +--- +... +async_replace({2}, 0.01) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +--- +- ok +... +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) +--- +- ok +... +ch:get() +--- +- false +... +s:get{2} == nil +--- +- true +... +s:format{} +--- +... +s:truncate() +--- +... +-- Transactions that reached WAL are never aborted. +errinj.set('ERRINJ_WAL_DELAY', true) +--- +- ok +... +_ = fiber.create(function() s:replace{1} end) +--- +... +_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end) +--- +... +fiber.sleep(0) +--- +... +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail +--- +- error: Tuple field count 1 is less than required by space format or defined indexes + (expected at least 2) +... +s:select() +--- +- - [1] +... +s:drop() +--- +... +box.cfg{vinyl_timeout = vinyl_timeout} +--- +... +box.cfg{vinyl_cache = vinyl_cache} +--- +... 
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua index 000067d3..0a872bc9 100644 --- a/test/vinyl/errinj.test.lua +++ b/test/vinyl/errinj.test.lua @@ -736,3 +736,85 @@ s.index.sk:select() s.index.sk:stat().memory.rows s:drop() + +-- +-- gh-3458: the write set of a transaction started before DDL and +-- committed during DDL is taken into account by space format check +-- and index build. +-- +fiber = require('fiber') +errinj = box.error.injection + +vinyl_cache = box.cfg.vinyl_cache +box.cfg{vinyl_cache = 0} + +s = box.schema.space.create('test', {engine = 'vinyl'}) +_ = s:create_index('pk', {page_size = 16}) + +pad = string.rep('x', 16) +for i = 101, 200 do s:replace{i, i, pad} end +box.snapshot() + +ch = fiber.channel(1) + +test_run:cmd("setopt delimiter ';'") +function async_replace(tuple, timeout) + fiber.create(function() + box.begin() + s:replace(tuple) + fiber.sleep(timeout) + local status = pcall(box.commit) + ch:put(status) + end) +end; +test_run:cmd("setopt delimiter ''"); + +async_replace({1}, 0.01) + +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) + +ch:get() +s:get{1} ~= nil +s:delete{1} +s:format{} + +async_replace({1, 1}, 0.01) + +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +_ = s:create_index('sk', {parts = {2, 'unsigned'}}) +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) + +ch:get() +s:get{1} ~= nil +s.index.pk:count() == s.index.sk:count() +s.index.sk:drop() + +-- Transactions that failed to complete within timeout are aborted. +vinyl_timeout = box.cfg.vinyl_timeout +box.cfg{vinyl_timeout = 0.001} + +async_replace({2}, 0.01) + +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) + +ch:get() +s:get{2} == nil +s:format{} +s:truncate() + +-- Transactions that reached WAL are never aborted. 
+errinj.set('ERRINJ_WAL_DELAY', true) +_ = fiber.create(function() s:replace{1} end) +_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end) + +fiber.sleep(0) +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail +s:select() + +s:drop() +box.cfg{vinyl_timeout = vinyl_timeout} +box.cfg{vinyl_cache = vinyl_cache} diff --git a/test/vinyl/misc.result b/test/vinyl/misc.result index 409df3e0..d2a7ca68 100644 --- a/test/vinyl/misc.result +++ b/test/vinyl/misc.result @@ -140,6 +140,11 @@ i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned', i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}}) --- ... +-- space.create_index() does a lookup in the primary index +-- so reset the stats before calling space.insert(). +box.stat.reset() +--- +... s:insert{1, 1, 1, 1, 1, 1} --- - [1, 1, 1, 1, 1, 1] diff --git a/test/vinyl/misc.test.lua b/test/vinyl/misc.test.lua index 3dbfdb54..9f61ca0a 100644 --- a/test/vinyl/misc.test.lua +++ b/test/vinyl/misc.test.lua @@ -62,6 +62,10 @@ i5 = s:create_index('i5', {unique = true, parts = {4, 'unsigned', 5, 'unsigned', i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned', 5, 'unsigned'}}) i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}}) +-- space.create_index() does a lookup in the primary index +-- so reset the stats before calling space.insert(). +box.stat.reset() + s:insert{1, 1, 1, 1, 1, 1} i1:stat().lookup -- 1 -- 2.11.0
next reply other threads:[~2018-08-01 10:44 UTC|newest] Thread overview: 5+ messages / expand[flat|nested] mbox.gz Atom feed top 2018-08-01 10:44 Vladimir Davydov [this message] 2018-08-03 15:17 ` Vladimir Davydov 2018-08-03 15:21 ` [PATCH] vinyl: abort rw transactions before DDL Vladimir Davydov 2018-08-23 21:10 ` Konstantin Osipov 2018-08-24 8:28 ` Vladimir Davydov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=44e0fcd4d14ae056258c8a55a2b68caafac4f356.1533119343.git.vdavydov.dev@gmail.com \ --to=vdavydov.dev@gmail.com \ --cc=kostja@tarantool.org \ --cc=tarantool-patches@freelists.org \ --subject='Re: [PATCH] vinyl: flush transactions before setting trigger on altered space' \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox