[PATCH] box: delay all writes before wal writer is initialized
Vladimir Davydov
vdavydov.dev at gmail.com
Fri Feb 8 19:51:10 MSK 2019
While a replica is bootstrapped from a remote master, the vinyl engine
may need to perform compaction, which means that it may write to
the _vinyl_deferred_delete system space. Compaction proceeds fully
asynchronously, i.e. a write may occur after the join stage is
complete, but before the WAL is initialized, in which case the new
replica will crash. To make sure a race like that won't happen, let's
set up a pseudo journal that will delay all WAL writes until the WAL
is initialized.
Closes #3968
---
https://github.com/tarantool/tarantool/issues/3968
https://github.com/tarantool/tarantool/commits/dv/gh-3968-vy-fix-replica-join-crash
src/box/box.cc | 50 +++++++++++++++++++++++----------------
src/box/engine.h | 14 +++++++++++
src/box/journal.c | 42 ++++++++++++++++++++++++++++++++
src/box/journal.h | 21 ++++++++++++++--
src/box/vinyl.c | 3 ---
test/vinyl/replica_quota.result | 34 +++++++++++++++++++++-----
test/vinyl/replica_quota.test.lua | 25 +++++++++++++++-----
7 files changed, 152 insertions(+), 37 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index e12a1cba..3845f09d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1731,6 +1731,25 @@ engine_init()
box_set_vinyl_timeout();
}
+static void
+finalize_bootstrap(void)
+{
+ /* Make the initial checkpoint. */
+ engine_begin_checkpoint_xc();
+
+ /*
+ * Delay all WAL writes that engines may issue on their
+ * own (e.g. vinyl's deferred DELETEs) until WAL writer
+ * is initialized.
+ */
+ journal_set(delay_journal_new_xc());
+
+ engine_commit_checkpoint_xc(&replicaset.vclock);
+
+ /* Apprise the garbage collector of the new checkpoint. */
+ gc_add_checkpoint(&replicaset.vclock);
+}
+
/**
* Initialize the first replica of a new replica set.
*/
@@ -1761,12 +1780,7 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
/* Set UUID of a new replica set */
box_set_replicaset_uuid(replicaset_uuid);
- /* Make the initial checkpoint */
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
- panic("failed to create a checkpoint");
-
- gc_add_checkpoint(&replicaset.vclock);
+ finalize_bootstrap();
}
/**
@@ -1813,22 +1827,13 @@ bootstrap_from_master(struct replica *master)
applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
-
- /* Finalize the new replica */
- engine_end_recovery_xc();
-
/* Switch applier to initial state */
applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
assert(applier->state == APPLIER_READY);
- /* Make the initial checkpoint */
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
- panic("failed to create a checkpoint");
+ finalize_bootstrap();
- gc_add_checkpoint(&replicaset.vclock);
+ engine_end_recovery_xc();
}
/**
@@ -2000,7 +2005,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
box_sync_replication(false);
}
recovery_finalize(recovery);
- engine_end_recovery_xc();
/* Check replica set UUID. */
if (!tt_uuid_is_nil(replicaset_uuid) &&
@@ -2010,8 +2014,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
tt_uuid_str(&REPLICASET_UUID));
}
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
+ /*
+ * Delay all WAL writes that engines may issue on their
+ * own (e.g. vinyl's deferred DELETEs) until WAL writer
+ * is initialized.
+ */
+ journal_set(delay_journal_new_xc());
+
+ engine_end_recovery_xc();
}
static void
diff --git a/src/box/engine.h b/src/box/engine.h
index a6949d28..44799baa 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -435,6 +435,20 @@ engine_bootstrap_xc(void)
}
static inline void
+engine_begin_checkpoint_xc(void)
+{
+ if (engine_begin_checkpoint() != 0)
+ diag_raise();
+}
+
+static inline void
+engine_commit_checkpoint_xc(const struct vclock *vclock)
+{
+ if (engine_commit_checkpoint(vclock) != 0)
+ diag_raise();
+}
+
+static inline void
engine_begin_initial_recovery_xc(const struct vclock *recovery_vclock)
{
if (engine_begin_initial_recovery(recovery_vclock) != 0)
diff --git a/src/box/journal.c b/src/box/journal.c
index 7498ba19..3aa913ad 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -31,6 +31,7 @@
#include "journal.h"
#include <small/region.h>
#include <fiber.h>
+#include <fiber_cond.h>
#include <diag.h>
/**
@@ -73,3 +74,44 @@ journal_entry_new(size_t n_rows)
return entry;
}
+/**
+ * A pseudo journal that delays all writes until another journal
+ * is installed. On destruction forwards all pending records to
+ * the new journal.
+ */
+struct delay_journal {
+ struct journal base;
+ struct fiber_cond cond;
+};
+
+static int64_t
+delay_journal_write(struct journal *base, struct journal_entry *entry)
+{
+ struct delay_journal *journal = (struct delay_journal *)base;
+ fiber_cond_wait(&journal->cond);
+ return journal_write(entry);
+}
+
+static void
+delay_journal_destroy(struct journal *base)
+{
+ struct delay_journal *journal = (struct delay_journal *)base;
+ fiber_cond_broadcast(&journal->cond);
+ fiber_cond_destroy(&journal->cond);
+ free(journal);
+}
+
+struct journal *
+delay_journal_new(void)
+{
+ struct delay_journal *journal = malloc(sizeof(*journal));
+ if (journal == NULL) {
+ diag_set(OutOfMemory, sizeof(*journal),
+ "malloc", "delay journal");
+ return NULL;
+ }
+ journal_create(&journal->base, delay_journal_write,
+ delay_journal_destroy);
+ fiber_cond_create(&journal->cond);
+ return &journal->base;
+}
diff --git a/src/box/journal.h b/src/box/journal.h
index e5231688..9304cd8d 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -133,9 +133,10 @@ journal_write(struct journal_entry *entry)
static inline void
journal_set(struct journal *new_journal)
{
- if (current_journal && current_journal->destroy)
- current_journal->destroy(current_journal);
+ struct journal *old_journal = current_journal;
current_journal = new_journal;
+ if (old_journal != NULL && old_journal->destroy != NULL)
+ old_journal->destroy(old_journal);
}
static inline void
@@ -153,9 +154,25 @@ journal_is_initialized(struct journal *journal)
return journal->write != NULL;
}
+/**
+ * Create a pseudo journal that will delay all writes until
+ * another journal is installed.
+ */
+struct journal *
+delay_journal_new(void);
+
#if defined(__cplusplus)
} /* extern "C" */
+static inline struct journal *
+delay_journal_new_xc()
+{
+ struct journal *journal = delay_journal_new();
+ if (journal == NULL)
+ diag_raise();
+ return journal;
+}
+
#endif /* defined(__cplusplus) */
#endif /* TARANTOOL_JOURNAL_H_INCLUDED */
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 065a309f..ce198257 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2667,7 +2667,6 @@ static int
vinyl_engine_begin_checkpoint(struct engine *engine)
{
struct vy_env *env = vy_env(engine);
- assert(env->status == VINYL_ONLINE);
/*
* The scheduler starts worker threads upon the first wakeup.
* To avoid starting the threads for nothing, do not wake it
@@ -2685,7 +2684,6 @@ vinyl_engine_wait_checkpoint(struct engine *engine,
const struct vclock *vclock)
{
struct vy_env *env = vy_env(engine);
- assert(env->status == VINYL_ONLINE);
if (vy_scheduler_wait_checkpoint(&env->scheduler) != 0)
return -1;
if (vy_log_rotate(vclock) != 0)
@@ -2699,7 +2697,6 @@ vinyl_engine_commit_checkpoint(struct engine *engine,
{
(void)vclock;
struct vy_env *env = vy_env(engine);
- assert(env->status == VINYL_ONLINE);
vy_scheduler_end_checkpoint(&env->scheduler);
}
diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result
index 50e39719..bd09e764 100644
--- a/test/vinyl/replica_quota.result
+++ b/test/vinyl/replica_quota.result
@@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' })
_ = s:create_index('pk', {run_count_per_level = 1})
---
...
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
---
...
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+pad = string.rep('x', 10000);
+---
+...
+function fill()
+ for i = 1, 50 do
+ box.begin()
+ for j = 1, 10 do
+ s:replace{math.random(100), math.random(100), pad}
+ end
+ box.commit()
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Send > 1 MB to replica.
+fill()
---
...
box.snapshot()
---
- ok
...
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
---
...
-- Replica has memory limit set to 1 MB so replication would hang
@@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default')
_ = test_run:cmd("stop server replica")
---
...
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
---
...
_ = test_run:cmd("start server replica")
@@ -67,7 +89,7 @@ box.snapshot()
---
- ok
...
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
---
...
_ = test_run:cmd("start server replica") -- join
diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua
index e04abbc2..1f373fd4 100644
--- a/test/vinyl/replica_quota.test.lua
+++ b/test/vinyl/replica_quota.test.lua
@@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication')
s = box.schema.space.create('test', { engine = 'vinyl' })
_ = s:create_index('pk', {run_count_per_level = 1})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+pad = string.rep('x', 10000);
+function fill()
+ for i = 1, 50 do
+ box.begin()
+ for j = 1, 10 do
+ s:replace{math.random(100), math.random(100), pad}
+ end
+ box.commit()
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Send > 1 MB to replica.
+fill()
box.snapshot()
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
-- Replica has memory limit set to 1 MB so replication would hang
-- if the scheduler didn't work on the destination.
@@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default')
-- Check vinyl_timeout is ignored on 'subscribe' (gh-3087).
_ = test_run:cmd("stop server replica")
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
_ = test_run:cmd("start server replica")
_ = test_run:wait_lsn('replica', 'default')
@@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica")
_ = test_run:cmd("cleanup server replica")
box.snapshot()
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
_ = test_run:cmd("start server replica") -- join
_ = test_run:cmd("stop server replica")
--
2.11.0
More information about the Tarantool-patches
mailing list