From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
From: Vladimir Davydov
Subject: [PATCH v2] vinyl: abort rw transactions before DDL
Date: Fri, 24 Aug 2018 13:46:53 +0300
Message-Id:
To: kostja@tarantool.org
Cc: tarantool-patches@freelists.org
List-ID:

When building a new index or checking a space format, we propagate
changes done to the space during the procedure with the aid of an
on_replace trigger. The problem is that there may be transactions with
a non-empty write set when we install the trigger. Changes done by
those transactions will not be seen by the trigger. As a result, they
won't make it to the new index, leading to an inconsistency between the
primary and secondary indexes, or they won't be checked against the new
space format.

To fix this issue, let's abort all rw transactions affecting the space
after setting the trigger. Note that transactions that have already
reached WAL can't be aborted, so we wait for them to complete.

Closes #3458
---
https://github.com/tarantool/tarantool/issues/3458
https://github.com/tarantool/tarantool/commits/dv/gh-3458-vy-abort-tx-before-ddl

Changes in v2:
 - Abort only transactions that affect the altered space.
 - Maintain the list of rw transactions, not all transactions.
 - Add some functional tests.
v1: https://www.freelists.org/post/tarantool-patches/PATCH-vinyl-abort-rw-transactions-before-DDL src/box/vinyl.c | 52 +++++++--- src/box/vy_lsm.h | 11 ++ src/box/vy_tx.c | 24 +++++ src/box/vy_tx.h | 15 +++ test/vinyl/ddl.result | 1 - test/vinyl/ddl.test.lua | 1 - test/vinyl/deferred_delete.result | 5 + test/vinyl/deferred_delete.test.lua | 3 + test/vinyl/errinj.result | 196 ++++++++++++++++++++++++++++++++++++ test/vinyl/errinj.test.lua | 85 ++++++++++++++++ test/vinyl/misc.result | 5 + test/vinyl/misc.test.lua | 4 + 12 files changed, 387 insertions(+), 15 deletions(-) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index fb121402..798a37f8 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -1017,6 +1017,29 @@ vinyl_space_prepare_alter(struct space *old_space, struct space *new_space) return 0; } +/** + * This function is called after installing on_replace trigger + * used for propagating changes done during DDL. It aborts all + * rw transactions affecting the given LSM tree that began + * before the trigger was installed so that DDL doesn't miss + * their working set. + */ +static int +vy_abort_writers_for_ddl(struct vy_env *env, struct vy_lsm *lsm) +{ + if (tx_manager_abort_writers(env->xm, lsm) != 0) + return -1; + /* + * Wait for prepared transactions to complete + * (we can't abort them as they reached WAL). + */ + struct vclock unused; + if (wal_checkpoint(&unused, false) != 0) + return -1; + + return 0; +} + /** Argument passed to vy_check_format_on_replace(). */ struct vy_check_format_ctx { /** Format to check new tuples against. 
*/ @@ -1087,10 +1110,13 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format) trigger_create(&on_replace, vy_check_format_on_replace, &ctx, NULL); trigger_add(&space->on_replace, &on_replace); + int rc = vy_abort_writers_for_ddl(env, pk); + if (rc != 0) + goto out; + struct vy_read_iterator itr; vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key, - &env->xm->p_global_read_view); - int rc; + &env->xm->p_committed_read_view); int loops = 0; struct tuple *tuple; while ((rc = vy_read_iterator_next(&itr, &tuple)) == 0) { @@ -1114,6 +1140,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format) break; } vy_read_iterator_close(&itr); +out: diag_destroy(&ctx.diag); trigger_clear(&on_replace); tuple_unref(key); @@ -4190,10 +4217,8 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, { struct vy_env *env = vy_env(src_space->engine); struct vy_lsm *pk = vy_lsm(src_space->index[0]); - bool is_empty = (pk->stat.disk.count.rows == 0 && - pk->stat.memory.count.rows == 0); - if (new_index->def->iid == 0 && !is_empty) { + if (new_index->def->iid == 0 && !vy_lsm_is_empty(pk)) { diag_set(ClientError, ER_UNSUPPORTED, "Vinyl", "rebuilding the primary index of a non-empty space"); return -1; @@ -4210,9 +4235,6 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, env->status == VINYL_FINAL_RECOVERY_LOCAL) return vy_build_recover(env, new_lsm, pk); - if (is_empty) - return 0; - /* * Iterate over all tuples stored in the space and insert * each of them into the new LSM tree. 
Since read iterator @@ -4234,10 +4256,13 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, trigger_create(&on_replace, vy_build_on_replace, &ctx, NULL); trigger_add(&src_space->on_replace, &on_replace); + int rc = vy_abort_writers_for_ddl(env, pk); + if (rc != 0) + goto out; + struct vy_read_iterator itr; vy_read_iterator_open(&itr, pk, NULL, ITER_ALL, key, &env->xm->p_committed_read_view); - int rc; int loops = 0; struct tuple *tuple; int64_t build_lsn = env->xm->lsn; @@ -4280,22 +4305,23 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index, } } vy_read_iterator_close(&itr); - tuple_unref(key); /* * Dump the new index upon build completion so that we don't - * have to rebuild it on recovery. + * have to rebuild it on recovery. No need to trigger dump if + * the space happens to be empty. */ - if (rc == 0) + if (rc == 0 && !vy_lsm_is_empty(new_lsm)) rc = vy_scheduler_dump(&env->scheduler); if (rc == 0 && ctx.is_failed) { diag_move(&ctx.diag, diag_get()); rc = -1; } - +out: diag_destroy(&ctx.diag); trigger_clear(&on_replace); + tuple_unref(key); return rc; } diff --git a/src/box/vy_lsm.h b/src/box/vy_lsm.h index d2aa0c43..6917d475 100644 --- a/src/box/vy_lsm.h +++ b/src/box/vy_lsm.h @@ -316,6 +316,17 @@ void vy_lsm_delete(struct vy_lsm *lsm); /** + * Return true if the LSM tree has no statements, neither on disk + * nor in memory. + */ +static inline bool +vy_lsm_is_empty(struct vy_lsm *lsm) +{ + return (lsm->stat.disk.count.rows == 0 && + lsm->stat.memory.count.rows == 0); +} + +/** * Increment the reference counter of an LSM tree. * An LSM tree cannot be deleted if its reference * counter is elevated. 
diff --git a/src/box/vy_tx.c b/src/box/vy_tx.c index 590b4483..1e8775a0 100644 --- a/src/box/vy_tx.c +++ b/src/box/vy_tx.c @@ -108,6 +108,7 @@ tx_manager_new(void) return NULL; } + rlist_create(&xm->writers); rlist_create(&xm->read_views); vy_global_read_view_create((struct vy_read_view *)&xm->global_read_view, INT64_MAX); @@ -290,6 +291,7 @@ vy_tx_create(struct tx_manager *xm, struct vy_tx *tx) vy_tx_read_set_new(&tx->read_set); tx->psn = 0; rlist_create(&tx->on_destroy); + rlist_create(&tx->in_writers); } void @@ -308,6 +310,7 @@ vy_tx_destroy(struct vy_tx *tx) } vy_tx_read_set_iter(&tx->read_set, NULL, vy_tx_read_set_free_cb, NULL); + rlist_del_entry(tx, in_writers); } /** Return true if the transaction is read-only. */ @@ -819,6 +822,8 @@ vy_tx_rollback_to_savepoint(struct vy_tx *tx, void *svp) tx->write_set_version++; txv_delete(v); } + if (stailq_empty(&tx->log)) + rlist_del_entry(tx, in_writers); } int @@ -1018,10 +1023,29 @@ vy_tx_set(struct vy_tx *tx, struct vy_lsm *lsm, struct tuple *stmt) tx->write_set_version++; tx->write_size += tuple_size(stmt); vy_stmt_counter_acct_tuple(&lsm->stat.txw.count, stmt); + if (stailq_empty(&tx->log)) + rlist_add_entry(&tx->xm->writers, tx, in_writers); stailq_add_tail_entry(&tx->log, v, next_in_log); return 0; } +int +tx_manager_abort_writers(struct tx_manager *xm, struct vy_lsm *lsm) +{ + struct tuple *key = vy_stmt_new_select(lsm->env->key_format, NULL, 0); + if (key == NULL) + return -1; + struct vy_tx *tx; + rlist_foreach_entry(tx, &xm->writers, in_writers) { + assert(!vy_tx_is_ro(tx)); + if (tx->state == VINYL_TX_READY && + write_set_search_key(&tx->write_set, lsm, key) != NULL) + tx->state = VINYL_TX_ABORT; + } + tuple_unref(key); + return 0; +} + void vy_txw_iterator_open(struct vy_txw_iterator *itr, struct vy_txw_iterator_stat *stat, diff --git a/src/box/vy_tx.h b/src/box/vy_tx.h index dcf6a739..1d515c72 100644 --- a/src/box/vy_tx.h +++ b/src/box/vy_tx.h @@ -133,6 +133,8 @@ write_set_search_key(write_set_t *tree, 
struct vy_lsm *lsm, /** Transaction object. */ struct vy_tx { + /** Link in tx_manager::writers. */ + struct rlist in_writers; /** Transaction manager. */ struct tx_manager *xm; /** @@ -209,6 +211,10 @@ struct tx_manager { */ struct vy_tx *last_prepared_tx; /** + * List of rw transactions, linked by vy_tx::in_writers. + */ + struct rlist writers; + /** * The list of TXs with a read view in order of vlsn. */ struct rlist read_views; @@ -262,6 +268,15 @@ tx_manager_new(void); void tx_manager_delete(struct tx_manager *xm); +/** + * Abort all rw transactions that affect the given LSM tree + * and haven't reached WAL yet. + * + * Returns 0 on success, -1 on memory allocation error. + */ +int +tx_manager_abort_writers(struct tx_manager *xm, struct vy_lsm *lsm); + /** Initialize a tx object. */ void vy_tx_create(struct tx_manager *xm, struct vy_tx *tx); diff --git a/test/vinyl/ddl.result b/test/vinyl/ddl.result index 3e65e232..bb4fc984 100644 --- a/test/vinyl/ddl.result +++ b/test/vinyl/ddl.result @@ -656,7 +656,6 @@ last_val = 1000; --- ... function gen_load() - fiber.sleep(0.001) local s = box.space.test for i = 1, 200 do local op = math.random(4) diff --git a/test/vinyl/ddl.test.lua b/test/vinyl/ddl.test.lua index 45c5cf8e..f7bf03bb 100644 --- a/test/vinyl/ddl.test.lua +++ b/test/vinyl/ddl.test.lua @@ -253,7 +253,6 @@ box.commit(); last_val = 1000; function gen_load() - fiber.sleep(0.001) local s = box.space.test for i = 1, 200 do local op = math.random(4) diff --git a/test/vinyl/deferred_delete.result b/test/vinyl/deferred_delete.result index 9811b6bc..ee9c0212 100644 --- a/test/vinyl/deferred_delete.result +++ b/test/vinyl/deferred_delete.result @@ -24,6 +24,11 @@ i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, un i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true}) --- ... +-- space.create_index() does a lookup in the primary index +-- so reset the stats before filling up the space. 
+box.stat.reset() +--- +... for i = 1, 10 do s:replace{i, i, i} end --- ... diff --git a/test/vinyl/deferred_delete.test.lua b/test/vinyl/deferred_delete.test.lua index d18361a0..671a7ed1 100644 --- a/test/vinyl/deferred_delete.test.lua +++ b/test/vinyl/deferred_delete.test.lua @@ -13,6 +13,9 @@ s = box.schema.space.create('test', {engine = 'vinyl'}) pk = s:create_index('pk', {run_count_per_level = 10}) i1 = s:create_index('i1', {run_count_per_level = 10, parts = {2, 'unsigned'}, unique = false}) i2 = s:create_index('i2', {run_count_per_level = 10, parts = {3, 'unsigned'}, unique = true}) +-- space.create_index() does a lookup in the primary index +-- so reset the stats before filling up the space. +box.stat.reset() for i = 1, 10 do s:replace{i, i, i} end box.snapshot() for i = 1, 10, 2 do s:delete{i} end diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result index cdffa198..17e4dc8c 100644 --- a/test/vinyl/errinj.result +++ b/test/vinyl/errinj.result @@ -1922,3 +1922,199 @@ box.snapshot() -- ok s:drop() --- ... +-- +-- gh-3458: check that rw transactions that started before DDL are +-- aborted. +-- +vinyl_cache = box.cfg.vinyl_cache +--- +... +box.cfg{vinyl_cache = 0} +--- +... +s1 = box.schema.space.create('test1', {engine = 'vinyl'}) +--- +... +_ = s1:create_index('pk', {page_size = 16}) +--- +... +s2 = box.schema.space.create('test2', {engine = 'vinyl'}) +--- +... +_ = s2:create_index('pk') +--- +... +pad = string.rep('x', 16) +--- +... +for i = 101, 200 do s1:replace{i, i, pad} end +--- +... +box.snapshot() +--- +- ok +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function async_replace(space, tuple, timeout) + local c = fiber.channel(1) + fiber.create(function() + box.begin() + space:replace(tuple) + fiber.sleep(timeout) + local status = pcall(box.commit) + c:put(status) + end) + return c +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +c1 = async_replace(s1, {1}, 0.01) +--- +... 
+c2 = async_replace(s2, {1}, 0.01) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +--- +- ok +... +s1:format{{'key', 'unsigned'}, {'value', 'unsigned'}} +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) +--- +- ok +... +c1:get() -- false (transaction was aborted) +--- +- false +... +c2:get() -- true +--- +- true +... +s1:get(1) == nil +--- +- true +... +s2:get(1) ~= nil +--- +- true +... +s1:format() +--- +- [{'name': 'key', 'type': 'unsigned'}, {'name': 'value', 'type': 'unsigned'}] +... +s1:format{} +--- +... +c1 = async_replace(s1, {2}, 0.01) +--- +... +c2 = async_replace(s2, {2}, 0.01) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +--- +- ok +... +_ = s1:create_index('sk', {parts = {2, 'unsigned'}}) +--- +... +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) +--- +- ok +... +c1:get() -- false (transaction was aborted) +--- +- false +... +c2:get() -- true +--- +- true +... +s1:get(2) == nil +--- +- true +... +s2:get(2) ~= nil +--- +- true +... +s1.index.pk:count() == s1.index.sk:count() +--- +- true +... +s1:drop() +--- +... +s2:drop() +--- +... +box.cfg{vinyl_cache = vinyl_cache} +--- +... +-- Transactions that reached WAL must not be aborted. +s = box.schema.space.create('test', {engine = 'vinyl'}) +--- +... +_ = s:create_index('pk') +--- +... +errinj.set('ERRINJ_WAL_DELAY', true) +--- +- ok +... +_ = fiber.create(function() s:replace{1} end) +--- +... +_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end) +--- +... +fiber.sleep(0) +--- +... +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail +--- +- error: Tuple field count 1 is less than required by space format or defined indexes + (expected at least 2) +... +s:select() +--- +- - [1] +... +s:truncate() +--- +... +errinj.set('ERRINJ_WAL_DELAY', true) +--- +- ok +... +_ = fiber.create(function() s:replace{1} end) +--- +... +_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end) +--- +... 
+fiber.sleep(0) +--- +... +s:create_index('sk', {parts = {2, 'unsigned'}}) +--- +- error: Tuple field count 1 is less than required by space format or defined indexes + (expected at least 2) +... +s:select() +--- +- - [1] +... +s:drop() +--- +... diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua index c2332a69..1b02c01c 100644 --- a/test/vinyl/errinj.test.lua +++ b/test/vinyl/errinj.test.lua @@ -765,3 +765,88 @@ errinj.set("ERRINJ_VY_SCHED_TIMEOUT", 0) box.snapshot() -- ok s:drop() + +-- +-- gh-3458: check that rw transactions that started before DDL are +-- aborted. +-- +vinyl_cache = box.cfg.vinyl_cache +box.cfg{vinyl_cache = 0} + +s1 = box.schema.space.create('test1', {engine = 'vinyl'}) +_ = s1:create_index('pk', {page_size = 16}) +s2 = box.schema.space.create('test2', {engine = 'vinyl'}) +_ = s2:create_index('pk') + +pad = string.rep('x', 16) +for i = 101, 200 do s1:replace{i, i, pad} end +box.snapshot() + +test_run:cmd("setopt delimiter ';'") +function async_replace(space, tuple, timeout) + local c = fiber.channel(1) + fiber.create(function() + box.begin() + space:replace(tuple) + fiber.sleep(timeout) + local status = pcall(box.commit) + c:put(status) + end) + return c +end; +test_run:cmd("setopt delimiter ''"); + +c1 = async_replace(s1, {1}, 0.01) +c2 = async_replace(s2, {1}, 0.01) + +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +s1:format{{'key', 'unsigned'}, {'value', 'unsigned'}} +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) + +c1:get() -- false (transaction was aborted) +c2:get() -- true + +s1:get(1) == nil +s2:get(1) ~= nil +s1:format() +s1:format{} + +c1 = async_replace(s1, {2}, 0.01) +c2 = async_replace(s2, {2}, 0.01) + +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0.001) +_ = s1:create_index('sk', {parts = {2, 'unsigned'}}) +errinj.set("ERRINJ_VY_READ_PAGE_TIMEOUT", 0) + +c1:get() -- false (transaction was aborted) +c2:get() -- true + +s1:get(2) == nil +s2:get(2) ~= nil +s1.index.pk:count() == s1.index.sk:count() + +s1:drop() 
+s2:drop() +box.cfg{vinyl_cache = vinyl_cache} + +-- Transactions that reached WAL must not be aborted. +s = box.schema.space.create('test', {engine = 'vinyl'}) +_ = s:create_index('pk') + +errinj.set('ERRINJ_WAL_DELAY', true) +_ = fiber.create(function() s:replace{1} end) +_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end) + +fiber.sleep(0) +s:format{{'key', 'unsigned'}, {'value', 'unsigned'}} -- must fail +s:select() +s:truncate() + +errinj.set('ERRINJ_WAL_DELAY', true) +_ = fiber.create(function() s:replace{1} end) +_ = fiber.create(function() fiber.sleep(0.01) errinj.set('ERRINJ_WAL_DELAY', false) end) + +fiber.sleep(0) +s:create_index('sk', {parts = {2, 'unsigned'}}) +s:select() +s:drop() diff --git a/test/vinyl/misc.result b/test/vinyl/misc.result index 409df3e0..d2a7ca68 100644 --- a/test/vinyl/misc.result +++ b/test/vinyl/misc.result @@ -140,6 +140,11 @@ i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned', i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}}) --- ... +-- space.create_index() does a lookup in the primary index +-- so reset the stats before calling space.insert(). +box.stat.reset() +--- +... s:insert{1, 1, 1, 1, 1, 1} --- - [1, 1, 1, 1, 1, 1] diff --git a/test/vinyl/misc.test.lua b/test/vinyl/misc.test.lua index 3dbfdb54..9f61ca0a 100644 --- a/test/vinyl/misc.test.lua +++ b/test/vinyl/misc.test.lua @@ -62,6 +62,10 @@ i5 = s:create_index('i5', {unique = true, parts = {4, 'unsigned', 5, 'unsigned', i6 = s:create_index('i6', {unique = true, parts = {4, 'unsigned', 6, 'unsigned', 5, 'unsigned'}}) i7 = s:create_index('i7', {unique = true, parts = {6, 'unsigned'}}) +-- space.create_index() does a lookup in the primary index +-- so reset the stats before calling space.insert(). +box.stat.reset() + s:insert{1, 1, 1, 1, 1, 1} i1:stat().lookup -- 1 -- 2.11.0