From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: From: Vladimir Davydov Subject: [PATCH] box: delay all writes before wal writer is initialized Date: Fri, 8 Feb 2019 19:51:10 +0300 Message-Id: <504138963321d4795b577d73f86e013fde9090b8.1549644534.git.vdavydov.dev@gmail.com> To: kostja@tarantool.org Cc: tarantool-patches@freelists.org List-ID: While a replica is bootstrapped from a remote master, vinyl engine may need to perform compaction, which means that it may write to the _vinyl_deferred_delete system space. Compaction proceeds fully asynchronously, i.e. a write may occur after the join stage is complete, but before the WAL is initialized, in which case the new replica will crash. To make sure a race like that won't happen, let's setup a pseudo journal that will delay all WAL writes until the WAL is initialized. Closes #3968 --- https://github.com/tarantool/tarantool/issues/3968 https://github.com/tarantool/tarantool/commits/dv/gh-3968-vy-fix-replica-join-crash src/box/box.cc | 50 +++++++++++++++++++++++---------------- src/box/engine.h | 14 +++++++++++ src/box/journal.c | 42 ++++++++++++++++++++++++++++++++ src/box/journal.h | 21 ++++++++++++++-- src/box/vinyl.c | 3 --- test/vinyl/replica_quota.result | 34 +++++++++++++++++++++----- test/vinyl/replica_quota.test.lua | 25 +++++++++++++++----- 7 files changed, 152 insertions(+), 37 deletions(-) diff --git a/src/box/box.cc b/src/box/box.cc index e12a1cba..3845f09d 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1731,6 +1731,25 @@ engine_init() box_set_vinyl_timeout(); } +static void +finalize_bootstrap(void) +{ + /* Make the initial checkpoint. */ + engine_begin_checkpoint_xc(); + + /* + * Delay all WAL writes that engines may issue on their + * own (e.g. vinyl's deferred DELETEs) until the WAL writer + * is initialized. + */ + journal_set(delay_journal_new_xc()); + + engine_commit_checkpoint_xc(&replicaset.vclock); + + /* Apprise the garbage collector of the new checkpoint.
*/ + gc_add_checkpoint(&replicaset.vclock); +} + /** * Initialize the first replica of a new replica set. */ @@ -1761,12 +1780,7 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid) /* Set UUID of a new replica set */ box_set_replicaset_uuid(replicaset_uuid); - /* Make the initial checkpoint */ - if (engine_begin_checkpoint() || - engine_commit_checkpoint(&replicaset.vclock)) - panic("failed to create a checkpoint"); - - gc_add_checkpoint(&replicaset.vclock); + finalize_bootstrap(); } /** @@ -1813,22 +1827,13 @@ bootstrap_from_master(struct replica *master) applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY); - /* Clear the pointer to journal before it goes out of scope */ - journal_set(NULL); - - /* Finalize the new replica */ - engine_end_recovery_xc(); - /* Switch applier to initial state */ applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY); assert(applier->state == APPLIER_READY); - /* Make the initial checkpoint */ - if (engine_begin_checkpoint() || - engine_commit_checkpoint(&replicaset.vclock)) - panic("failed to create a checkpoint"); + finalize_bootstrap(); - gc_add_checkpoint(&replicaset.vclock); + engine_end_recovery_xc(); } /** @@ -2000,7 +2005,6 @@ local_recovery(const struct tt_uuid *instance_uuid, box_sync_replication(false); } recovery_finalize(recovery); - engine_end_recovery_xc(); /* Check replica set UUID. */ if (!tt_uuid_is_nil(replicaset_uuid) && @@ -2010,8 +2014,14 @@ local_recovery(const struct tt_uuid *instance_uuid, tt_uuid_str(&REPLICASET_UUID)); } - /* Clear the pointer to journal before it goes out of scope */ - journal_set(NULL); + /* + * Delay all WAL writes that engines may issue on their + * own (e.g. vinyl's deferred DELETEs) until the WAL writer + * is initialized.
+ */ + journal_set(delay_journal_new_xc()); + + engine_end_recovery_xc(); } static void diff --git a/src/box/engine.h b/src/box/engine.h index a6949d28..44799baa 100644 --- a/src/box/engine.h +++ b/src/box/engine.h @@ -435,6 +435,20 @@ engine_bootstrap_xc(void) } static inline void +engine_begin_checkpoint_xc(void) +{ + if (engine_begin_checkpoint() != 0) + diag_raise(); +} + +static inline void +engine_commit_checkpoint_xc(const struct vclock *vclock) +{ + if (engine_commit_checkpoint(vclock) != 0) + diag_raise(); +} + +static inline void engine_begin_initial_recovery_xc(const struct vclock *recovery_vclock) { if (engine_begin_initial_recovery(recovery_vclock) != 0) diff --git a/src/box/journal.c b/src/box/journal.c index 7498ba19..3aa913ad 100644 --- a/src/box/journal.c +++ b/src/box/journal.c @@ -31,6 +31,7 @@ #include "journal.h" #include #include +#include #include /** @@ -73,3 +74,44 @@ journal_entry_new(size_t n_rows) return entry; } +/** + * A pseudo journal that delays all writes until another journal + * is installed. On destruction wakes up all blocked writers, which + * then resubmit their entries to the new journal via journal_write().
+ */ +struct delay_journal { + struct journal base; + struct fiber_cond cond; +}; + +static int64_t +delay_journal_write(struct journal *base, struct journal_entry *entry) +{ + struct delay_journal *journal = (struct delay_journal *)base; + fiber_cond_wait(&journal->cond); + return journal_write(entry); +} + +static void +delay_journal_destroy(struct journal *base) +{ + struct delay_journal *journal = (struct delay_journal *)base; + fiber_cond_broadcast(&journal->cond); + fiber_cond_destroy(&journal->cond); + free(journal); +} + +struct journal * +delay_journal_new(void) +{ + struct delay_journal *journal = malloc(sizeof(*journal)); + if (journal == NULL) { + diag_set(OutOfMemory, sizeof(*journal), + "malloc", "delay journal"); + return NULL; + } + journal_create(&journal->base, delay_journal_write, + delay_journal_destroy); + fiber_cond_create(&journal->cond); + return &journal->base; +} diff --git a/src/box/journal.h b/src/box/journal.h index e5231688..9304cd8d 100644 --- a/src/box/journal.h +++ b/src/box/journal.h @@ -133,9 +133,10 @@ journal_write(struct journal_entry *entry) static inline void journal_set(struct journal *new_journal) { - if (current_journal && current_journal->destroy) - current_journal->destroy(current_journal); + struct journal *old_journal = current_journal; current_journal = new_journal; + if (old_journal != NULL && old_journal->destroy != NULL) + old_journal->destroy(old_journal); } static inline void @@ -153,9 +154,25 @@ journal_is_initialized(struct journal *journal) return journal->write != NULL; } +/** + * Create a pseudo journal that will delay all writes until + * another journal is installed. Returns NULL and sets diag on OOM.
+ */ +struct journal * +delay_journal_new(void); + #if defined(__cplusplus) } /* extern "C" */ +static inline struct journal * +delay_journal_new_xc() +{ + struct journal *journal = delay_journal_new(); + if (journal == NULL) + diag_raise(); + return journal; +} + #endif /* defined(__cplusplus) */ #endif /* TARANTOOL_JOURNAL_H_INCLUDED */ diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 065a309f..ce198257 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -2667,7 +2667,6 @@ static int vinyl_engine_begin_checkpoint(struct engine *engine) { struct vy_env *env = vy_env(engine); - assert(env->status == VINYL_ONLINE); /* * The scheduler starts worker threads upon the first wakeup. * To avoid starting the threads for nothing, do not wake it @@ -2685,7 +2684,6 @@ vinyl_engine_wait_checkpoint(struct engine *engine, const struct vclock *vclock) { struct vy_env *env = vy_env(engine); - assert(env->status == VINYL_ONLINE); if (vy_scheduler_wait_checkpoint(&env->scheduler) != 0) return -1; if (vy_log_rotate(vclock) != 0) @@ -2699,7 +2697,6 @@ vinyl_engine_commit_checkpoint(struct engine *engine, { (void)vclock; struct vy_env *env = vy_env(engine); - assert(env->status == VINYL_ONLINE); vy_scheduler_end_checkpoint(&env->scheduler); } diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result index 50e39719..bd09e764 100644 --- a/test/vinyl/replica_quota.result +++ b/test/vinyl/replica_quota.result @@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' }) _ = s:create_index('pk', {run_count_per_level = 1}) --- ... --- Send > 2 MB to replica. -pad = string.rep('x', 1100) +_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}}) --- ... -for i = 1,1000 do s:insert{i, pad} end +test_run:cmd("setopt delimiter ';'") +--- +- true +... +pad = string.rep('x', 10000); +--- +...
+function fill() + for i = 1, 50 do + box.begin() + for j = 1, 10 do + s:replace{math.random(100), math.random(100), pad} + end + box.commit() + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Send > 1 MB to replica (fill() does 500 replaces with 10 KB padding). +fill() --- ... box.snapshot() --- - ok ... -for i = 1001,2000 do s:insert{i, pad} end +fill() --- ... -- Replica has memory limit set to 1 MB so replication would hang @@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default') _ = test_run:cmd("stop server replica") --- ... -for i = 2001,3000 do s:insert{i, pad} end +fill() --- ... _ = test_run:cmd("start server replica") @@ -67,7 +89,7 @@ box.snapshot() --- - ok ... -for i = 3001,4000 do s:insert{i, pad} end +fill() --- ... _ = test_run:cmd("start server replica") -- join diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua index e04abbc2..1f373fd4 100644 --- a/test/vinyl/replica_quota.test.lua +++ b/test/vinyl/replica_quota.test.lua @@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication') s = box.schema.space.create('test', { engine = 'vinyl' }) _ = s:create_index('pk', {run_count_per_level = 1}) +_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}}) --- Send > 2 MB to replica. -pad = string.rep('x', 1100) -for i = 1,1000 do s:insert{i, pad} end +test_run:cmd("setopt delimiter ';'") +pad = string.rep('x', 10000); +function fill() + for i = 1, 50 do + box.begin() + for j = 1, 10 do + s:replace{math.random(100), math.random(100), pad} + end + box.commit() + end +end; +test_run:cmd("setopt delimiter ''"); + +-- Send > 1 MB to replica (fill() does 500 replaces with 10 KB padding). +fill() box.snapshot() -for i = 1001,2000 do s:insert{i, pad} end +fill() -- Replica has memory limit set to 1 MB so replication would hang -- if the scheduler didn't work on the destination. @@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default') -- Check vinyl_timeout is ignored on 'subscribe' (gh-3087).
_ = test_run:cmd("stop server replica") -for i = 2001,3000 do s:insert{i, pad} end +fill() _ = test_run:cmd("start server replica") _ = test_run:wait_lsn('replica', 'default') @@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica") _ = test_run:cmd("cleanup server replica") box.snapshot() -for i = 3001,4000 do s:insert{i, pad} end +fill() _ = test_run:cmd("start server replica") -- join _ = test_run:cmd("stop server replica") -- 2.11.0