* [tarantool-patches] [PATCH v2 1/2] Do not promote wal vclock for failed writes
2019-02-07 17:27 [tarantool-patches] [PATCH v2 0/2] Do not promote vclocks in case of failure Georgy Kirichenko
@ 2019-02-07 17:27 ` Georgy Kirichenko
2019-02-08 9:57 ` Vladimir Davydov
2019-02-07 17:27 ` [tarantool-patches] [PATCH v2 2/2] Promote replicaset.vclock only after wal Georgy Kirichenko
2019-02-08 10:09 ` [tarantool-patches] [PATCH v2 0/2] Do not promote vclocks in case of failure Vladimir Davydov
2 siblings, 1 reply; 5+ messages in thread
From: Georgy Kirichenko @ 2019-02-07 17:27 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
WAL used to promote vclock prior to writing the row. This led to a
situation where the master's row would be skipped forever if there was
an error trying to write it. However, some errors are transient, and we
might be able to successfully apply the same row later. So we do not
promote the writer's vclock until the write succeeds, in order to be
able to restart replication from the failing point.
Obsoletes xlog/panic_on_lsn_gap.test.
Needed for #2283
---
src/box/wal.c | 24 +-
test/box/errinj.result | 23 ++
test/box/errinj.test.lua | 20 ++
test/xlog/errinj.result | 1 -
test/xlog/panic_on_lsn_gap.result | 377 ----------------------------
test/xlog/panic_on_lsn_gap.test.lua | 136 ----------
6 files changed, 61 insertions(+), 520 deletions(-)
delete mode 100644 test/xlog/panic_on_lsn_gap.result
delete mode 100644 test/xlog/panic_on_lsn_gap.test.lua
diff --git a/src/box/wal.c b/src/box/wal.c
index 3b50d3629..966f3bfb9 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -901,16 +901,16 @@ wal_writer_begin_rollback(struct wal_writer *writer)
}
static void
-wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
struct xrow_header **end)
{
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
- (*row)->lsn = vclock_inc(&writer->vclock, instance_id);
+ (*row)->lsn = vclock_inc(vclock, instance_id);
(*row)->replica_id = instance_id;
} else {
- vclock_follow_xrow(&writer->vclock, *row);
+ vclock_follow_xrow(vclock, *row);
}
}
}
@@ -922,6 +922,16 @@ wal_write_to_disk(struct cmsg *msg)
struct wal_msg *wal_msg = (struct wal_msg *) msg;
struct error *error;
+ /*
+ * In order not to promote the writer's vclock in case of an error, we
+ * create a copy to assign LSNs before the rows are actually written.
+ * After a successful xlog flush we update the writer's vclock to the
+ * last written vclock value.
+ */
+ struct vclock vclock;
+ vclock_create(&vclock);
+ vclock_copy(&vclock, &writer->vclock);
+
struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
while (inj != NULL && inj->bparam)
usleep(10);
@@ -974,14 +984,15 @@ wal_write_to_disk(struct cmsg *msg)
struct journal_entry *entry;
struct stailq_entry *last_committed = NULL;
stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
- wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
- entry->res = vclock_sum(&writer->vclock);
+ wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
+ entry->res = vclock_sum(&vclock);
rc = xlog_write_entry(l, entry);
if (rc < 0)
goto done;
if (rc > 0) {
writer->checkpoint_wal_size += rc;
last_committed = &entry->fifo;
+ vclock_copy(&writer->vclock, &vclock);
}
/* rc == 0: the write is buffered in xlog_tx */
}
@@ -991,6 +1002,7 @@ wal_write_to_disk(struct cmsg *msg)
writer->checkpoint_wal_size += rc;
last_committed = stailq_last(&wal_msg->commit);
+ vclock_copy(&writer->vclock, &vclock);
/*
* Notify TX if the checkpoint threshold has been exceeded.
@@ -1185,7 +1197,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
struct journal_entry *entry)
{
struct wal_writer *writer = (struct wal_writer *) journal;
- wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
+ wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
if (new_lsn > old_lsn) {
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 12303670e..1d9a16d8d 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -141,6 +141,15 @@ errinj.set("ERRINJ_TESTING", false)
---
- ok
...
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+lsn1 = box.info.vclock[box.info.id]
+---
+...
-- Check how well we handle a failed log write
errinj.set("ERRINJ_WAL_IO", true)
---
@@ -161,6 +170,11 @@ space:insert{1}
---
- [1]
...
+-- Check vclock was promoted only one time
+box.info.vclock[box.info.id] == lsn1 + 1
+---
+- true
+...
errinj.set("ERRINJ_WAL_IO", true)
---
- ok
@@ -180,6 +194,15 @@ errinj.set("ERRINJ_WAL_IO", false)
---
- ok
...
+space:update(1, {{'=', 2, 2}})
+---
+- [1, 2]
+...
+-- Check vclock was promoted only two times
+box.info.vclock[box.info.id] == lsn1 + 2
+---
+- true
+...
space:truncate()
---
...
diff --git a/test/box/errinj.test.lua b/test/box/errinj.test.lua
index 6f491d623..39d73b57e 100644
--- a/test/box/errinj.test.lua
+++ b/test/box/errinj.test.lua
@@ -18,11 +18,31 @@ space:insert{1}
space:get{1}
errinj.set("ERRINJ_WAL_IO", false)
space:insert{1}
+-- Check vclock was promoted only one time
errinj.set("ERRINJ_WAL_IO", true)
space:update(1, {{'=', 2, 2}})
space:get{1}
space:get{2}
errinj.set("ERRINJ_WAL_IO", false)
+space:update(1, {{'=', 2, 2}})
+-- Check vclock was promoted only two times
+space:truncate()
+
+lsn1 = box.info.vclock[box.info.id]
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
+space:insert{1}
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
+space:insert{1}
+-- Check vclock was promoted only one time
+box.info.vclock[box.info.id] == lsn1 + 1
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
+space:update(1, {{'=', 2, 2}})
+space:get{1}
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
+space:update(1, {{'=', 2, 2}})
+-- Check vclock was promoted only two times
+box.info.vclock[box.info.id] == lsn1 + 2
space:truncate()
-- Check a failed log rotation
diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
index 390404b47..7f15bef35 100644
--- a/test/xlog/errinj.result
+++ b/test/xlog/errinj.result
@@ -43,7 +43,6 @@ require('fio').glob(name .. "/*.xlog")
---
- - xlog/00000000000000000000.xlog
- xlog/00000000000000000001.xlog
- - xlog/00000000000000000002.xlog
...
test_run:cmd('restart server default with cleanup=1')
-- gh-881 iproto request with wal IO error
diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
deleted file mode 100644
index 4dd1291f8..000000000
--- a/test/xlog/panic_on_lsn_gap.result
+++ /dev/null
@@ -1,377 +0,0 @@
---
--- we actually need to know what xlogs the server creates,
--- so start from a clean state
---
---
--- Check how the server is able to find the next
--- xlog if there are failed writes (lsn gaps).
---
-env = require('test_run')
----
-...
-test_run = env.new()
----
-...
-test_run:cmd("create server panic with script='xlog/panic.lua'")
----
-- true
-...
-test_run:cmd("start server panic")
----
-- true
-...
-test_run:cmd("switch panic")
----
-- true
-...
-box.info.vclock
----
-- {}
-...
-s = box.space._schema
----
-...
--- we need to have at least one record in the
--- xlog otherwise the server believes that there
--- is an lsn gap during recovery.
---
-s:replace{"key", 'test 1'}
----
-- ['key', 'test 1']
-...
-box.info.vclock
----
-- {1: 1}
-...
-box.error.injection.set("ERRINJ_WAL_WRITE", true)
----
-- ok
-...
-t = {}
----
-...
---
--- Try to insert rows, so that it's time to
--- switch WALs. No switch will happen though,
--- since no writes were made.
---
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-for i=1,box.cfg.rows_per_wal do
- status, msg = pcall(s.replace, s, {"key"})
- table.insert(t, msg)
-end;
----
-...
-test_run:cmd("setopt delimiter ''");
----
-- true
-...
-t
----
-- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
-...
---
--- Before restart: our LSN is 1, because
--- LSN is promoted in tx only on successful
--- WAL write.
---
-name = string.match(arg[0], "([^,]+)%.lua")
----
-...
-box.info.vclock
----
-- {1: 1}
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
-...
-test_run:cmd("restart server panic")
---
--- After restart: our LSN is the LSN of the
--- last empty WAL created on shutdown, i.e. 11.
---
-box.info.vclock
----
-- {1: 11}
-...
-box.space._schema:select{'key'}
----
-- - ['key', 'test 1']
-...
-box.error.injection.set("ERRINJ_WAL_WRITE", true)
----
-- ok
-...
-t = {}
----
-...
-s = box.space._schema
----
-...
---
--- now do the same
---
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-for i=1,box.cfg.rows_per_wal do
- status, msg = pcall(s.replace, s, {"key"})
- table.insert(t, msg)
-end;
----
-...
-test_run:cmd("setopt delimiter ''");
----
-- true
-...
-t
----
-- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
- - Failed to write to disk
-...
-box.info.vclock
----
-- {1: 11}
-...
-box.error.injection.set("ERRINJ_WAL_WRITE", false)
----
-- ok
-...
---
--- Write a good row after a series of failed
--- rows. There is a gap in LSN, correct,
--- but it's *inside* a single WAL, so doesn't
--- affect WAL search in recover_remaining_wals()
---
-s:replace{'key', 'test 2'}
----
-- ['key', 'test 2']
-...
---
--- notice that vclock before and after
--- server stop is the same -- because it's
--- recorded in the last row
---
-box.info.vclock
----
-- {1: 22}
-...
-test_run:cmd("restart server panic")
-box.info.vclock
----
-- {1: 22}
-...
-box.space._schema:select{'key'}
----
-- - ['key', 'test 2']
-...
--- list all the logs
-name = string.match(arg[0], "([^,]+)%.lua")
----
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
-...
--- now insert 10 rows - so that the next
--- row will need to switch the WAL
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-for i=1,box.cfg.rows_per_wal do
- box.space._schema:replace{"key", 'test 3'}
-end;
----
-...
-test_run:cmd("setopt delimiter ''");
----
-- true
-...
--- the next insert should switch xlog, but aha - it fails
--- a new xlog file is created but has 0 rows
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
-...
-box.error.injection.set("ERRINJ_WAL_WRITE", true)
----
-- ok
-...
-box.space._schema:replace{"key", 'test 3'}
----
-- error: Failed to write to disk
-...
-box.info.vclock
----
-- {1: 32}
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
- - panic/00000000000000000032.xlog
-...
--- and the next one (just to be sure
-box.space._schema:replace{"key", 'test 3'}
----
-- error: Failed to write to disk
-...
-box.info.vclock
----
-- {1: 32}
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
- - panic/00000000000000000032.xlog
-...
-box.error.injection.set("ERRINJ_WAL_WRITE", false)
----
-- ok
-...
--- then a success
-box.space._schema:replace{"key", 'test 4'}
----
-- ['key', 'test 4']
-...
-box.info.vclock
----
-- {1: 35}
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
- - panic/00000000000000000032.xlog
-...
--- restart is ok
-test_run:cmd("restart server panic")
-box.space._schema:select{'key'}
----
-- - ['key', 'test 4']
-...
---
--- Check that if there's an LSN gap between two WALs
--- that appeared due to a disk error and no files is
--- actually missing, we won't panic on recovery.
---
-box.space._schema:replace{'key', 'test 4'} -- creates new WAL
----
-- ['key', 'test 4']
-...
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
----
-- ok
-...
-box.space._schema:replace{'key', 'test 5'} -- fails, makes gap
----
-- error: Failed to write to disk
-...
-box.snapshot() -- fails, rotates WAL
----
-- error: Error injection 'xlog write injection'
-...
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", false)
----
-- ok
-...
-box.space._schema:replace{'key', 'test 5'} -- creates new WAL
----
-- ['key', 'test 5']
-...
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
----
-- ok
-...
-box.space._schema:replace{'key', 'test 6'} -- fails, makes gap
----
-- error: Failed to write to disk
-...
-box.snapshot() -- fails, rotates WAL
----
-- error: Error injection 'xlog write injection'
-...
-box.space._schema:replace{'key', 'test 6'} -- fails, creates empty WAL
----
-- error: Failed to write to disk
-...
-name = string.match(arg[0], "([^,]+)%.lua")
----
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
- - panic/00000000000000000032.xlog
- - panic/00000000000000000035.xlog
- - panic/00000000000000000037.xlog
- - panic/00000000000000000039.xlog
-...
-test_run:cmd("restart server panic")
-box.space._schema:select{'key'}
----
-- - ['key', 'test 5']
-...
--- Check that we don't create a WAL in the gap between the last two.
-box.space._schema:replace{'key', 'test 6'}
----
-- ['key', 'test 6']
-...
-name = string.match(arg[0], "([^,]+)%.lua")
----
-...
-require('fio').glob(name .. "/*.xlog")
----
-- - panic/00000000000000000000.xlog
- - panic/00000000000000000011.xlog
- - panic/00000000000000000022.xlog
- - panic/00000000000000000032.xlog
- - panic/00000000000000000035.xlog
- - panic/00000000000000000037.xlog
- - panic/00000000000000000039.xlog
- - panic/00000000000000000040.xlog
-...
-test_run:cmd('switch default')
----
-- true
-...
-test_run:cmd("stop server panic")
----
-- true
-...
-test_run:cmd("cleanup server panic")
----
-- true
-...
diff --git a/test/xlog/panic_on_lsn_gap.test.lua b/test/xlog/panic_on_lsn_gap.test.lua
deleted file mode 100644
index 6221261a7..000000000
--- a/test/xlog/panic_on_lsn_gap.test.lua
+++ /dev/null
@@ -1,136 +0,0 @@
---
--- we actually need to know what xlogs the server creates,
--- so start from a clean state
---
---
--- Check how the server is able to find the next
--- xlog if there are failed writes (lsn gaps).
---
-env = require('test_run')
-test_run = env.new()
-test_run:cmd("create server panic with script='xlog/panic.lua'")
-test_run:cmd("start server panic")
-test_run:cmd("switch panic")
-box.info.vclock
-s = box.space._schema
--- we need to have at least one record in the
--- xlog otherwise the server believes that there
--- is an lsn gap during recovery.
---
-s:replace{"key", 'test 1'}
-box.info.vclock
-box.error.injection.set("ERRINJ_WAL_WRITE", true)
-t = {}
---
--- Try to insert rows, so that it's time to
--- switch WALs. No switch will happen though,
--- since no writes were made.
---
-test_run:cmd("setopt delimiter ';'")
-for i=1,box.cfg.rows_per_wal do
- status, msg = pcall(s.replace, s, {"key"})
- table.insert(t, msg)
-end;
-test_run:cmd("setopt delimiter ''");
-t
---
--- Before restart: our LSN is 1, because
--- LSN is promoted in tx only on successful
--- WAL write.
---
-name = string.match(arg[0], "([^,]+)%.lua")
-box.info.vclock
-require('fio').glob(name .. "/*.xlog")
-test_run:cmd("restart server panic")
---
--- After restart: our LSN is the LSN of the
--- last empty WAL created on shutdown, i.e. 11.
---
-box.info.vclock
-box.space._schema:select{'key'}
-box.error.injection.set("ERRINJ_WAL_WRITE", true)
-t = {}
-s = box.space._schema
---
--- now do the same
---
-test_run:cmd("setopt delimiter ';'")
-for i=1,box.cfg.rows_per_wal do
- status, msg = pcall(s.replace, s, {"key"})
- table.insert(t, msg)
-end;
-test_run:cmd("setopt delimiter ''");
-t
-box.info.vclock
-box.error.injection.set("ERRINJ_WAL_WRITE", false)
---
--- Write a good row after a series of failed
--- rows. There is a gap in LSN, correct,
--- but it's *inside* a single WAL, so doesn't
--- affect WAL search in recover_remaining_wals()
---
-s:replace{'key', 'test 2'}
---
--- notice that vclock before and after
--- server stop is the same -- because it's
--- recorded in the last row
---
-box.info.vclock
-test_run:cmd("restart server panic")
-box.info.vclock
-box.space._schema:select{'key'}
--- list all the logs
-name = string.match(arg[0], "([^,]+)%.lua")
-require('fio').glob(name .. "/*.xlog")
--- now insert 10 rows - so that the next
--- row will need to switch the WAL
-test_run:cmd("setopt delimiter ';'")
-for i=1,box.cfg.rows_per_wal do
- box.space._schema:replace{"key", 'test 3'}
-end;
-test_run:cmd("setopt delimiter ''");
--- the next insert should switch xlog, but aha - it fails
--- a new xlog file is created but has 0 rows
-require('fio').glob(name .. "/*.xlog")
-box.error.injection.set("ERRINJ_WAL_WRITE", true)
-box.space._schema:replace{"key", 'test 3'}
-box.info.vclock
-require('fio').glob(name .. "/*.xlog")
--- and the next one (just to be sure
-box.space._schema:replace{"key", 'test 3'}
-box.info.vclock
-require('fio').glob(name .. "/*.xlog")
-box.error.injection.set("ERRINJ_WAL_WRITE", false)
--- then a success
-box.space._schema:replace{"key", 'test 4'}
-box.info.vclock
-require('fio').glob(name .. "/*.xlog")
--- restart is ok
-test_run:cmd("restart server panic")
-box.space._schema:select{'key'}
---
--- Check that if there's an LSN gap between two WALs
--- that appeared due to a disk error and no files is
--- actually missing, we won't panic on recovery.
---
-box.space._schema:replace{'key', 'test 4'} -- creates new WAL
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
-box.space._schema:replace{'key', 'test 5'} -- fails, makes gap
-box.snapshot() -- fails, rotates WAL
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", false)
-box.space._schema:replace{'key', 'test 5'} -- creates new WAL
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
-box.space._schema:replace{'key', 'test 6'} -- fails, makes gap
-box.snapshot() -- fails, rotates WAL
-box.space._schema:replace{'key', 'test 6'} -- fails, creates empty WAL
-name = string.match(arg[0], "([^,]+)%.lua")
-require('fio').glob(name .. "/*.xlog")
-test_run:cmd("restart server panic")
-box.space._schema:select{'key'}
--- Check that we don't create a WAL in the gap between the last two.
-box.space._schema:replace{'key', 'test 6'}
-name = string.match(arg[0], "([^,]+)%.lua")
-require('fio').glob(name .. "/*.xlog")
-test_run:cmd('switch default')
-test_run:cmd("stop server panic")
-test_run:cmd("cleanup server panic")
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread
* [tarantool-patches] [PATCH v2 2/2] Promote replicaset.vclock only after wal
2019-02-07 17:27 [tarantool-patches] [PATCH v2 0/2] Do not promote vclocks in case of failure Georgy Kirichenko
2019-02-07 17:27 ` [tarantool-patches] [PATCH v2 1/2] Do not promote wal vclock for failed writes Georgy Kirichenko
@ 2019-02-07 17:27 ` Georgy Kirichenko
2019-02-08 10:09 ` [tarantool-patches] [PATCH v2 0/2] Do not promote vclocks in case of failure Vladimir Davydov
2 siblings, 0 replies; 5+ messages in thread
From: Georgy Kirichenko @ 2019-02-07 17:27 UTC (permalink / raw)
To: tarantool-patches; +Cc: Georgy Kirichenko
Applier used to promote vclock prior to applying the row. This led to
a situation where the master's row would be skipped forever if there was
an error trying to apply it. However, some errors are transient, and we
might be able to successfully apply the same row later.
While we're at it, make the wal writer the only one responsible for
advancing replicaset vclock. It was already doing it for rows coming
from the local instance. Besides, it makes the code cleaner, since we
now advance vclock directly from the wal batch reply, and it lets us get
rid of unnecessary checks whether applier or wal has already advanced
the vclock.
Closes #2283
Prerequisite #980
---
src/box/applier.cc | 46 ++++++---------
src/box/wal.c | 43 ++++----------
test/box/errinj.result | 56 ++++++++++++++----
test/replication/skip_conflict_row.result | 63 +++++++++++++++++++++
test/replication/skip_conflict_row.test.lua | 20 +++++++
5 files changed, 159 insertions(+), 69 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index 21d2e6bcb..cae71ec1c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -512,34 +512,20 @@ applier_subscribe(struct applier *applier)
applier->lag = ev_now(loop()) - row.tm;
applier->last_row_time = ev_monotonic_now(loop());
-
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- /**
- * Promote the replica set vclock before
- * applying the row. If there is an
- * exception (conflict) applying the row,
- * the row is skipped when the replication
- * is resumed.
- */
- vclock_follow_xrow(&replicaset.vclock, &row);
- struct replica *replica = replica_by_id(row.replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set
- * of changes may arrive via two
- * concurrently running appliers. Thanks
- * to vclock_follow() above, the first row
- * in the set will be skipped - but the
- * remaining may execute out of order,
- * when the following xstream_write()
- * yields on WAL. Hence we need a latch to
- * strictly order all changes which belong
- * to the same server id.
- */
- latch_lock(latch);
+ struct replica *replica = replica_by_id(row.replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ /*
+ * In a full mesh topology, the same set
+ * of changes may arrive via two
+ * concurrently running appliers.
+ * Hence we need a latch to strictly order all
+ * changes which belong to the same server id.
+ */
+ latch_lock(latch);
+ if (vclock_get(&replicaset.vclock,
+ row.replica_id) < row.lsn) {
int res = xstream_write(applier->subscribe_stream, &row);
- latch_unlock(latch);
if (res != 0) {
struct error *e = diag_last_error(diag_get());
/**
@@ -550,10 +536,14 @@ applier_subscribe(struct applier *applier)
box_error_code(e) == ER_TUPLE_FOUND &&
replication_skip_conflict)
diag_clear(diag_get());
- else
+ else {
+ latch_unlock(latch);
diag_raise();
+ }
}
}
+ latch_unlock(latch);
+
if (applier->state == APPLIER_SYNC ||
applier->state == APPLIER_FOLLOW)
fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/wal.c b/src/box/wal.c
index 966f3bfb9..6d6dda390 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
* be rolled back.
*/
struct stailq rollback;
+ /** vclock after the batch is processed. */
+ struct vclock vclock;
};
/**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
batch->approx_len = 0;
stailq_create(&batch->commit);
stailq_create(&batch->rollback);
+ vclock_create(&batch->vclock);
}
static struct wal_msg *
@@ -284,6 +287,8 @@ tx_schedule_commit(struct cmsg *msg)
/* Closes the input valve. */
stailq_concat(&writer->rollback, &batch->rollback);
}
+ /* Update the tx vclock to the latest written by wal. */
+ vclock_copy(&replicaset.vclock, &batch->vclock);
tx_schedule_queue(&batch->commit);
}
@@ -1033,6 +1038,12 @@ done:
error_log(error);
diag_clear(diag_get());
}
+ /*
+ * Remember the vclock of the last successfully written row so
+ * that we can update replicaset.vclock once this message gets
+ * back to tx.
+ */
+ vclock_copy(&wal_msg->vclock, &writer->vclock);
/*
* We need to start rollback from the first request
* following the last committed request. If
@@ -1164,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
bool cancellable = fiber_set_cancellable(false);
fiber_yield(); /* Request was inserted. */
fiber_set_cancellable(cancellable);
- if (entry->res > 0) {
- struct xrow_header **last = entry->rows + entry->n_rows - 1;
- while (last >= entry->rows) {
- /*
- * Find last row from local instance id
- * and promote vclock.
- */
- if ((*last)->replica_id == instance_id) {
- /*
- * In master-master configuration, during sudden
- * power-loss, if the data have not been written
- * to WAL but have already been sent to others,
- * they will send the data back. In this case
- * vclock has already been promoted by applier.
- */
- if (vclock_get(&replicaset.vclock,
- instance_id) < (*last)->lsn) {
- vclock_follow_xrow(&replicaset.vclock,
- *last);
- }
- break;
- }
- --last;
- }
- }
return entry->res;
}
@@ -1198,12 +1184,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
{
struct wal_writer *writer = (struct wal_writer *) journal;
wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
- int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
- int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
- if (new_lsn > old_lsn) {
- /* There were local writes, promote vclock. */
- vclock_follow(&replicaset.vclock, instance_id, new_lsn);
- }
+ vclock_copy(&replicaset.vclock, &writer->vclock);
return vclock_sum(&writer->vclock);
}
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 1d9a16d8d..9a797916c 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -141,31 +141,70 @@ errinj.set("ERRINJ_TESTING", false)
---
- ok
...
-env = require('test_run')
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_IO", true)
---
+- ok
...
-test_run = env.new()
+space:insert{1}
---
+- error: Failed to write to disk
...
-lsn1 = box.info.vclock[box.info.id]
+space:get{1}
---
...
--- Check how well we handle a failed log write
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_IO", false)
---
- ok
...
space:insert{1}
---
+- [1]
+...
+-- Check vclock was promoted only one time
+errinj.set("ERRINJ_WAL_IO", true)
+---
+- ok
+...
+space:update(1, {{'=', 2, 2}})
+---
- error: Failed to write to disk
...
space:get{1}
---
+- [1]
+...
+space:get{2}
+---
...
errinj.set("ERRINJ_WAL_IO", false)
---
- ok
...
+space:update(1, {{'=', 2, 2}})
+---
+- [1, 2]
+...
+-- Check vclock was promoted only two times
+space:truncate()
+---
+...
+lsn1 = box.info.vclock[box.info.id]
+---
+...
+-- Check how well we handle a failed log write
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
+---
+- ok
+...
+space:insert{1}
+---
+- error: Failed to write to disk
+...
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
+---
+- ok
+...
space:insert{1}
---
- [1]
@@ -175,7 +214,7 @@ box.info.vclock[box.info.id] == lsn1 + 1
---
- true
...
-errinj.set("ERRINJ_WAL_IO", true)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", 0)
---
- ok
...
@@ -187,10 +226,7 @@ space:get{1}
---
- [1]
...
-space:get{2}
----
-...
-errinj.set("ERRINJ_WAL_IO", false)
+errinj.set("ERRINJ_WAL_WRITE_PARTIAL", -1)
---
- ok
...
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 6ca13b472..0c45e15e2 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -82,6 +82,69 @@ box.info.status
---
- running
...
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:insert{3}
+---
+- [3]
+...
+lsn1 = box.info.vclock[1]
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:insert{3, 3}
+---
+- [3, 3]
+...
+box.space.test:insert{4}
+---
+- [4]
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+test_run:cmd("switch default")
+---
+- true
+...
-- cleanup
test_run:cmd("stop server replica")
---
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..7eed4073c 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -28,6 +28,26 @@ box.space.test:select()
test_run:cmd("switch default")
box.info.status
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.space.test:insert{3}
+lsn1 = box.info.vclock[1]
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+test_run:cmd("switch replica")
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+test_run:cmd("switch default")
+
-- cleanup
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
--
2.20.1
^ permalink raw reply [flat|nested] 5+ messages in thread