* [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error
@ 2018-12-10 15:36 Serge Petrenko
2018-12-11 10:42 ` Vladimir Davydov
0 siblings, 1 reply; 4+ messages in thread
From: Serge Petrenko @ 2018-12-10 15:36 UTC (permalink / raw)
To: kostja; +Cc: tarantool-patches, Serge Petrenko
Applier used to promote vclock prior to applying the row. This led to a
situation when master's row would be skipped forever in case there is an
error trying to apply it. However, some errors are transient, and we
might be able to successfully apply the same row later.
So only advance vclock if a corresponding row is successfully written to
WAL. This makes sure that rows are not skipped.
replication_skip_conflict works as before: if it is set and a conflict
occurs while applying the row, it is silently ignored and the vclock is
advanced, so the row is never applied.
While we're at it, make the WAL writer the only one responsible for
advancing the replicaset vclock. It was already doing so for rows coming
from the local instance. Besides, this makes the code cleaner: we now
advance the vclock only after a successful write, which lets us get rid
of the unnecessary checks for whether the applier or the WAL has already
advanced the vclock.
Closes #2283
---
https://github.com/tarantool/tarantool/issues/2283
https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
src/box/applier.cc | 49 +++++-----
src/box/wal.c | 59 ++++++-----
src/errinj.h | 1 +
test/box/errinj.result | 2 +
test/replication/skip_conflict_row.result | 64 ++++++++++++
test/replication/skip_conflict_row.test.lua | 19 ++++
test/xlog/errinj.result | 17 +++-
test/xlog/errinj.test.lua | 4 +
test/xlog/panic_on_lsn_gap.result | 103 +++++++++++---------
test/xlog/panic_on_lsn_gap.test.lua | 46 +++++----
10 files changed, 241 insertions(+), 123 deletions(-)
diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff4af95e5..bdc303a7d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -355,6 +355,10 @@ applier_join(struct applier *applier)
coio_read_xrow(coio, ibuf, &row);
applier->last_row_time = ev_monotonic_now(loop());
if (iproto_type_is_dml(row.type)) {
+ /*
+ * Manually advance vclock, since no WAL writes
+ * are done during this stage of recovery.
+ */
vclock_follow_xrow(&replicaset.vclock, &row);
xstream_write_xc(applier->subscribe_stream, &row);
if (++row_count % 100000 == 0)
@@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
applier->lag = ev_now(loop()) - row.tm;
applier->last_row_time = ev_monotonic_now(loop());
+ /*
+ * In a full mesh topology, the same set
+ * of changes may arrive via two
+ * concurrently running appliers.
+ * Thus the rows may execute out of order,
+ * when the following xstream_write()
+ * yields on WAL. Hence we need a latch to
+ * strictly order all changes which belong
+ * to the same server id.
+ */
+ struct replica *replica = replica_by_id(row.replica_id);
+ struct latch *latch = (replica ? &replica->order_latch :
+ &replicaset.applier.order_latch);
+ latch_lock(latch);
if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- /**
- * Promote the replica set vclock before
- * applying the row. If there is an
- * exception (conflict) applying the row,
- * the row is skipped when the replication
- * is resumed.
- */
- vclock_follow_xrow(&replicaset.vclock, &row);
- struct replica *replica = replica_by_id(row.replica_id);
- struct latch *latch = (replica ? &replica->order_latch :
- &replicaset.applier.order_latch);
- /*
- * In a full mesh topology, the same set
- * of changes may arrive via two
- * concurrently running appliers. Thanks
- * to vclock_follow() above, the first row
- * in the set will be skipped - but the
- * remaining may execute out of order,
- * when the following xstream_write()
- * yields on WAL. Hence we need a latch to
- * strictly order all changes which belong
- * to the same server id.
- */
- latch_lock(latch);
int res = xstream_write(applier->subscribe_stream, &row);
latch_unlock(latch);
if (res != 0) {
@@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
*/
if (e->type == &type_ClientError &&
box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict)
+ replication_skip_conflict) {
diag_clear(diag_get());
- else
+ vclock_follow_xrow(&replicaset.vclock, &row);
+ } else
diag_raise();
}
- }
+ } else latch_unlock(latch);
if (applier->state == APPLIER_SYNC ||
applier->state == APPLIER_FOLLOW)
fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/wal.c b/src/box/wal.c
index 3b50d3629..c6c3c4ee5 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -121,6 +121,11 @@ struct wal_writer
* with this LSN and LSN becomes "real".
*/
struct vclock vclock;
+ /*
+ * Uncommitted vclock of the latest entry to be written,
+ * which will become "real" on a successful WAL write.
+ */
+ struct vclock req_vclock;
/**
* VClock of the most recent successfully created checkpoint.
* The WAL writer must not delete WAL files that are needed to
@@ -171,6 +176,11 @@ struct wal_msg {
* be rolled back.
*/
struct stailq rollback;
+ /**
+ * Vclock of latest successfully written request in batch
+ * used to advance replicaset vclock.
+ */
+ struct vclock vclock;
};
/**
@@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
/* Closes the input valve. */
stailq_concat(&writer->rollback, &batch->rollback);
}
+ vclock_copy(&replicaset.vclock, &batch->vclock);
tx_schedule_queue(&batch->commit);
}
@@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
writer->checkpoint_triggered = false;
vclock_copy(&writer->vclock, vclock);
+ vclock_copy(&writer->req_vclock, vclock);
vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
rlist_create(&writer->watchers);
@@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
/** Assign LSN to all local rows. */
for ( ; row < end; row++) {
if ((*row)->replica_id == 0) {
- (*row)->lsn = vclock_inc(&writer->vclock, instance_id);
+ (*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
(*row)->replica_id = instance_id;
} else {
- vclock_follow_xrow(&writer->vclock, *row);
+ vclock_follow_xrow(&writer->req_vclock, *row);
+ }
+ struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);
+ if (inj != NULL && inj->iparam > 0) {
+ (*row)->lsn += inj->iparam;
+ vclock_follow_xrow(&writer->req_vclock, *row);
}
}
}
@@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
struct stailq_entry *last_committed = NULL;
stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
- entry->res = vclock_sum(&writer->vclock);
- rc = xlog_write_entry(l, entry);
+ entry->res = vclock_sum(&writer->req_vclock);
+ int rc = xlog_write_entry(l, entry);
if (rc < 0)
goto done;
if (rc > 0) {
writer->checkpoint_wal_size += rc;
last_committed = &entry->fifo;
+ vclock_copy(&writer->vclock, &writer->req_vclock);
}
/* rc == 0: the write is buffered in xlog_tx */
}
@@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
writer->checkpoint_wal_size += rc;
last_committed = stailq_last(&wal_msg->commit);
+ vclock_copy(&writer->vclock, &writer->req_vclock);
/*
* Notify TX if the checkpoint threshold has been exceeded.
@@ -1021,6 +1040,10 @@ done:
error_log(error);
diag_clear(diag_get());
}
+ /* Latest successfully written row vclock. */
+ vclock_copy(&wal_msg->vclock, &writer->vclock);
+ /* Rollback request vclock to the latest committed. */
+ vclock_copy(&writer->req_vclock, &writer->vclock);
/*
* We need to start rollback from the first request
* following the last committed request. If
@@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
bool cancellable = fiber_set_cancellable(false);
fiber_yield(); /* Request was inserted. */
fiber_set_cancellable(cancellable);
- if (entry->res > 0) {
- struct xrow_header **last = entry->rows + entry->n_rows - 1;
- while (last >= entry->rows) {
- /*
- * Find last row from local instance id
- * and promote vclock.
- */
- if ((*last)->replica_id == instance_id) {
- /*
- * In master-master configuration, during sudden
- * power-loss, if the data have not been written
- * to WAL but have already been sent to others,
- * they will send the data back. In this case
- * vclock has already been promoted by applier.
- */
- if (vclock_get(&replicaset.vclock,
- instance_id) < (*last)->lsn) {
- vclock_follow_xrow(&replicaset.vclock,
- *last);
- }
- break;
- }
- --last;
- }
- }
return entry->res;
}
@@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
struct wal_writer *writer = (struct wal_writer *) journal;
wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
- int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
+ int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
if (new_lsn > old_lsn) {
/* There were local writes, promote vclock. */
vclock_follow(&replicaset.vclock, instance_id, new_lsn);
}
+ vclock_copy(&writer->vclock, &writer->req_vclock);
return vclock_sum(&writer->vclock);
}
diff --git a/src/errinj.h b/src/errinj.h
index 39de63d19..a425d809e 100644
--- a/src/errinj.h
+++ b/src/errinj.h
@@ -122,6 +122,7 @@ struct errinj {
_(ERRINJ_VY_INDEX_FILE_RENAME, ERRINJ_BOOL, {.bparam = false}) \
_(ERRINJ_RELAY_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
_(ERRINJ_WAL_BREAK_LSN, ERRINJ_INT, {.iparam = -1}) \
+ _(ERRINJ_WAL_LSN_GAP, ERRINJ_INT, {.iparam = 0}) \
_(ERRINJ_VY_COMPACTION_DELAY, ERRINJ_BOOL, {.bparam = false}) \
ENUM0(errinj_id, ERRINJ_LIST);
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 12303670e..6bcfaabdb 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -48,6 +48,8 @@ errinj.info()
state: 0
ERRINJ_SNAP_COMMIT_DELAY:
state: false
+ ERRINJ_WAL_LSN_GAP:
+ state: 0
ERRINJ_TUPLE_ALLOC:
state: false
ERRINJ_VY_RUN_WRITE_DELAY:
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 6ca13b472..6ca5ad5ae 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -4,6 +4,7 @@ env = require('test_run')
test_run = env.new()
---
...
+test_run:cmd("restart server default with cleanup=1")
engine = test_run:get_cfg('engine')
---
...
@@ -82,6 +83,69 @@ box.info.status
---
- running
...
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.cfg{replication_skip_conflict=false}
+---
+...
+box.space.test:insert{3}
+---
+- [3]
+...
+box.info.vclock
+---
+- {2: 2, 1: 7}
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:insert{3, 3}
+---
+- [3, 3]
+...
+box.space.test:insert{4}
+---
+- [4]
+...
+box.info.vclock
+---
+- {1: 9}
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.info.vclock
+---
+- {2: 2, 1: 7}
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+box.space.test:select()
+---
+- - [1]
+ - [2]
+ - [3]
+...
+test_run:cmd("switch default")
+---
+- true
+...
-- cleanup
test_run:cmd("stop server replica")
---
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced95..c60999b9b 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -1,5 +1,6 @@
env = require('test_run')
test_run = env.new()
+test_run:cmd("restart server default with cleanup=1")
engine = test_run:get_cfg('engine')
box.schema.user.grant('guest', 'replication')
@@ -28,6 +29,24 @@ box.space.test:select()
test_run:cmd("switch default")
box.info.status
+-- test that if replication_skip_conflict is off vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.cfg{replication_skip_conflict=false}
+box.space.test:insert{3}
+box.info.vclock
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+box.info.vclock
+test_run:cmd("switch replica")
+box.info.vclock
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+box.space.test:select()
+test_run:cmd("switch default")
+
-- cleanup
test_run:cmd("stop server replica")
test_run:cmd("cleanup server replica")
diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
index 6243ac701..f23bcf2bc 100644
--- a/test/xlog/errinj.result
+++ b/test/xlog/errinj.result
@@ -21,12 +21,28 @@ box.space._schema:insert{"key"}
---
- error: Failed to write to disk
...
+box.info.lsn
+---
+- 0
+...
test_run:cmd('restart server default')
+box.info.lsn
+---
+- 0
+...
box.space._schema:insert{"key"}
---
- ['key']
...
+box.info.lsn
+---
+- 1
+...
test_run:cmd('restart server default')
+box.info.lsn
+---
+- 1
+...
box.space._schema:get{"key"}
---
- ['key']
@@ -43,7 +59,6 @@ require('fio').glob(name .. "/*.xlog")
---
- - xlog/00000000000000000000.xlog
- xlog/00000000000000000001.xlog
- - xlog/00000000000000000002.xlog
...
test_run:cmd('restart server default with cleanup=1')
-- gh-881 iproto request with wal IO error
diff --git a/test/xlog/errinj.test.lua b/test/xlog/errinj.test.lua
index 7a5a29cb6..a359f7145 100644
--- a/test/xlog/errinj.test.lua
+++ b/test/xlog/errinj.test.lua
@@ -12,9 +12,13 @@ test_run:cmd('restart server default with cleanup=1')
box.error.injection.set("ERRINJ_WAL_WRITE", true)
box.space._schema:insert{"key"}
+box.info.lsn
test_run:cmd('restart server default')
+box.info.lsn
box.space._schema:insert{"key"}
+box.info.lsn
test_run:cmd('restart server default')
+box.info.lsn
box.space._schema:get{"key"}
box.space._schema:delete{"key"}
-- list all the logs
diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
index 4dd1291f8..1bab21c98 100644
--- a/test/xlog/panic_on_lsn_gap.result
+++ b/test/xlog/panic_on_lsn_gap.result
@@ -35,13 +35,22 @@ s = box.space._schema
-- xlog otherwise the server believes that there
-- is an lsn gap during recovery.
--
+-- The first WAL will start with lsn 11.
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 10)
+---
+- ok
+...
s:replace{"key", 'test 1'}
---
- ['key', 'test 1']
...
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 0)
+---
+- ok
+...
box.info.vclock
---
-- {1: 1}
+- {1: 11}
...
box.error.injection.set("ERRINJ_WAL_WRITE", true)
---
@@ -82,17 +91,14 @@ t
- Failed to write to disk
- Failed to write to disk
...
---
--- Before restart: our LSN is 1, because
--- LSN is promoted in tx only on successful
--- WAL write.
---
name = string.match(arg[0], "([^,]+)%.lua")
---
...
+-- Vclock is promoted on a successful WAL write.
+-- Last successfully written row {"key", "test 1"} has LSN 11.
box.info.vclock
---
-- {1: 1}
+- {1: 11}
...
require('fio').glob(name .. "/*.xlog")
---
@@ -165,10 +171,18 @@ box.error.injection.set("ERRINJ_WAL_WRITE", false)
-- but it's *inside* a single WAL, so doesn't
-- affect WAL search in recover_remaining_wals()
--
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 10)
+---
+- ok
+...
s:replace{'key', 'test 2'}
---
- ['key', 'test 2']
...
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 0)
+---
+- ok
+...
--
-- notice that vclock before and after
-- server stop is the same -- because it's
@@ -259,11 +273,19 @@ box.error.injection.set("ERRINJ_WAL_WRITE", false)
---
- ok
...
--- then a success
+-- Now write a row after a gap.
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 2)
+---
+- ok
+...
box.space._schema:replace{"key", 'test 4'}
---
- ['key', 'test 4']
...
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 0)
+---
+- ok
+...
box.info.vclock
---
- {1: 35}
@@ -286,45 +308,35 @@ box.space._schema:select{'key'}
-- that appeared due to a disk error and no files is
-- actually missing, we won't panic on recovery.
--
-box.space._schema:replace{'key', 'test 4'} -- creates new WAL
----
-- ['key', 'test 4']
-...
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
----
-- ok
-...
-box.space._schema:replace{'key', 'test 5'} -- fails, makes gap
----
-- error: Failed to write to disk
-...
-box.snapshot() -- fails, rotates WAL
+test_run:cmd("setopt delimiter ';'")
---
-- error: Error injection 'xlog write injection'
+- true
...
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", false)
+for i = 1, box.cfg.rows_per_wal do
+ box.space._schema:replace{"key", "test 5"}
+end;
---
-- ok
...
-box.space._schema:replace{'key', 'test 5'} -- creates new WAL
+test_run:cmd("setopt delimiter ''");
---
-- ['key', 'test 5']
+- true
...
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 1)
---
- ok
...
-box.space._schema:replace{'key', 'test 6'} -- fails, makes gap
+box.space._schema:replace{"key", "test 6"}
---
-- error: Failed to write to disk
+- ['key', 'test 6']
...
-box.snapshot() -- fails, rotates WAL
+test_run:cmd("restart server panic")
+box.space._schema:get{"key"}
---
-- error: Error injection 'xlog write injection'
+- ['key', 'test 6']
...
-box.space._schema:replace{'key', 'test 6'} -- fails, creates empty WAL
+box.info.vclock
---
-- error: Failed to write to disk
+- {1: 47}
...
name = string.match(arg[0], "([^,]+)%.lua")
---
@@ -336,18 +348,19 @@ require('fio').glob(name .. "/*.xlog")
- panic/00000000000000000022.xlog
- panic/00000000000000000032.xlog
- panic/00000000000000000035.xlog
- - panic/00000000000000000037.xlog
- - panic/00000000000000000039.xlog
+ - panic/00000000000000000045.xlog
+ - panic/00000000000000000047.xlog
...
-test_run:cmd("restart server panic")
-box.space._schema:select{'key'}
+-- Make sure we don't create a WAL in the gap between
+-- the last two.
+box.space._schema:replace{"key", "test 7"}
---
-- - ['key', 'test 5']
+- ['key', 'test 7']
...
--- Check that we don't create a WAL in the gap between the last two.
-box.space._schema:replace{'key', 'test 6'}
+test_run:cmd("restart server panic")
+box.info.vclock
---
-- ['key', 'test 6']
+- {1: 48}
...
name = string.match(arg[0], "([^,]+)%.lua")
---
@@ -359,11 +372,11 @@ require('fio').glob(name .. "/*.xlog")
- panic/00000000000000000022.xlog
- panic/00000000000000000032.xlog
- panic/00000000000000000035.xlog
- - panic/00000000000000000037.xlog
- - panic/00000000000000000039.xlog
- - panic/00000000000000000040.xlog
+ - panic/00000000000000000045.xlog
+ - panic/00000000000000000047.xlog
+ - panic/00000000000000000048.xlog
...
-test_run:cmd('switch default')
+test_run:cmd("switch default")
---
- true
...
diff --git a/test/xlog/panic_on_lsn_gap.test.lua b/test/xlog/panic_on_lsn_gap.test.lua
index 6221261a7..1d77292bc 100644
--- a/test/xlog/panic_on_lsn_gap.test.lua
+++ b/test/xlog/panic_on_lsn_gap.test.lua
@@ -17,7 +17,10 @@ s = box.space._schema
-- xlog otherwise the server believes that there
-- is an lsn gap during recovery.
--
+-- The first WAL will start with lsn 11.
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 10)
s:replace{"key", 'test 1'}
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 0)
box.info.vclock
box.error.injection.set("ERRINJ_WAL_WRITE", true)
t = {}
@@ -33,12 +36,9 @@ for i=1,box.cfg.rows_per_wal do
end;
test_run:cmd("setopt delimiter ''");
t
---
--- Before restart: our LSN is 1, because
--- LSN is promoted in tx only on successful
--- WAL write.
---
name = string.match(arg[0], "([^,]+)%.lua")
+-- Vclock is promoted on a successful WAL write.
+-- Last successfully written row {"key", "test 1"} has LSN 11.
box.info.vclock
require('fio').glob(name .. "/*.xlog")
test_run:cmd("restart server panic")
@@ -69,7 +69,9 @@ box.error.injection.set("ERRINJ_WAL_WRITE", false)
-- but it's *inside* a single WAL, so doesn't
-- affect WAL search in recover_remaining_wals()
--
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 10)
s:replace{'key', 'test 2'}
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 0)
--
-- notice that vclock before and after
-- server stop is the same -- because it's
@@ -101,8 +103,10 @@ box.space._schema:replace{"key", 'test 3'}
box.info.vclock
require('fio').glob(name .. "/*.xlog")
box.error.injection.set("ERRINJ_WAL_WRITE", false)
--- then a success
+-- Now write a row after a gap.
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 2)
box.space._schema:replace{"key", 'test 4'}
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 0)
box.info.vclock
require('fio').glob(name .. "/*.xlog")
-- restart is ok
@@ -113,24 +117,26 @@ box.space._schema:select{'key'}
-- that appeared due to a disk error and no files is
-- actually missing, we won't panic on recovery.
--
-box.space._schema:replace{'key', 'test 4'} -- creates new WAL
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
-box.space._schema:replace{'key', 'test 5'} -- fails, makes gap
-box.snapshot() -- fails, rotates WAL
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", false)
-box.space._schema:replace{'key', 'test 5'} -- creates new WAL
-box.error.injection.set("ERRINJ_WAL_WRITE_DISK", true)
-box.space._schema:replace{'key', 'test 6'} -- fails, makes gap
-box.snapshot() -- fails, rotates WAL
-box.space._schema:replace{'key', 'test 6'} -- fails, creates empty WAL
+test_run:cmd("setopt delimiter ';'")
+for i = 1, box.cfg.rows_per_wal do
+ box.space._schema:replace{"key", "test 5"}
+end;
+test_run:cmd("setopt delimiter ''");
+box.error.injection.set("ERRINJ_WAL_LSN_GAP", 1)
+box.space._schema:replace{"key", "test 6"}
+test_run:cmd("restart server panic")
+box.space._schema:get{"key"}
+box.info.vclock
name = string.match(arg[0], "([^,]+)%.lua")
require('fio').glob(name .. "/*.xlog")
+-- Make sure we don't create a WAL in the gap between
+-- the last two.
+box.space._schema:replace{"key", "test 7"}
test_run:cmd("restart server panic")
-box.space._schema:select{'key'}
--- Check that we don't create a WAL in the gap between the last two.
-box.space._schema:replace{'key', 'test 6'}
+box.info.vclock
name = string.match(arg[0], "([^,]+)%.lua")
require('fio').glob(name .. "/*.xlog")
-test_run:cmd('switch default')
+
+test_run:cmd("switch default")
test_run:cmd("stop server panic")
test_run:cmd("cleanup server panic")
--
2.17.2 (Apple Git-113)
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error
2018-12-10 15:36 [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error Serge Petrenko
@ 2018-12-11 10:42 ` Vladimir Davydov
2018-12-24 10:45 ` Serge Petrenko
0 siblings, 1 reply; 4+ messages in thread
From: Vladimir Davydov @ 2018-12-11 10:42 UTC (permalink / raw)
To: Serge Petrenko; +Cc: kostja, tarantool-patches
On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
> Applier used to promote vclock prior to applying the row. This lead to a
> situation when master's row would be skipped forever in case there is an
> error trying to apply it. However, some errors are transient, and we
> might be able to successfully apply the same row later.
>
> So only advance vclock if a corresponding row is successfully written to
> WAL. This makes sure that rows are not skipped.
> replication_skip_conflict works as before: if it is set an a conflict
> occurs while applying the row, it is silently ignored and the vclock is
> advanced, so the row is never applied.
>
> While we're at it, make wal writer the only one responsible for
> advancing replicaset vclock. It was already doing it for rows coming from
> the local instance, besides, it makes the code cleaner since now we want
> to advance vclock only after a successful write and lets us get rid of
> unnecessary checks whether applier or wal has already advanced the
> vclock.
>
> Closes #2283
> ---
> https://github.com/tarantool/tarantool/issues/2283
> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
>
> src/box/applier.cc | 49 +++++-----
> src/box/wal.c | 59 ++++++-----
> src/errinj.h | 1 +
> test/box/errinj.result | 2 +
> test/replication/skip_conflict_row.result | 64 ++++++++++++
> test/replication/skip_conflict_row.test.lua | 19 ++++
> test/xlog/errinj.result | 17 +++-
> test/xlog/errinj.test.lua | 4 +
> test/xlog/panic_on_lsn_gap.result | 103 +++++++++++---------
> test/xlog/panic_on_lsn_gap.test.lua | 46 +++++----
> 10 files changed, 241 insertions(+), 123 deletions(-)
>
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index ff4af95e5..bdc303a7d 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
> coio_read_xrow(coio, ibuf, &row);
> applier->last_row_time = ev_monotonic_now(loop());
> if (iproto_type_is_dml(row.type)) {
> + /*
> + * Manually advance vclock, since no WAL writes
> + * are done during this stage of recovery.
> + */
> vclock_follow_xrow(&replicaset.vclock, &row);
> xstream_write_xc(applier->subscribe_stream, &row);
> if (++row_count % 100000 == 0)
> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
> applier->lag = ev_now(loop()) - row.tm;
> applier->last_row_time = ev_monotonic_now(loop());
>
> + /*
> + * In a full mesh topology, the same set
> + * of changes may arrive via two
> + * concurrently running appliers.
> + * Thus the rows may execute out of order,
> + * when the following xstream_write()
> + * yields on WAL. Hence we need a latch to
> + * strictly order all changes which belong
> + * to the same server id.
> + */
> + struct replica *replica = replica_by_id(row.replica_id);
> + struct latch *latch = (replica ? &replica->order_latch :
> + &replicaset.applier.order_latch);
> + latch_lock(latch);
> if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> - /**
> - * Promote the replica set vclock before
> - * applying the row. If there is an
> - * exception (conflict) applying the row,
> - * the row is skipped when the replication
> - * is resumed.
> - */
> - vclock_follow_xrow(&replicaset.vclock, &row);
> - struct replica *replica = replica_by_id(row.replica_id);
> - struct latch *latch = (replica ? &replica->order_latch :
> - &replicaset.applier.order_latch);
> - /*
> - * In a full mesh topology, the same set
> - * of changes may arrive via two
> - * concurrently running appliers. Thanks
> - * to vclock_follow() above, the first row
> - * in the set will be skipped - but the
> - * remaining may execute out of order,
> - * when the following xstream_write()
> - * yields on WAL. Hence we need a latch to
> - * strictly order all changes which belong
> - * to the same server id.
> - */
> - latch_lock(latch);
> int res = xstream_write(applier->subscribe_stream, &row);
> latch_unlock(latch);
> if (res != 0) {
> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
> */
> if (e->type == &type_ClientError &&
> box_error_code(e) == ER_TUPLE_FOUND &&
> - replication_skip_conflict)
> + replication_skip_conflict) {
> diag_clear(diag_get());
> - else
> + vclock_follow_xrow(&replicaset.vclock, &row);
Hmm, do we really need to advance the vclock here?
> + } else
> diag_raise();
> }
> - }
> + } else latch_unlock(latch);
Coding style: latch_unlock should be on the next line.
Anyway, could you please rewrite this part so that latch_lock and
latch_unlock stay on the same level?
> if (applier->state == APPLIER_SYNC ||
> applier->state == APPLIER_FOLLOW)
> fiber_cond_signal(&applier->writer_cond);
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 3b50d3629..c6c3c4ee5 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -121,6 +121,11 @@ struct wal_writer
> * with this LSN and LSN becomes "real".
> */
> struct vclock vclock;
> + /*
> + * Uncommitted vclock of the latest entry to be written,
> + * which will become "real" on a successful WAL write.
> + */
> + struct vclock req_vclock;
> /**
> * VClock of the most recent successfully created checkpoint.
> * The WAL writer must not delete WAL files that are needed to
> @@ -171,6 +176,11 @@ struct wal_msg {
> * be rolled back.
> */
> struct stailq rollback;
> + /**
> + * Vclock of latest successfully written request in batch
> + * used to advance replicaset vclock.
> + */
> + struct vclock vclock;
I don't understand this part. Why do you need to introduce
wal_msg::vclock and wal_writer::req_vclock? Can't you simply use
the vclock of the last written row in wal_write()?
> };
>
> /**
> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
> /* Closes the input valve. */
> stailq_concat(&writer->rollback, &batch->rollback);
> }
> + vclock_copy(&replicaset.vclock, &batch->vclock);
> tx_schedule_queue(&batch->commit);
> }
>
> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
> writer->checkpoint_triggered = false;
>
> vclock_copy(&writer->vclock, vclock);
> + vclock_copy(&writer->req_vclock, vclock);
> vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
> rlist_create(&writer->watchers);
>
> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
> /** Assign LSN to all local rows. */
> for ( ; row < end; row++) {
> if ((*row)->replica_id == 0) {
> - (*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> + (*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
> (*row)->replica_id = instance_id;
> } else {
> - vclock_follow_xrow(&writer->vclock, *row);
> + vclock_follow_xrow(&writer->req_vclock, *row);
> + }
> + struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);
This error injection looks rather artificial to me. Can such a thing
happen in the real world?
Anyway, why do you need it at all? AFAICS you don't use it in the
replication test, which is supposed to test the feature. I don't
want to dive into the xlog tests so could you please elaborate here
or, even better, in the commit message?
> + if (inj != NULL && inj->iparam > 0) {
> + (*row)->lsn += inj->iparam;
> + vclock_follow_xrow(&writer->req_vclock, *row);
> }
> }
> }
> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
> struct stailq_entry *last_committed = NULL;
> stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> - entry->res = vclock_sum(&writer->vclock);
> - rc = xlog_write_entry(l, entry);
> + entry->res = vclock_sum(&writer->req_vclock);
> + int rc = xlog_write_entry(l, entry);
> if (rc < 0)
> goto done;
> if (rc > 0) {
> writer->checkpoint_wal_size += rc;
> last_committed = &entry->fifo;
> + vclock_copy(&writer->vclock, &writer->req_vclock);
> }
> /* rc == 0: the write is buffered in xlog_tx */
> }
> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
>
> writer->checkpoint_wal_size += rc;
> last_committed = stailq_last(&wal_msg->commit);
> + vclock_copy(&writer->vclock, &writer->req_vclock);
>
> /*
> * Notify TX if the checkpoint threshold has been exceeded.
> @@ -1021,6 +1040,10 @@ done:
> error_log(error);
> diag_clear(diag_get());
> }
> + /* Latest successfully written row vclock. */
> + vclock_copy(&wal_msg->vclock, &writer->vclock);
> + /* Rollback request vclock to the latest committed. */
> + vclock_copy(&writer->req_vclock, &writer->vclock);
> /*
> * We need to start rollback from the first request
> * following the last committed request. If
> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
> bool cancellable = fiber_set_cancellable(false);
> fiber_yield(); /* Request was inserted. */
> fiber_set_cancellable(cancellable);
> - if (entry->res > 0) {
> - struct xrow_header **last = entry->rows + entry->n_rows - 1;
> - while (last >= entry->rows) {
> - /*
> - * Find last row from local instance id
> - * and promote vclock.
> - */
> - if ((*last)->replica_id == instance_id) {
> - /*
> - * In master-master configuration, during sudden
> - * power-loss, if the data have not been written
> - * to WAL but have already been sent to others,
> - * they will send the data back. In this case
> - * vclock has already been promoted by applier.
> - */
It'd be nice to leave this comment.
> - if (vclock_get(&replicaset.vclock,
> - instance_id) < (*last)->lsn) {
> - vclock_follow_xrow(&replicaset.vclock,
> - *last);
> - }
> - break;
> - }
> - --last;
> - }
> - }
> return entry->res;
> }
>
> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
> struct wal_writer *writer = (struct wal_writer *) journal;
> wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> - int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> + int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
> if (new_lsn > old_lsn) {
> /* There were local writes, promote vclock. */
> vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> }
> + vclock_copy(&writer->vclock, &writer->req_vclock);
> return vclock_sum(&writer->vclock);
> }
>
^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error
2018-12-11 10:42 ` Vladimir Davydov
@ 2018-12-24 10:45 ` Serge Petrenko
2019-01-09 10:18 ` Vladimir Davydov
0 siblings, 1 reply; 4+ messages in thread
From: Serge Petrenko @ 2018-12-24 10:45 UTC (permalink / raw)
To: Vladimir Davydov; +Cc: tarantool-patches, Konstantin Osipov
[-- Attachment #1: Type: text/plain, Size: 16170 bytes --]
Hi! Thank you for review!
I pushed the new version of the patch to the branch and answered your comments below.
Incremental diff is below.
> 11 дек. 2018 г., в 13:42, Vladimir Davydov <vdavydov.dev@gmail.com> написал(а):
>
> On Mon, Dec 10, 2018 at 06:36:00PM +0300, Serge Petrenko wrote:
>> Applier used to promote vclock prior to applying the row. This led to a
>> situation when master's row would be skipped forever in case there is an
>> error trying to apply it. However, some errors are transient, and we
>> might be able to successfully apply the same row later.
>>
>> So only advance vclock if a corresponding row is successfully written to
>> WAL. This makes sure that rows are not skipped.
>> replication_skip_conflict works as before: if it is set and a conflict
>> occurs while applying the row, it is silently ignored and the vclock is
>> advanced, so the row is never applied.
>>
>> While we're at it, make wal writer the only one responsible for
>> advancing replicaset vclock. It was already doing it for rows coming from
>> the local instance, besides, it makes the code cleaner since now we want
>> to advance vclock only after a successful write and lets us get rid of
>> unnecessary checks whether applier or wal has already advanced the
>> vclock.
>>
>> Closes #2283
>> ---
>> https://github.com/tarantool/tarantool/issues/2283
>> https://github.com/tarantool/tarantool/tree/sp/gh-2283-dont-skip-rows
>>
>> src/box/applier.cc | 49 +++++-----
>> src/box/wal.c | 59 ++++++-----
>> src/errinj.h | 1 +
>> test/box/errinj.result | 2 +
>> test/replication/skip_conflict_row.result | 64 ++++++++++++
>> test/replication/skip_conflict_row.test.lua | 19 ++++
>> test/xlog/errinj.result | 17 +++-
>> test/xlog/errinj.test.lua | 4 +
>> test/xlog/panic_on_lsn_gap.result | 103 +++++++++++---------
>> test/xlog/panic_on_lsn_gap.test.lua | 46 +++++----
>> 10 files changed, 241 insertions(+), 123 deletions(-)
>>
>> diff --git a/src/box/applier.cc b/src/box/applier.cc
>> index ff4af95e5..bdc303a7d 100644
>> --- a/src/box/applier.cc
>> +++ b/src/box/applier.cc
>> @@ -355,6 +355,10 @@ applier_join(struct applier *applier)
>> coio_read_xrow(coio, ibuf, &row);
>> applier->last_row_time = ev_monotonic_now(loop());
>> if (iproto_type_is_dml(row.type)) {
>> + /*
>> + * Manually advance vclock, since no WAL writes
>> + * are done during this stage of recovery.
>> + */
>> vclock_follow_xrow(&replicaset.vclock, &row);
>> xstream_write_xc(applier->subscribe_stream, &row);
>> if (++row_count % 100000 == 0)
>> @@ -513,31 +517,21 @@ applier_subscribe(struct applier *applier)
>> applier->lag = ev_now(loop()) - row.tm;
>> applier->last_row_time = ev_monotonic_now(loop());
>>
>> + /*
>> + * In a full mesh topology, the same set
>> + * of changes may arrive via two
>> + * concurrently running appliers.
>> + * Thus the rows may execute out of order,
>> + * when the following xstream_write()
>> + * yields on WAL. Hence we need a latch to
>> + * strictly order all changes which belong
>> + * to the same server id.
>> + */
>> + struct replica *replica = replica_by_id(row.replica_id);
>> + struct latch *latch = (replica ? &replica->order_latch :
>> + &replicaset.applier.order_latch);
>> + latch_lock(latch);
>> if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
>> - /**
>> - * Promote the replica set vclock before
>> - * applying the row. If there is an
>> - * exception (conflict) applying the row,
>> - * the row is skipped when the replication
>> - * is resumed.
>> - */
>> - vclock_follow_xrow(&replicaset.vclock, &row);
>> - struct replica *replica = replica_by_id(row.replica_id);
>> - struct latch *latch = (replica ? &replica->order_latch :
>> - &replicaset.applier.order_latch);
>> - /*
>> - * In a full mesh topology, the same set
>> - * of changes may arrive via two
>> - * concurrently running appliers. Thanks
>> - * to vclock_follow() above, the first row
>> - * in the set will be skipped - but the
>> - * remaining may execute out of order,
>> - * when the following xstream_write()
>> - * yields on WAL. Hence we need a latch to
>> - * strictly order all changes which belong
>> - * to the same server id.
>> - */
>> - latch_lock(latch);
>> int res = xstream_write(applier->subscribe_stream, &row);
>> latch_unlock(latch);
>> if (res != 0) {
>> @@ -548,12 +542,13 @@ applier_subscribe(struct applier *applier)
>> */
>> if (e->type == &type_ClientError &&
>> box_error_code(e) == ER_TUPLE_FOUND &&
>> - replication_skip_conflict)
>> + replication_skip_conflict) {
>> diag_clear(diag_get());
>> - else
>> + vclock_follow_xrow(&replicaset.vclock, &row);
>
> Hmm, do we really need to advance the vclock here?
If xstream_write above fails, vclock is not advanced anywhere, so we have to advance it manually
if we want to skip the row, as directed by the replication_skip_conflict option.
>
>> + } else
>> diag_raise();
>> }
>> - }
>> + } else latch_unlock(latch);
>
> Coding style: latch_unlock should be on the next line.
>
> Anyway, could you please rewrite this part so that latch_lock and
> latch_unlock stay on the same level?
Rewrote this part, incremental diff is below.
>
>> if (applier->state == APPLIER_SYNC ||
>> applier->state == APPLIER_FOLLOW)
>> fiber_cond_signal(&applier->writer_cond);
>> diff --git a/src/box/wal.c b/src/box/wal.c
>> index 3b50d3629..c6c3c4ee5 100644
>> --- a/src/box/wal.c
>> +++ b/src/box/wal.c
>> @@ -121,6 +121,11 @@ struct wal_writer
>> * with this LSN and LSN becomes "real".
>> */
>> struct vclock vclock;
>> + /*
>> + * Uncommitted vclock of the latest entry to be written,
>> + * which will become "real" on a successful WAL write.
>> + */
>> + struct vclock req_vclock;
>> /**
>> * VClock of the most recent successfully created checkpoint.
>> * The WAL writer must not delete WAL files that are needed to
>> @@ -171,6 +176,11 @@ struct wal_msg {
>> * be rolled back.
>> */
>> struct stailq rollback;
>> + /**
>> + * Vclock of latest successfully written request in batch
>> + * used to advance replicaset vclock.
>> + */
>> + struct vclock vclock;
>
> I don't understand this part. Why do you need to introduce these
> wal_msg::vclock and wal_writer::req_vclock. Can't you simply use
> the vclock of the last written row in wal_write()?
A batch can have rows from different instances. Previously WAL writer
only advanced replicaset vclock for the rows from the local instance, so we
could use the last written row from an instance to advance vclock.
Now WAL writer advances replicaset vclock for all the rows coming from
every cluster member, so we would have to search the batch for the last
written row from every instance, which is rather cumbersome.
So we'd better keep a separate vclock for that, advance it on every successful WAL write,
and then copy it to replicaset vclock.
>
>> };
>>
>> /**
>> @@ -284,6 +294,7 @@ tx_schedule_commit(struct cmsg *msg)
>> /* Closes the input valve. */
>> stailq_concat(&writer->rollback, &batch->rollback);
>> }
>> + vclock_copy(&replicaset.vclock, &batch->vclock);
>> tx_schedule_queue(&batch->commit);
>> }
>>
>> @@ -368,6 +379,7 @@ wal_writer_create(struct wal_writer *writer, enum wal_mode wal_mode,
>> writer->checkpoint_triggered = false;
>>
>> vclock_copy(&writer->vclock, vclock);
>> + vclock_copy(&writer->req_vclock, vclock);
>> vclock_copy(&writer->checkpoint_vclock, checkpoint_vclock);
>> rlist_create(&writer->watchers);
>>
>> @@ -907,10 +919,15 @@ wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
>> /** Assign LSN to all local rows. */
>> for ( ; row < end; row++) {
>> if ((*row)->replica_id == 0) {
>> - (*row)->lsn = vclock_inc(&writer->vclock, instance_id);
>> + (*row)->lsn = vclock_inc(&writer->req_vclock, instance_id);
>> (*row)->replica_id = instance_id;
>> } else {
>> - vclock_follow_xrow(&writer->vclock, *row);
>> + vclock_follow_xrow(&writer->req_vclock, *row);
>> + }
>> + struct errinj *inj = errinj(ERRINJ_WAL_LSN_GAP, ERRINJ_INT);
>
> This error injection looks rather artifical to me. Can such a thing
> happen in the real world?
>
> Anyway, why do you need it at all? AFAICS you don't use it in the
> replication test, which is supposed to test the feature. I don't
> want to dive into the xlog tests so could you please elaborate here
> or, even better, in the commit message?
In the test xlog/panic_on_lsn_gap.test.lua we created an lsn gap in a file by
simulating a failed WAL write, which caused the writer to advance its vclock.
This way of creating an lsn gap in a WAL doesn’t work anymore, because this
patch alters WAL writer to only advance its vclock on successful journal writes.
This error injection was added to simulate the old behaviour.
I guess we can drop it together with xlog/panic_on_lsn_gap.test.lua.
AFAICS the bug this test is for is not reproducible anymore.
What do you think?
>
>> + if (inj != NULL && inj->iparam > 0) {
>> + (*row)->lsn += inj->iparam;
>> + vclock_follow_xrow(&writer->req_vclock, *row);
>> }
>> }
>> }
>> @@ -975,13 +992,14 @@ wal_write_to_disk(struct cmsg *msg)
>> struct stailq_entry *last_committed = NULL;
>> stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
>> wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>> - entry->res = vclock_sum(&writer->vclock);
>> - rc = xlog_write_entry(l, entry);
>> + entry->res = vclock_sum(&writer->req_vclock);
>> + int rc = xlog_write_entry(l, entry);
>> if (rc < 0)
>> goto done;
>> if (rc > 0) {
>> writer->checkpoint_wal_size += rc;
>> last_committed = &entry->fifo;
>> + vclock_copy(&writer->vclock, &writer->req_vclock);
>> }
>> /* rc == 0: the write is buffered in xlog_tx */
>> }
>> @@ -991,6 +1009,7 @@ wal_write_to_disk(struct cmsg *msg)
>>
>> writer->checkpoint_wal_size += rc;
>> last_committed = stailq_last(&wal_msg->commit);
>> + vclock_copy(&writer->vclock, &writer->req_vclock);
>>
>> /*
>> * Notify TX if the checkpoint threshold has been exceeded.
>> @@ -1021,6 +1040,10 @@ done:
>> error_log(error);
>> diag_clear(diag_get());
>> }
>> + /* Latest successfully written row vclock. */
>> + vclock_copy(&wal_msg->vclock, &writer->vclock);
>> + /* Rollback request vclock to the latest committed. */
>> + vclock_copy(&writer->req_vclock, &writer->vclock);
>> /*
>> * We need to start rollback from the first request
>> * following the last committed request. If
>> @@ -1152,31 +1175,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>> bool cancellable = fiber_set_cancellable(false);
>> fiber_yield(); /* Request was inserted. */
>> fiber_set_cancellable(cancellable);
>> - if (entry->res > 0) {
>> - struct xrow_header **last = entry->rows + entry->n_rows - 1;
>> - while (last >= entry->rows) {
>> - /*
>> - * Find last row from local instance id
>> - * and promote vclock.
>> - */
>> - if ((*last)->replica_id == instance_id) {
>> - /*
>> - * In master-master configuration, during sudden
>> - * power-loss, if the data have not been written
>> - * to WAL but have already been sent to others,
>> - * they will send the data back. In this case
>> - * vclock has already been promoted by applier.
>> - */
>
> It'd be nice to leave this comment.
Applier doesn’t advance vclock anymore. We can leave the
>> - * In master-master configuration, during sudden
>> - * power-loss, if the data have not been written
>> - * to WAL but have already been sent to others,
>> - * they will send the data back.
part, but I don’t know where to put it now.
>
>> - if (vclock_get(&replicaset.vclock,
>> - instance_id) < (*last)->lsn) {
>> - vclock_follow_xrow(&replicaset.vclock,
>> - *last);
>> - }
>> - break;
>> - }
>> - --last;
>> - }
>> - }
>> return entry->res;
>> }
>>
>> @@ -1187,11 +1185,12 @@ wal_write_in_wal_mode_none(struct journal *journal,
>> struct wal_writer *writer = (struct wal_writer *) journal;
>> wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
>> int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
>> - int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
>> + int64_t new_lsn = vclock_get(&writer->req_vclock, instance_id);
>> if (new_lsn > old_lsn) {
>> /* There were local writes, promote vclock. */
>> vclock_follow(&replicaset.vclock, instance_id, new_lsn);
>> }
>> + vclock_copy(&writer->vclock, &writer->req_vclock);
>> return vclock_sum(&writer->vclock);
>> }
Incremental diff:
diff --git a/src/box/applier.cc b/src/box/applier.cc
index bdc303a7d..6fac2561a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -530,25 +530,32 @@ applier_subscribe(struct applier *applier)
struct replica *replica = replica_by_id(row.replica_id);
struct latch *latch = (replica ? &replica->order_latch :
&replicaset.applier.order_latch);
+ int res = 0;
+
latch_lock(latch);
- if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
- int res = xstream_write(applier->subscribe_stream, &row);
- latch_unlock(latch);
- if (res != 0) {
- struct error *e = diag_last_error(diag_get());
- /**
- * Silently skip ER_TUPLE_FOUND error if such
- * option is set in config.
- */
- if (e->type == &type_ClientError &&
- box_error_code(e) == ER_TUPLE_FOUND &&
- replication_skip_conflict) {
- diag_clear(diag_get());
- vclock_follow_xrow(&replicaset.vclock, &row);
- } else
- diag_raise();
- }
- } else latch_unlock(latch);
+ if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn)
+ res = xstream_write(applier->subscribe_stream, &row);
+ latch_unlock(latch);
+
+ if (res != 0) {
+ struct error *e = diag_last_error(diag_get());
+ /**
+ * Silently skip ER_TUPLE_FOUND error if such
+ * option is set in config.
+ * Also advance the vclock manually, since
+ * WAL writer only advances it on successful
+ * writes, which is not the case.
+ * ER_TUPLE_FOUND occurs even before WAL write.
+ */
+ if (e->type == &type_ClientError &&
+ box_error_code(e) == ER_TUPLE_FOUND &&
+ replication_skip_conflict) {
+ diag_clear(diag_get());
+ vclock_follow_xrow(&replicaset.vclock, &row);
+ } else
+ diag_raise();
+ }
+
if (applier->state == APPLIER_SYNC ||
applier->state == APPLIER_FOLLOW)
fiber_cond_signal(&applier->writer_cond);
[-- Attachment #2: Type: text/html, Size: 54162 bytes --]
^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2019-01-09 10:18 UTC | newest]
Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-12-10 15:36 [tarantool-patches] [PATCH] replication: do not skip master's rows in case of an error Serge Petrenko
2018-12-11 10:42 ` Vladimir Davydov
2018-12-24 10:45 ` Serge Petrenko
2019-01-09 10:18 ` Vladimir Davydov
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox