* [PATCH] box: delay all writes before wal writer is initialized
@ 2019-02-08 16:51 Vladimir Davydov
2019-02-11 12:34 ` [PATCH v2] box: enable WAL before making initial checkpoint Vladimir Davydov
0 siblings, 1 reply; 4+ messages in thread
From: Vladimir Davydov @ 2019-02-08 16:51 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
While a replica is bootstrapped from a remote master, vinyl engine
may need to perform compaction, which means that it may write to
the _vinyl_deferred_delete system space. Compaction proceeds fully
asynchronously, i.e. a write may occur after the join stage is
complete, but before the WAL is initialized, in which case the new
replica will crash. To make sure a race like that won't happen, let's
set up a pseudo journal that will delay all WAL writes until the WAL
is initialized.
Closes #3968
---
https://github.com/tarantool/tarantool/issues/3968
https://github.com/tarantool/tarantool/commits/dv/gh-3968-vy-fix-replica-join-crash
src/box/box.cc | 50 +++++++++++++++++++++++----------------
src/box/engine.h | 14 +++++++++++
src/box/journal.c | 42 ++++++++++++++++++++++++++++++++
src/box/journal.h | 21 ++++++++++++++--
src/box/vinyl.c | 3 ---
test/vinyl/replica_quota.result | 34 +++++++++++++++++++++-----
test/vinyl/replica_quota.test.lua | 25 +++++++++++++++-----
7 files changed, 152 insertions(+), 37 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index e12a1cba..3845f09d 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1731,6 +1731,25 @@ engine_init()
box_set_vinyl_timeout();
}
+static void
+finalize_bootstrap(void)
+{
+ /* Make the initial checkpoint. */
+ engine_begin_checkpoint_xc();
+
+ /*
+ * Delay all WAL writes that engines may issue on their
+ * own (e.g. vinyl's deferred DELETEs) until WAL writer
+ * is initialized.
+ */
+ journal_set(delay_journal_new_xc());
+
+ engine_commit_checkpoint_xc(&replicaset.vclock);
+
+ /* Apprise the garbage collector of the new checkpoint. */
+ gc_add_checkpoint(&replicaset.vclock);
+}
+
/**
* Initialize the first replica of a new replica set.
*/
@@ -1761,12 +1780,7 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
/* Set UUID of a new replica set */
box_set_replicaset_uuid(replicaset_uuid);
- /* Make the initial checkpoint */
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
- panic("failed to create a checkpoint");
-
- gc_add_checkpoint(&replicaset.vclock);
+ finalize_bootstrap();
}
/**
@@ -1813,22 +1827,13 @@ bootstrap_from_master(struct replica *master)
applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
-
- /* Finalize the new replica */
- engine_end_recovery_xc();
-
/* Switch applier to initial state */
applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
assert(applier->state == APPLIER_READY);
- /* Make the initial checkpoint */
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
- panic("failed to create a checkpoint");
+ finalize_bootstrap();
- gc_add_checkpoint(&replicaset.vclock);
+ engine_end_recovery_xc();
}
/**
@@ -2000,7 +2005,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
box_sync_replication(false);
}
recovery_finalize(recovery);
- engine_end_recovery_xc();
/* Check replica set UUID. */
if (!tt_uuid_is_nil(replicaset_uuid) &&
@@ -2010,8 +2014,14 @@ local_recovery(const struct tt_uuid *instance_uuid,
tt_uuid_str(&REPLICASET_UUID));
}
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
+ /*
+ * Delay all WAL writes that engines may issue on their
+ * own (e.g. vinyl's deferred DELETEs) until WAL writer
+ * is initialized.
+ */
+ journal_set(delay_journal_new_xc());
+
+ engine_end_recovery_xc();
}
static void
diff --git a/src/box/engine.h b/src/box/engine.h
index a6949d28..44799baa 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -435,6 +435,20 @@ engine_bootstrap_xc(void)
}
static inline void
+engine_begin_checkpoint_xc(void)
+{
+ if (engine_begin_checkpoint() != 0)
+ diag_raise();
+}
+
+static inline void
+engine_commit_checkpoint_xc(const struct vclock *vclock)
+{
+ if (engine_commit_checkpoint(vclock) != 0)
+ diag_raise();
+}
+
+static inline void
engine_begin_initial_recovery_xc(const struct vclock *recovery_vclock)
{
if (engine_begin_initial_recovery(recovery_vclock) != 0)
diff --git a/src/box/journal.c b/src/box/journal.c
index 7498ba19..3aa913ad 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -31,6 +31,7 @@
#include "journal.h"
#include <small/region.h>
#include <fiber.h>
+#include <fiber_cond.h>
#include <diag.h>
/**
@@ -73,3 +74,44 @@ journal_entry_new(size_t n_rows)
return entry;
}
+/**
+ * A pseudo journal that delays all writes until another journal
+ * is installed. On destruction forwards all pending records to
+ * the new journal.
+ */
+struct delay_journal {
+ struct journal base;
+ struct fiber_cond cond;
+};
+
+static int64_t
+delay_journal_write(struct journal *base, struct journal_entry *entry)
+{
+ struct delay_journal *journal = (struct delay_journal *)base;
+ fiber_cond_wait(&journal->cond);
+ return journal_write(entry);
+}
+
+static void
+delay_journal_destroy(struct journal *base)
+{
+ struct delay_journal *journal = (struct delay_journal *)base;
+ fiber_cond_broadcast(&journal->cond);
+ fiber_cond_destroy(&journal->cond);
+ free(journal);
+}
+
+struct journal *
+delay_journal_new(void)
+{
+ struct delay_journal *journal = malloc(sizeof(*journal));
+ if (journal == NULL) {
+ diag_set(OutOfMemory, sizeof(*journal),
+ "malloc", "delay journal");
+ return NULL;
+ }
+ journal_create(&journal->base, delay_journal_write,
+ delay_journal_destroy);
+ fiber_cond_create(&journal->cond);
+ return &journal->base;
+}
diff --git a/src/box/journal.h b/src/box/journal.h
index e5231688..9304cd8d 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -133,9 +133,10 @@ journal_write(struct journal_entry *entry)
static inline void
journal_set(struct journal *new_journal)
{
- if (current_journal && current_journal->destroy)
- current_journal->destroy(current_journal);
+ struct journal *old_journal = current_journal;
current_journal = new_journal;
+ if (old_journal != NULL && old_journal->destroy != NULL)
+ old_journal->destroy(old_journal);
}
static inline void
@@ -153,9 +154,25 @@ journal_is_initialized(struct journal *journal)
return journal->write != NULL;
}
+/**
+ * Create a pseudo journal that will delay all writes until
+ * another journal is installed.
+ */
+struct journal *
+delay_journal_new(void);
+
#if defined(__cplusplus)
} /* extern "C" */
+static inline struct journal *
+delay_journal_new_xc()
+{
+ struct journal *journal = delay_journal_new();
+ if (journal == NULL)
+ diag_raise();
+ return journal;
+}
+
#endif /* defined(__cplusplus) */
#endif /* TARANTOOL_JOURNAL_H_INCLUDED */
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 065a309f..ce198257 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2667,7 +2667,6 @@ static int
vinyl_engine_begin_checkpoint(struct engine *engine)
{
struct vy_env *env = vy_env(engine);
- assert(env->status == VINYL_ONLINE);
/*
* The scheduler starts worker threads upon the first wakeup.
* To avoid starting the threads for nothing, do not wake it
@@ -2685,7 +2684,6 @@ vinyl_engine_wait_checkpoint(struct engine *engine,
const struct vclock *vclock)
{
struct vy_env *env = vy_env(engine);
- assert(env->status == VINYL_ONLINE);
if (vy_scheduler_wait_checkpoint(&env->scheduler) != 0)
return -1;
if (vy_log_rotate(vclock) != 0)
@@ -2699,7 +2697,6 @@ vinyl_engine_commit_checkpoint(struct engine *engine,
{
(void)vclock;
struct vy_env *env = vy_env(engine);
- assert(env->status == VINYL_ONLINE);
vy_scheduler_end_checkpoint(&env->scheduler);
}
diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result
index 50e39719..bd09e764 100644
--- a/test/vinyl/replica_quota.result
+++ b/test/vinyl/replica_quota.result
@@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' })
_ = s:create_index('pk', {run_count_per_level = 1})
---
...
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
---
...
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+pad = string.rep('x', 10000);
+---
+...
+function fill()
+ for i = 1, 50 do
+ box.begin()
+ for j = 1, 10 do
+ s:replace{math.random(100), math.random(100), pad}
+ end
+ box.commit()
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Send > 1 MB to replica.
+fill()
---
...
box.snapshot()
---
- ok
...
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
---
...
-- Replica has memory limit set to 1 MB so replication would hang
@@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default')
_ = test_run:cmd("stop server replica")
---
...
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
---
...
_ = test_run:cmd("start server replica")
@@ -67,7 +89,7 @@ box.snapshot()
---
- ok
...
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
---
...
_ = test_run:cmd("start server replica") -- join
diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua
index e04abbc2..1f373fd4 100644
--- a/test/vinyl/replica_quota.test.lua
+++ b/test/vinyl/replica_quota.test.lua
@@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication')
s = box.schema.space.create('test', { engine = 'vinyl' })
_ = s:create_index('pk', {run_count_per_level = 1})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+pad = string.rep('x', 10000);
+function fill()
+ for i = 1, 50 do
+ box.begin()
+ for j = 1, 10 do
+ s:replace{math.random(100), math.random(100), pad}
+ end
+ box.commit()
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Send > 1 MB to replica.
+fill()
box.snapshot()
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
-- Replica has memory limit set to 1 MB so replication would hang
-- if the scheduler didn't work on the destination.
@@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default')
-- Check vinyl_timeout is ignored on 'subscribe' (gh-3087).
_ = test_run:cmd("stop server replica")
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
_ = test_run:cmd("start server replica")
_ = test_run:wait_lsn('replica', 'default')
@@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica")
_ = test_run:cmd("cleanup server replica")
box.snapshot()
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
_ = test_run:cmd("start server replica") -- join
_ = test_run:cmd("stop server replica")
--
2.11.0
^ permalink raw reply [flat|nested] 4+ messages in thread
* [PATCH v2] box: enable WAL before making initial checkpoint
2019-02-08 16:51 [PATCH] box: delay all writes before wal writer is initialized Vladimir Davydov
@ 2019-02-11 12:34 ` Vladimir Davydov
2019-02-11 12:41 ` Konstantin Osipov
0 siblings, 1 reply; 4+ messages in thread
From: Vladimir Davydov @ 2019-02-11 12:34 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches
While a replica is bootstrapped from a remote master, vinyl engine
may need to perform compaction, which means that it may write to
the _vinyl_deferred_delete system space. Compaction proceeds fully
asynchronously, i.e. a write may occur after the join stage is
complete, but before the WAL is initialized, in which case the new
replica will crash. To make sure a race like that won't happen, let's
set up WAL before making the initial checkpoint. The WAL writer is now
initialized right before starting the WAL thread and so we don't need
to split WAL struct into the thread and the writer anymore.
Closes #3968
---
https://github.com/tarantool/tarantool/issues/3968
https://github.com/tarantool/tarantool/tree/dv/gh-3968-vy-fix-replica-join-crash
Changes in v2:
- Instead of setting up a fake delay journal, let's enable WAL
before making initial checkpoint.
v1: https://www.freelists.org/post/tarantool-patches/PATCH-box-delay-all-writes-before-wal-writer-is-initialized
src/box/box.cc | 68 ++++++++++--------
src/box/wal.c | 145 ++++++++++++++++++--------------------
src/box/wal.h | 18 +++--
test/vinyl/replica_quota.result | 34 +++++++--
test/vinyl/replica_quota.test.lua | 25 +++++--
5 files changed, 167 insertions(+), 123 deletions(-)
diff --git a/src/box/box.cc b/src/box/box.cc
index e12a1cba..fde3ecba 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1691,7 +1691,7 @@ box_free(void)
sequence_free();
gc_free();
engine_shutdown();
- wal_thread_stop();
+ wal_free();
}
}
@@ -1761,12 +1761,13 @@ bootstrap_master(const struct tt_uuid *replicaset_uuid)
/* Set UUID of a new replica set */
box_set_replicaset_uuid(replicaset_uuid);
+ /* Enable WAL subsystem. */
+ if (wal_enable() != 0)
+ diag_raise();
+
/* Make the initial checkpoint */
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
+ if (gc_checkpoint() != 0)
panic("failed to create a checkpoint");
-
- gc_add_checkpoint(&replicaset.vclock);
}
/**
@@ -1813,9 +1814,6 @@ bootstrap_from_master(struct replica *master)
applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
-
/* Finalize the new replica */
engine_end_recovery_xc();
@@ -1823,12 +1821,22 @@ bootstrap_from_master(struct replica *master)
applier_resume_to_state(applier, APPLIER_READY, TIMEOUT_INFINITY);
assert(applier->state == APPLIER_READY);
+ /*
+ * An engine may write to WAL on its own during the join
+ * stage (e.g. Vinyl's deferred DELETEs). That's OK - those
+ * records will pass through the recovery journal and wind
+ * up in the initial checkpoint. However, we must enable
+ * the WAL right before starting checkpointing so that
+ * records written during and after the initial checkpoint
+ * go to the real WAL and can be recovered after restart.
+ * This also clears the recovery journal created on stack.
+ */
+ if (wal_enable() != 0)
+ diag_raise();
+
/* Make the initial checkpoint */
- if (engine_begin_checkpoint() ||
- engine_commit_checkpoint(&replicaset.vclock))
+ if (gc_checkpoint() != 0)
panic("failed to create a checkpoint");
-
- gc_add_checkpoint(&replicaset.vclock);
}
/**
@@ -2000,6 +2008,16 @@ local_recovery(const struct tt_uuid *instance_uuid,
box_sync_replication(false);
}
recovery_finalize(recovery);
+
+ /*
+ * We must enable WAL before finalizing engine recovery,
+ * because an engine may start writing to WAL right after
+ * this point (e.g. deferred DELETE statements in Vinyl).
+ * This also clears the recovery journal created on stack.
+ */
+ if (wal_enable() != 0)
+ diag_raise();
+
engine_end_recovery_xc();
/* Check replica set UUID. */
@@ -2009,9 +2027,6 @@ local_recovery(const struct tt_uuid *instance_uuid,
tt_uuid_str(replicaset_uuid),
tt_uuid_str(&REPLICASET_UUID));
}
-
- /* Clear the pointer to journal before it goes out of scope */
- journal_set(NULL);
}
static void
@@ -2083,7 +2098,15 @@ box_cfg_xc(void)
port_init();
iproto_init();
sql_init();
- wal_thread_start();
+
+ int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
+ int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
+ enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
+ if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
+ wal_max_size, &INSTANCE_UUID, on_wal_garbage_collection,
+ on_wal_checkpoint_threshold) != 0) {
+ diag_raise();
+ }
title("loading");
@@ -2128,8 +2151,6 @@ box_cfg_xc(void)
/* Bootstrap a new master */
bootstrap(&instance_uuid, &replicaset_uuid,
&is_bootstrap_leader);
- checkpoint = gc_last_checkpoint();
- assert(checkpoint != NULL);
}
fiber_gc();
@@ -2143,17 +2164,6 @@ box_cfg_xc(void)
}
}
- /* Start WAL writer */
- int64_t wal_max_rows = box_check_wal_max_rows(cfg_geti64("rows_per_wal"));
- int64_t wal_max_size = box_check_wal_max_size(cfg_geti64("wal_max_size"));
- enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
- if (wal_init(wal_mode, cfg_gets("wal_dir"), wal_max_rows,
- wal_max_size, &INSTANCE_UUID, &replicaset.vclock,
- &checkpoint->vclock, on_wal_garbage_collection,
- on_wal_checkpoint_threshold) != 0) {
- diag_raise();
- }
-
rmean_cleanup(rmean_box);
/* Follow replica */
diff --git a/src/box/wal.c b/src/box/wal.c
index cdcaabc0..0b49548c 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -66,19 +66,6 @@ wal_write(struct journal *, struct journal_entry *);
static int64_t
wal_write_in_wal_mode_none(struct journal *, struct journal_entry *);
-/* WAL thread. */
-struct wal_thread {
- /** 'wal' thread doing the writes. */
- struct cord cord;
- /** A pipe from 'tx' thread to 'wal' */
- struct cpipe wal_pipe;
- /**
- * Return pipe from 'wal' to tx'. This is a
- * priority pipe and DOES NOT support yield.
- */
- struct cpipe tx_prio_pipe;
-};
-
/*
* WAL writer - maintain a Write Ahead Log for every change
* in the data state.
@@ -100,6 +87,8 @@ struct wal_writer
* the wal-tx bus and are rolled back "on arrival".
*/
struct stailq rollback;
+ /** A pipe from 'tx' thread to 'wal' */
+ struct cpipe wal_pipe;
/* ----------------- wal ------------------- */
/** A setting from instance configuration - rows_per_wal */
int64_t wal_max_rows;
@@ -109,6 +98,13 @@ struct wal_writer
enum wal_mode wal_mode;
/** wal_dir, from the configuration file. */
struct xdir wal_dir;
+ /** 'wal' thread doing the writes. */
+ struct cord cord;
+ /**
+ * Return pipe from 'wal' to tx'. This is a
+ * priority pipe and DOES NOT support yield.
+ */
+ struct cpipe tx_prio_pipe;
/**
* The vector clock of the WAL writer. It's a bit behind
* the vector clock of the transaction thread, since it
@@ -184,7 +180,6 @@ struct vy_log_writer {
};
static struct vy_log_writer vy_log_writer;
-static struct wal_thread wal_thread;
static struct wal_writer wal_writer_singleton;
enum wal_mode
@@ -200,7 +195,7 @@ static void
tx_schedule_commit(struct cmsg *msg);
static struct cmsg_hop wal_request_route[] = {
- {wal_write_to_disk, &wal_thread.tx_prio_pipe},
+ {wal_write_to_disk, &wal_writer_singleton.tx_prio_pipe},
{tx_schedule_commit, NULL},
};
@@ -349,8 +344,6 @@ static void
wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
- const struct vclock *vclock,
- const struct vclock *checkpoint_vclock,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
@@ -372,8 +365,8 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->checkpoint_threshold = INT64_MAX;
writer->checkpoint_triggered = false;
- vclock_copy(&writer->vclock, vclock);
- vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
+ vclock_create(&writer->vclock);
+ vclock_create(&writer->checkpoint_vclock);
rlist_create(&writer->watchers);
writer->on_garbage_collection = on_garbage_collection;
@@ -387,21 +380,9 @@ wal_writer_destroy(struct wal_writer *writer)
xdir_destroy(&writer->wal_dir);
}
-/** WAL thread routine. */
+/** WAL writer thread routine. */
static int
-wal_thread_f(va_list ap);
-
-/** Start WAL thread and setup pipes to and from TX. */
-void
-wal_thread_start()
-{
- if (cord_costart(&wal_thread.cord, "wal", wal_thread_f, NULL) != 0)
- panic("failed to start WAL thread");
-
- /* Create a pipe to WAL thread. */
- cpipe_create(&wal_thread.wal_pipe, "wal");
- cpipe_set_max_input(&wal_thread.wal_pipe, IOV_MAX);
-}
+wal_writer_f(va_list ap);
static int
wal_open_f(struct cbus_call_msg *msg)
@@ -440,7 +421,7 @@ wal_open(struct wal_writer *writer)
* thread.
*/
struct cbus_call_msg msg;
- if (cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
+ if (cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
wal_open_f, NULL, TIMEOUT_INFINITY) == 0) {
/*
* Success: we can now append to
@@ -472,28 +453,38 @@ wal_open(struct wal_writer *writer)
return 0;
}
-/**
- * Initialize WAL writer.
- *
- * @pre The instance has completed recovery from a snapshot
- * and/or existing WALs. All WALs opened in read-only
- * mode are closed. WAL thread has been started.
- */
int
wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, const struct vclock *checkpoint_vclock,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold)
{
assert(wal_max_rows > 1);
+ /* Initialize the state. */
struct wal_writer *writer = &wal_writer_singleton;
wal_writer_create(writer, wal_mode, wal_dirname, wal_max_rows,
- wal_max_size, instance_uuid, vclock,
- checkpoint_vclock, on_garbage_collection,
+ wal_max_size, instance_uuid, on_garbage_collection,
on_checkpoint_threshold);
+ /* Start WAL thread. */
+ if (cord_costart(&writer->cord, "wal", wal_writer_f, NULL) != 0)
+ return -1;
+
+ /* Create a pipe to WAL thread. */
+ cpipe_create(&writer->wal_pipe, "wal");
+ cpipe_set_max_input(&writer->wal_pipe, IOV_MAX);
+ return 0;
+}
+
+int
+wal_enable(void)
+{
+ struct wal_writer *writer = &wal_writer_singleton;
+
+ /* Initialize the writer vclock from the recovery state. */
+ vclock_copy(&writer->vclock, &replicaset.vclock);
+
/*
* Scan the WAL directory to build an index of all
* existing WAL files. Required for garbage collection,
@@ -502,29 +493,28 @@ wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
if (xdir_scan(&writer->wal_dir))
return -1;
+ /* Open the most recent WAL file. */
if (wal_open(writer) != 0)
return -1;
+ /* Enable journalling. */
journal_set(&writer->base);
return 0;
}
-/**
- * Stop WAL thread, wait until it exits, and destroy WAL writer
- * if it was initialized. Called on shutdown.
- */
void
-wal_thread_stop()
+wal_free(void)
{
- cbus_stop_loop(&wal_thread.wal_pipe);
+ struct wal_writer *writer = &wal_writer_singleton;
- if (cord_join(&wal_thread.cord)) {
+ cbus_stop_loop(&writer->wal_pipe);
+
+ if (cord_join(&writer->cord)) {
/* We can't recover from this in any reasonable way. */
panic_syserror("WAL writer: thread join failed");
}
- if (journal_is_initialized(&wal_writer_singleton.base))
- wal_writer_destroy(&wal_writer_singleton);
+ wal_writer_destroy(writer);
}
void
@@ -533,7 +523,7 @@ wal_sync(void)
struct wal_writer *writer = &wal_writer_singleton;
if (writer->wal_mode == WAL_NONE)
return;
- cbus_flush(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, NULL);
+ cbus_flush(&writer->wal_pipe, &writer->tx_prio_pipe, NULL);
}
static int
@@ -588,7 +578,7 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
return -1;
}
bool cancellable = fiber_set_cancellable(false);
- int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+ int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
&checkpoint->base, wal_begin_checkpoint_f, NULL,
TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
@@ -628,7 +618,7 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
return;
}
bool cancellable = fiber_set_cancellable(false);
- cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+ cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
&checkpoint->base, wal_commit_checkpoint_f, NULL,
TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
@@ -658,7 +648,7 @@ wal_set_checkpoint_threshold(int64_t threshold)
struct wal_set_checkpoint_threshold_msg msg;
msg.checkpoint_threshold = threshold;
bool cancellable = fiber_set_cancellable(false);
- cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+ cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
&msg.base, wal_set_checkpoint_threshold_f, NULL,
TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
@@ -707,7 +697,7 @@ wal_collect_garbage(const struct vclock *vclock)
struct wal_gc_msg msg;
msg.vclock = vclock;
bool cancellable = fiber_set_cancellable(false);
- cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base,
+ cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base,
wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
}
@@ -841,7 +831,7 @@ out:
&msg->vclock) < 0)
vclock_copy(&msg->vclock, &writer->vclock);
cmsg_init(&msg->base, route);
- cpipe_push(&wal_thread.tx_prio_pipe, &msg->base);
+ cpipe_push(&writer->tx_prio_pipe, &msg->base);
} else
say_warn("failed to allocate gc notification message");
}
@@ -874,14 +864,14 @@ wal_writer_begin_rollback(struct wal_writer *writer)
* valve is closed by non-empty writer->rollback
* list.
*/
- { wal_writer_clear_bus, &wal_thread.wal_pipe },
- { wal_writer_clear_bus, &wal_thread.tx_prio_pipe },
+ { wal_writer_clear_bus, &wal_writer_singleton.wal_pipe },
+ { wal_writer_clear_bus, &wal_writer_singleton.tx_prio_pipe },
/*
* Step 2: writer->rollback queue contains all
* messages which need to be rolled back,
* perform the rollback.
*/
- { tx_schedule_rollback, &wal_thread.wal_pipe },
+ { tx_schedule_rollback, &wal_writer_singleton.wal_pipe },
/*
* Step 3: re-open the WAL for writing.
*/
@@ -893,7 +883,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
* all input until rollback mode is off.
*/
cmsg_init(&writer->in_rollback, rollback_route);
- cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
+ cpipe_push(&writer->tx_prio_pipe, &writer->in_rollback);
}
static void
@@ -1013,7 +1003,7 @@ wal_write_to_disk(struct cmsg *msg)
struct cmsg *msg = malloc(sizeof(*msg));
if (msg != NULL) {
cmsg_init(msg, route);
- cpipe_push(&wal_thread.tx_prio_pipe, msg);
+ cpipe_push(&writer->tx_prio_pipe, msg);
writer->checkpoint_triggered = true;
} else {
say_warn("failed to allocate checkpoint "
@@ -1056,11 +1046,12 @@ done:
wal_notify_watchers(writer, WAL_EVENT_WRITE);
}
-/** WAL thread main loop. */
+/** WAL writer main loop. */
static int
-wal_thread_f(va_list ap)
+wal_writer_f(va_list ap)
{
(void) ap;
+ struct wal_writer *writer = &wal_writer_singleton;
/** Initialize eio in this thread */
coio_enable();
@@ -1072,12 +1063,10 @@ wal_thread_f(va_list ap)
* endpoint, to ensure that WAL messages are delivered
* even when tx fiber pool is used up by net messages.
*/
- cpipe_create(&wal_thread.tx_prio_pipe, "tx_prio");
+ cpipe_create(&writer->tx_prio_pipe, "tx_prio");
cbus_loop(&endpoint);
- struct wal_writer *writer = &wal_writer_singleton;
-
/*
* Create a new empty WAL on shutdown so that we don't
* have to rescan the last WAL to find the instance vclock.
@@ -1101,7 +1090,7 @@ wal_thread_f(va_list ap)
if (xlog_is_open(&vy_log_writer.xlog))
xlog_close(&vy_log_writer.xlog, false);
- cpipe_destroy(&wal_thread.tx_prio_pipe);
+ cpipe_destroy(&writer->tx_prio_pipe);
return 0;
}
@@ -1131,8 +1120,8 @@ wal_write(struct journal *journal, struct journal_entry *entry)
}
struct wal_msg *batch;
- if (!stailq_empty(&wal_thread.wal_pipe.input) &&
- (batch = wal_msg(stailq_first_entry(&wal_thread.wal_pipe.input,
+ if (!stailq_empty(&writer->wal_pipe.input) &&
+ (batch = wal_msg(stailq_first_entry(&writer->wal_pipe.input,
struct cmsg, fifo)))) {
stailq_add_tail_entry(&batch->commit, entry, fifo);
@@ -1151,11 +1140,11 @@ wal_write(struct journal *journal, struct journal_entry *entry)
* thread right away.
*/
stailq_add_tail_entry(&batch->commit, entry, fifo);
- cpipe_push(&wal_thread.wal_pipe, &batch->base);
+ cpipe_push(&writer->wal_pipe, &batch->base);
}
batch->approx_len += entry->approx_len;
- wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
- cpipe_flush_input(&wal_thread.wal_pipe);
+ writer->wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
+ cpipe_flush_input(&writer->wal_pipe);
/**
* It's not safe to spuriously wakeup this fiber
* since in that case it will ignore a possible
@@ -1213,10 +1202,11 @@ wal_write_vy_log_f(struct cbus_call_msg *msg)
int
wal_write_vy_log(struct journal_entry *entry)
{
+ struct wal_writer *writer = &wal_writer_singleton;
struct wal_write_vy_log_msg msg;
msg.entry= entry;
bool cancellable = fiber_set_cancellable(false);
- int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+ int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
&msg.base, wal_write_vy_log_f, NULL,
TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
@@ -1235,9 +1225,10 @@ wal_rotate_vy_log_f(struct cbus_call_msg *msg)
void
wal_rotate_vy_log()
{
+ struct wal_writer *writer = &wal_writer_singleton;
struct cbus_call_msg msg;
bool cancellable = fiber_set_cancellable(false);
- cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
+ cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
fiber_set_cancellable(cancellable);
}
diff --git a/src/box/wal.h b/src/box/wal.h
index a9452f2b..4e500d2a 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -68,18 +68,26 @@ typedef void (*wal_on_garbage_collection_f)(const struct vclock *vclock);
*/
typedef void (*wal_on_checkpoint_threshold_f)(void);
-void
-wal_thread_start();
-
+/**
+ * Start WAL thread and initialize WAL writer.
+ */
int
wal_init(enum wal_mode wal_mode, const char *wal_dirname, int64_t wal_max_rows,
int64_t wal_max_size, const struct tt_uuid *instance_uuid,
- const struct vclock *vclock, const struct vclock *checkpoint_vclock,
wal_on_garbage_collection_f on_garbage_collection,
wal_on_checkpoint_threshold_f on_checkpoint_threshold);
+/**
+ * Setup WAL writer as journaling subsystem.
+ */
+int
+wal_enable(void);
+
+/**
+ * Stop WAL thread and free WAL writer resources.
+ */
void
-wal_thread_stop();
+wal_free(void);
struct wal_watcher_msg {
struct cmsg cmsg;
diff --git a/test/vinyl/replica_quota.result b/test/vinyl/replica_quota.result
index 50e39719..bd09e764 100644
--- a/test/vinyl/replica_quota.result
+++ b/test/vinyl/replica_quota.result
@@ -10,18 +10,40 @@ s = box.schema.space.create('test', { engine = 'vinyl' })
_ = s:create_index('pk', {run_count_per_level = 1})
---
...
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
---
...
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+pad = string.rep('x', 10000);
+---
+...
+function fill()
+ for i = 1, 50 do
+ box.begin()
+ for j = 1, 10 do
+ s:replace{math.random(100), math.random(100), pad}
+ end
+ box.commit()
+ end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Send > 1 MB to replica.
+fill()
---
...
box.snapshot()
---
- ok
...
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
---
...
-- Replica has memory limit set to 1 MB so replication would hang
@@ -46,7 +68,7 @@ _ = test_run:wait_lsn('replica', 'default')
_ = test_run:cmd("stop server replica")
---
...
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
---
...
_ = test_run:cmd("start server replica")
@@ -67,7 +89,7 @@ box.snapshot()
---
- ok
...
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
---
...
_ = test_run:cmd("start server replica") -- join
diff --git a/test/vinyl/replica_quota.test.lua b/test/vinyl/replica_quota.test.lua
index e04abbc2..1f373fd4 100644
--- a/test/vinyl/replica_quota.test.lua
+++ b/test/vinyl/replica_quota.test.lua
@@ -4,12 +4,25 @@ box.schema.user.grant('guest', 'replication')
s = box.schema.space.create('test', { engine = 'vinyl' })
_ = s:create_index('pk', {run_count_per_level = 1})
+_ = s:create_index('sk', {unique = false, parts = {2, 'unsigned'}})
--- Send > 2 MB to replica.
-pad = string.rep('x', 1100)
-for i = 1,1000 do s:insert{i, pad} end
+test_run:cmd("setopt delimiter ';'")
+pad = string.rep('x', 10000);
+function fill()
+ for i = 1, 50 do
+ box.begin()
+ for j = 1, 10 do
+ s:replace{math.random(100), math.random(100), pad}
+ end
+ box.commit()
+ end
+end;
+test_run:cmd("setopt delimiter ''");
+
+-- Send > 1 MB to replica.
+fill()
box.snapshot()
-for i = 1001,2000 do s:insert{i, pad} end
+fill()
-- Replica has memory limit set to 1 MB so replication would hang
-- if the scheduler didn't work on the destination.
@@ -26,7 +39,7 @@ _ = test_run:wait_lsn('replica', 'default')
-- Check vinyl_timeout is ignored on 'subscribe' (gh-3087).
_ = test_run:cmd("stop server replica")
-for i = 2001,3000 do s:insert{i, pad} end
+fill()
_ = test_run:cmd("start server replica")
_ = test_run:wait_lsn('replica', 'default')
@@ -36,7 +49,7 @@ _ = test_run:cmd("stop server replica")
_ = test_run:cmd("cleanup server replica")
box.snapshot()
-for i = 3001,4000 do s:insert{i, pad} end
+fill()
_ = test_run:cmd("start server replica") -- join
_ = test_run:cmd("stop server replica")
--
2.11.0
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [PATCH v2] box: enable WAL before making initial checkpoint
2019-02-11 12:34 ` [PATCH v2] box: enable WAL before making initial checkpoint Vladimir Davydov
@ 2019-02-11 12:41 ` Konstantin Osipov
2019-02-11 14:10 ` Vladimir Davydov
0 siblings, 1 reply; 4+ messages in thread
From: Konstantin Osipov @ 2019-02-11 12:41 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches
* Vladimir Davydov <vdavydov.dev@gmail.com> [19/02/11 15:34]:
> While a replica is bootstrapped from a remote master, vinyl engine
> may need to perform compaction, which means that it may write to
> the _vinyl_deferred_delete system space. Compaction proceeds fully
> asynchronously, i.e. a write may occur after the join stage is
> complete, but before the WAL is initialized, in which case the new
> replica will crash. To make sure a race like that won't happen, let's
> setup WAL before making the initial checkpoint. The WAL writer is now
> initialized right before starting the WAL thread and so we don't need
> to split WAL struct into the thread and the writer anymore.
>
> Closes #3968
OK to push.
--
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [PATCH v2] box: enable WAL before making initial checkpoint
2019-02-11 12:41 ` Konstantin Osipov
@ 2019-02-11 14:10 ` Vladimir Davydov
0 siblings, 0 replies; 4+ messages in thread
From: Vladimir Davydov @ 2019-02-11 14:10 UTC (permalink / raw)
To: Konstantin Osipov; +Cc: tarantool-patches
On Mon, Feb 11, 2019 at 03:41:56PM +0300, Konstantin Osipov wrote:
> * Vladimir Davydov <vdavydov.dev@gmail.com> [19/02/11 15:34]:
> > While a replica is bootstrapped from a remote master, vinyl engine
> > may need to perform compaction, which means that it may write to
> > the _vinyl_deferred_delete system space. Compaction proceeds fully
> > asynchronously, i.e. a write may occur after the join stage is
> > complete, but before the WAL is initialized, in which case the new
> > replica will crash. To make sure a race like that won't happen, let's
> > setup WAL before making the initial checkpoint. The WAL writer is now
> > initialized right before starting the WAL thread and so we don't need
> > to split WAL struct into the thread and the writer anymore.
> >
> > Closes #3968
>
> OK to push.
Pushed to 2.1 and 1.10.
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2019-02-11 14:10 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-02-08 16:51 [PATCH] box: delay all writes before wal writer is initialized Vladimir Davydov
2019-02-11 12:34 ` [PATCH v2] box: enable WAL before making initial checkpoint Vladimir Davydov
2019-02-11 12:41 ` Konstantin Osipov
2019-02-11 14:10 ` Vladimir Davydov
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox