Tarantool development patches archive
 help / color / mirror / Atom feed
* [PATCH] box: delay all writes before wal writer is initialized
@ 2019-02-08 16:51 Vladimir Davydov
  2019-02-11 12:34 ` [PATCH v2] box: enable WAL before making initial checkpoint Vladimir Davydov
  0 siblings, 1 reply; 4+ messages in thread
From: Vladimir Davydov @ 2019-02-08 16:51 UTC (permalink / raw)
  To: kostja; +Cc: tarantool-patches

While a replica is bootstrapped from a remote master, the vinyl engine
may need to perform compaction, which means that it may write to
the _vinyl_deferred_delete system space. Compaction proceeds fully
asynchronously, i.e. a write may occur after the join stage is
complete, but before the WAL is initialized, in which case the new
replica will crash. To make sure a race like that won't happen, let's
set up a pseudo journal that will delay all WAL writes until the WAL
is initialized.

Closes #3968
---
https://github.com/tarantool/tarantool/issues/3968
https://github.com/tarantool/tarantool/commits/dv/gh-3968-vy-fix-replica-join-crash

 src/box/box.cc                    | 50 +++++++++++++++++++++++----------------
 src/box/engine.h                  | 14 +++++++++++
 src/box/journal.c                 | 42 ++++++++++++++++++++++++++++++++
 src/box/journal.h                 | 21 ++++++++++++++--
 src/box/vinyl.c                   |  3 ---
 test/vinyl/replica_quota.result   | 34 +++++++++++++++++++++-----
 test/vinyl/replica_quota.test.lua | 25 +++++++++++++++-----
 7 files changed, 152 insertions(+), 37 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index e12a1cba..3845f09d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1731,6 +1731,25 @@ engine_init()
 	box_set_vinyl_timeout();
 }
 
+static void
+finalize_bootstrap(void)
+{
+	/* Make the initial checkpoint. */
+	engine_begin_checkpoint_xc();
+
+	/*
+	 * Delay all WAL writes that engines may issue on their
+	 * own (e.g. vinyl's deferred DELETEs) until WAL writer
+	 * is initialized.
+	 */
+	journal_set(delay_journal_new_xc());
+
+	engine_commit_checkpoint_xc(&replicaset.vclock);
+
+	/* Apprise the garbage collector of the new checkpoint. */
+	gc_add_checkpoint(&replicaset.vclock);
+}
+
 /**
  * Initialize the first replica of a new replica set.
  */
@@ -1761,12 +1780,7 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
 	/* Set UUID of a new replica set */
 	box_set_replicaset_uuid(replicaset_uuid);
 
-	/* Make the initial checkpoint */
-	if (engine_begin_checkpoint() ||
-	    engine_commit_checkpoint(&replicaset.vclock))
-		panic("failed to create a checkpoint");
-
-	gc_add_checkpoint(&replicaset.vclock);
+	finalize_bootstrap();
 }
 
 /**
@@ -1813,22 +1827,13 @@ bootstrap_from_master(struct replica *master)
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
 
-	/* Clear the pointer to journal before it goes out of scope */
-	journal_set(NULL);
-
-	/* Finalize the new replica */
-	engine_end_recovery_xc();
-
 	/* Switch applier to initial state */
 	applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
 	assert(applier->state == APPLIER_READY);
 
-	/* Make the initial checkpoint */
-	if (engine_begin_checkpoint() ||
-	    engine_commit_checkpoint(&replicaset.vclock))
-		panic("failed to create a checkpoint");
+	finalize_bootstrap();
 
-	gc_add_checkpoint(&replicaset.vclock);
+	engine_end_recovery_xc();
 }
 
 /**
@@ -2000,7 +2005,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
 		box_sync_replication(false);
 	}
 	recovery_finalize(recovery);
-	engine_end_recovery_xc();
 
 	/* Check replica set UUID. */
 	if (!tt_uuid_is_nil(replicaset_uuid) &&
@@ -2010,8 +2014,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
 			  tt_uuid_str(&REPLICASET_UUID));
 	}
 
-	/* Clear the pointer to journal before it goes out of scope */
-	journal_set(NULL);
+	/*
+	 * Delay all WAL writes that engines may issue on their
+	 * own (e.g. vinyl's deferred DELETEs) until WAL writer
+	 * is initialized.
+	 */
+	journal_set(delay_journal_new_xc());
+
+	engine_end_recovery_xc();
 }
 
 static void
diff --git a/src/box/engine.h b/src/box/engine.h
index a6949d28..44799baa 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -435,6 +435,20 @@ engine_bootstrap_xc(void)
 }
 
 static inline void
+engine_begin_checkpoint_xc(void)
+{
+	if (engine_begin_checkpoint() != 0)
+		diag_raise();
+}
+
+static inline void
+engine_commit_checkpoint_xc(const struct vclock *vclock)
+{
+	if (engine_commit_checkpoint(vclock) != 0)
+		diag_raise();
+}
+
+static inline void
 engine_begin_initial_recovery_xc(const struct vclock *recovery_vclock)
 {
 	if (engine_begin_initial_recovery(recovery_vclock) != 0)
diff --git a/src/box/journal.c b/src/box/journal.c
index 7498ba19..3aa913ad 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -31,6 +31,7 @@
 #include "journal.h"
 #include <small/region.h>
 #include <fiber.h>
+#include <fiber_cond.h>
 #include <diag.h>
 
 /**
@@ -73,3 +74,44 @@ journal_entry_new(size_t n_rows)
 	return entry;
 }
 
+/**
+ * A pseudo journal that delays all writes until another journal
+ * is installed. On destruction forwards all pending records to
+ * the new journal.
+ */
+struct delay_journal {
+	struct journal base;
+	struct fiber_cond cond;
+};
+
+static int64_t
+delay_journal_write(struct journal *base, struct journal_entry *entry)
+{
+	struct delay_journal *journal = (struct delay_journal *)base;
+	fiber_cond_wait(&journal->cond);
+	return journal_write(entry);
+}
+
+static void
+delay_journal_destroy(struct journal *base)
+{
+	struct delay_journal *journal = (struct delay_journal *)base;
+	fiber_cond_broadcast(&journal->cond);
+	fiber_cond_destroy(&journal->cond);
+	free(journal);
+}
+
+struct journal *
+delay_journal_new(void)
+{
+	struct delay_journal *journal = malloc(sizeof(*journal));
+	if (journal == NULL) {
+		diag_set(OutOfMemory, sizeof(*journal),
+			 "malloc", "delay journal");
+		return NULL;
+	}
+	journal_create(&journal->base, delay_journal_write,
+		       delay_journal_destroy);
+	fiber_cond_create(&journal->cond);
+	return &journal->base;
+}
diff --git a/src/box/journal.h b/src/box/journal.h
index e5231688..9304cd8d 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -133,9 +133,10 @@ journal_write(struct journal_entry *entry)
 static inline void
 journal_set(struct journal *new_journal)
 {
-	if (current_journal && current_journal->destroy)
-		current_journal->destroy(current_journal);
+	struct journal *old_journal = current_journal;
 	current_journal = new_journal;
+	if (old_journal != NULL && old_journal->destroy != NULL)
+		old_journal->destroy(old_journal);
 }
 
 static inline void
@@ -153,9 +154,25 @@ journal_is_initialized(struct journal *journal)
 	return journal->write != NULL;
 }
 
+/**
+ * Create a pseudo journal that will delay all writes until
+ * another journal is installed.
+ */
+struct journal *
+delay_journal_new(void);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 
+static inline struct journal *
+delay_journal_new_xc()
+{
+	struct journal *journal = delay_journal_new();
+	if (journal == NULL)
+		diag_raise();
+	return journal;
+}
+
 #endif /* defined(__cplusplus) */
 
 #endif /* TARANTOOL_JOURNAL_H_INCLUDED */
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 065a309f..ce198257 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2667,7 +2667,6 @@ static int
 vinyl_engine_begin_checkpoint(struct engine *engine)
 {
 	struct vy_env *env = vy_env(engine);
-	assert(env->status == VINYL_ONLINE);
 	/*
 	 * The scheduler starts worker threads upon the first wakeup.
 	 * To avoid starting the threads for nothing, do not wake it
@@ -2685,7 +2684,6 @@ vinyl_engine_wait_checkpoint(struct engine *engine,
 			     const struct vclock *vclock)
 {
 	struct vy_env *env = vy_env(engine);
-	assert(env->status == VINYL_ONLINE);
 	if (vy_scheduler_wait_checkpoint(&env->scheduler) != 0)
 		return -1;
 	if (vy_log_rotate(vclock) != 0)
@@ -2699,7 +2697,6 @@ vinyl_engine_commit_checkpoint(struct engine *engine,
 {
 	(void)vclock;
 	struct vy_env *env = vy_env(engine);
-	assert(env->status == VINYL_ONLINE);
 	vy_scheduler_end_checkpoint(&env->scheduler);
 }
 
diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result
index 50e39719..bd09e764 100644
--- a/test/vinyl/replica_quota.result
+++ b/test/vinyl/replica_quota.result
@@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' })
 _ = s:create_index('pk', {run_count_per_level = 1})
 ---
 ...
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
 ---
 ...
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+pad = string.rep('x', 10000);
+---
+...
+function fill()
+    for i = 1, 50 do
+        box.begin()
+        for j = 1, 10 do
+            s:replace{math.random(100), math.random(100), pad}
+        end
+        box.commit()
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Send > 1 MB to replica.
+fill()
 ---
 ...
 box.snapshot()
 ---
 - ok
 ...
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
 ---
 ...
 -- Replica has memory limit set to 1 MB so replication would hang
@@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default')
 _ = test_run:cmd("stop server replica")
 ---
 ...
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
 ---
 ...
 _ = test_run:cmd("start server replica")
@@ -67,7 +89,7 @@ box.snapshot()
 ---
 - ok
 ...
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
 ---
 ...
 _ = test_run:cmd("start server replica") -- join
diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua
index e04abbc2..1f373fd4 100644
--- a/test/vinyl/replica_quota.test.lua
+++ b/test/vinyl/replica_quota.test.lua
@@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication')
 
 s = box.schema.space.create('test', { engine = 'vinyl' })
 _ = s:create_index('pk', {run_count_per_level = 1})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
 
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+pad = string.rep('x', 10000);
+function fill()
+    for i = 1, 50 do
+        box.begin()
+        for j = 1, 10 do
+            s:replace{math.random(100), math.random(100), pad}
+        end
+        box.commit()
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Send > 1 MB to replica.
+fill()
 box.snapshot()
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
 
 -- Replica has memory limit set to 1 MB so replication would hang
 -- if the scheduler didn't work on the destination.
@@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default')
 
 -- Check vinyl_timeout is ignored on 'subscribe' (gh-3087).
 _ = test_run:cmd("stop server replica")
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
 _ = test_run:cmd("start server replica")
 _ = test_run:wait_lsn('replica', 'default')
 
@@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica")
 _ = test_run:cmd("cleanup server replica")
 
 box.snapshot()
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
 
 _ = test_run:cmd("start server replica") -- join
 _ = test_run:cmd("stop server replica")
-- 
2.11.0

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2019-02-11 14:10 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-02-08 16:51 [PATCH] box: delay all writes before wal writer is initialized Vladimir Davydov
2019-02-11 12:34 ` [PATCH v2] box: enable WAL before making initial checkpoint Vladimir Davydov
2019-02-11 12:41   ` Konstantin Osipov
2019-02-11 14:10     ` Vladimir Davydov

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox