Tarantool development patches archive
* [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal
@ 2019-01-04 10:34 Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
                   ` (5 more replies)
  0 siblings, 6 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-04 10:34 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The patchset eliminates LSN gaps from the journal, which is needed for
synchronous replication to enforce data stream consistency.
The replicaset vclock is promoted only after a successful write, and a
replication row may be applied only after the previous row has been
processed. Conflicting rows that have to be skipped are replaced with
NOPs so that no gaps remain. Finally, vclock following is protected with
an assert to ensure a strict one-step increase.

Needed for: #980
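
To illustrate the invariant this series converges on, here is a minimal,
self-contained sketch (simplified types and names, not the actual
Tarantool vclock code): every vclock component may only grow by exactly
one per journal write, so any LSN gap trips an assert immediately.

#include <assert.h>
#include <stdint.h>
#include <string.h>

#define VCLOCK_MAX 32

struct vclock {
	int64_t lsn[VCLOCK_MAX];
};

static void
vclock_create(struct vclock *v)
{
	memset(v, 0, sizeof(*v));
}

/* Strict one-step follow: an LSN gap aborts right where it appears. */
static void
vclock_follow(struct vclock *v, uint32_t replica_id, int64_t lsn)
{
	assert(replica_id < VCLOCK_MAX);
	assert(lsn == v->lsn[replica_id] + 1);
	v->lsn[replica_id] = lsn;
}

int
main(void)
{
	struct vclock v;
	vclock_create(&v);
	vclock_follow(&v, 1, 1);
	vclock_follow(&v, 1, 2);
	/* vclock_follow(&v, 1, 4) would abort: that would be an LSN gap. */
	return 0;
}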

Georgy Kirichenko (5):
  Do not promote wal vclock for failed writes
  Update replicaset vclock from wal
  Enforce applier out of order protection
  Emit NOP if an applier skips row
  Disallow lsn gaps while vclock following

 src/box/applier.cc                | 113 ++++++++++++++++--------------
 src/box/box.cc                    |   3 +-
 src/box/replication.cc            |   2 +
 src/box/replication.h             |   3 +
 src/box/vclock.c                  |  16 ++++-
 src/box/vclock.h                  |   3 +
 src/box/wal.c                     |  57 ++++++---------
 src/box/xrow.c                    |   2 +-
 test/unit/vclock.cc               |  10 +--
 test/xlog-py/dup_key.result       |  12 +++-
 test/xlog-py/dup_key.test.py      |  18 +++--
 test/xlog/errinj.result           |   1 -
 test/xlog/panic_on_lsn_gap.result |  65 +++++++++--------
 13 files changed, 167 insertions(+), 138 deletions(-)

-- 
2.20.1


* [tarantool-patches] [PATCH 1/5] Do not promote wal vclock for failed writes
  2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
@ 2019-01-04 10:34 ` Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 2/5] Update replicaset vclock from wal Georgy Kirichenko
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-04 10:34 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Increase the replica LSN only if the corresponding row was successfully
written to disk. This prevents LSN gaps in the WAL in case of IO errors
while applying rows and enforces WAL consistency.

Needed for: #980
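
A minimal sketch of the idea, with invented helper names and a
single-component vclock instead of struct vclock (the real change in
wal.c below operates on the writer vclock): LSNs are assigned on a local
copy, and the copy is published back to the writer only after the disk
write succeeds, so a failed write leaves no LSN gap behind.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static int64_t writer_lsn = 10;          /* stands in for writer->vclock */

/* Pretend disk write; 'fail' simulates an injected IO error. */
static bool
write_row_to_disk(int64_t lsn, bool fail)
{
	if (fail)
		return false;
	printf("written row with lsn %lld\n", (long long) lsn);
	return true;
}

static void
wal_write_one(bool fail)
{
	int64_t local = writer_lsn;      /* work on a local copy */
	int64_t lsn = ++local;           /* assign the next LSN to the row */
	if (write_row_to_disk(lsn, fail))
		writer_lsn = local;      /* promote the writer only on success */
}

int
main(void)
{
	wal_write_one(false);            /* lsn 11 is written and promoted */
	wal_write_one(true);             /* IO error: the writer stays at 11 */
	wal_write_one(false);            /* lsn 12 is assigned again, no gap */
	printf("writer lsn = %lld\n", (long long) writer_lsn);
	return 0;
}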
---
 src/box/wal.c                     | 19 ++++++---
 test/xlog/errinj.result           |  1 -
 test/xlog/panic_on_lsn_gap.result | 65 +++++++++++++++----------------
 3 files changed, 45 insertions(+), 40 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 3b50d3629..a55b544aa 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -901,16 +901,16 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 }
 
 static void
-wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 	       struct xrow_header **end)
 {
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
-			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
+			(*row)->lsn = vclock_inc(vclock, instance_id);
 			(*row)->replica_id = instance_id;
 		} else {
-			vclock_follow_xrow(&writer->vclock, *row);
+			vclock_follow_xrow(vclock, *row);
 		}
 	}
 }
@@ -922,6 +922,11 @@ wal_write_to_disk(struct cmsg *msg)
 	struct wal_msg *wal_msg = (struct wal_msg *) msg;
 	struct error *error;
 
+	/* Local vclock copy. */
+	struct vclock vclock;
+	vclock_create(&vclock);
+	vclock_copy(&vclock, &writer->vclock);
+
 	struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
 	while (inj != NULL && inj->bparam)
 		usleep(10);
@@ -974,14 +979,15 @@ wal_write_to_disk(struct cmsg *msg)
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
-		entry->res = vclock_sum(&writer->vclock);
+		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
+		entry->res = vclock_sum(&vclock);
 		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
 			goto done;
 		if (rc > 0) {
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
+			vclock_copy(&writer->vclock, &vclock);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
@@ -991,6 +997,7 @@ wal_write_to_disk(struct cmsg *msg)
 
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
+	vclock_copy(&writer->vclock, &vclock);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1185,7 +1192,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
-	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
+	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
 	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
 	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
 	if (new_lsn > old_lsn) {
diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
index 390404b47..7f15bef35 100644
--- a/test/xlog/errinj.result
+++ b/test/xlog/errinj.result
@@ -43,7 +43,6 @@ require('fio').glob(name .. "/*.xlog")
 ---
 - - xlog/00000000000000000000.xlog
   - xlog/00000000000000000001.xlog
-  - xlog/00000000000000000002.xlog
 ...
 test_run:cmd('restart server default with cleanup=1')
 -- gh-881 iproto request with wal IO error
diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
index 4dd1291f8..8054baab4 100644
--- a/test/xlog/panic_on_lsn_gap.result
+++ b/test/xlog/panic_on_lsn_gap.result
@@ -105,7 +105,7 @@ test_run:cmd("restart server panic")
 --
 box.info.vclock
 ---
-- {1: 11}
+- {1: 1}
 ...
 box.space._schema:select{'key'}
 ---
@@ -153,7 +153,7 @@ t
 ...
 box.info.vclock
 ---
-- {1: 11}
+- {1: 1}
 ...
 box.error.injection.set("ERRINJ_WAL_WRITE", false)
 ---
@@ -176,12 +176,12 @@ s:replace{'key', 'test 2'}
 --
 box.info.vclock
 ---
-- {1: 22}
+- {1: 2}
 ...
 test_run:cmd("restart server panic")
 box.info.vclock
 ---
-- {1: 22}
+- {1: 2}
 ...
 box.space._schema:select{'key'}
 ---
@@ -194,8 +194,8 @@ name = string.match(arg[0], "([^,]+)%.lua")
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
 ...
 -- now insert 10 rows - so that the next
 -- row will need to switch the WAL
@@ -217,8 +217,8 @@ test_run:cmd("setopt delimiter ''");
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
 ...
 box.error.injection.set("ERRINJ_WAL_WRITE", true)
 ---
@@ -230,14 +230,14 @@ box.space._schema:replace{"key", 'test 3'}
 ...
 box.info.vclock
 ---
-- {1: 32}
+- {1: 12}
 ...
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
 ...
 -- and the next one (just to be sure
 box.space._schema:replace{"key", 'test 3'}
@@ -246,14 +246,14 @@ box.space._schema:replace{"key", 'test 3'}
 ...
 box.info.vclock
 ---
-- {1: 32}
+- {1: 12}
 ...
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
 ...
 box.error.injection.set("ERRINJ_WAL_WRITE", false)
 ---
@@ -266,14 +266,14 @@ box.space._schema:replace{"key", 'test 4'}
 ...
 box.info.vclock
 ---
-- {1: 35}
+- {1: 13}
 ...
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
 ...
 -- restart is ok
 test_run:cmd("restart server panic")
@@ -332,12 +332,12 @@ name = string.match(arg[0], "([^,]+)%.lua")
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
-  - panic/00000000000000000035.xlog
-  - panic/00000000000000000037.xlog
-  - panic/00000000000000000039.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
+  - panic/00000000000000000013.xlog
+  - panic/00000000000000000014.xlog
+  - panic/00000000000000000015.xlog
 ...
 test_run:cmd("restart server panic")
 box.space._schema:select{'key'}
@@ -355,13 +355,12 @@ name = string.match(arg[0], "([^,]+)%.lua")
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
-  - panic/00000000000000000035.xlog
-  - panic/00000000000000000037.xlog
-  - panic/00000000000000000039.xlog
-  - panic/00000000000000000040.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
+  - panic/00000000000000000013.xlog
+  - panic/00000000000000000014.xlog
+  - panic/00000000000000000015.xlog
 ...
 test_run:cmd('switch default')
 ---
-- 
2.20.1


* [tarantool-patches] [PATCH 2/5] Update replicaset vclock from wal
  2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
@ 2019-01-04 10:34 ` Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 3/5] Enforce applier out of order protection Georgy Kirichenko
                   ` (3 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-04 10:34 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The journal now maintains the replicaset vclock for all recovery, local
and replication operations. Introduce replicaset.applier.vclock to
protect appliers from racing with each other.

Needed for: #980
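
A rough sketch of the split this patch introduces (illustrative names,
single-component counters instead of struct vclock): appliers advance
their own vclock as soon as a row is handed over for writing, while the
replicaset vclock seen by the rest of the system is advanced only once
the WAL reports the batch as committed.

#include <stdint.h>
#include <stdio.h>

static int64_t applier_lsn = 0;     /* replicaset.applier.vclock, simplified */
static int64_t replicaset_lsn = 0;  /* replicaset.vclock, simplified */

/* An applier received a row and submits it for writing. */
static void
applier_submit(int64_t row_lsn)
{
	if (applier_lsn < row_lsn)
		applier_lsn = row_lsn;  /* claim the row among the appliers */
}

/* The WAL thread reports that everything up to 'lsn' hit the disk. */
static void
wal_committed(int64_t lsn)
{
	replicaset_lsn = lsn;           /* only now the rest of the system sees it */
}

int
main(void)
{
	applier_submit(1);
	applier_submit(2);
	printf("applier=%lld replicaset=%lld\n",
	       (long long) applier_lsn, (long long) replicaset_lsn);
	wal_committed(2);
	printf("applier=%lld replicaset=%lld\n",
	       (long long) applier_lsn, (long long) replicaset_lsn);
	return 0;
}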
---
 src/box/applier.cc           | 61 +++++++++++++++++++-----------------
 src/box/box.cc               |  3 +-
 src/box/replication.cc       |  1 +
 src/box/replication.h        |  3 ++
 src/box/vclock.c             | 14 +++++++++
 src/box/vclock.h             |  3 ++
 src/box/wal.c                | 38 +++++-----------------
 test/xlog-py/dup_key.result  | 12 +++++--
 test/xlog-py/dup_key.test.py | 18 ++++++++---
 9 files changed, 86 insertions(+), 67 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ff4af95e5..1c6ed878d 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
 			coio_write_xrow(&io, &xrow);
 		} catch (SocketError *e) {
 			/*
@@ -300,7 +300,7 @@ applier_join(struct applier *applier)
 		 * Used to initialize the replica's initial
 		 * vclock in bootstrap_from_master()
 		 */
-		xrow_decode_vclock_xc(&row, &replicaset.vclock);
+		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
 	}
 
 	applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -326,7 +326,8 @@ applier_join(struct applier *applier)
 				 * vclock yet, do it now. In 1.7+
 				 * this vclock is not used.
 				 */
-				xrow_decode_vclock_xc(&row, &replicaset.vclock);
+				xrow_decode_vclock_xc(&row,
+						      &replicaset.applier.vclock);
 			}
 			break; /* end of stream */
 		} else if (iproto_type_is_error(row.type)) {
@@ -336,6 +337,7 @@ applier_join(struct applier *applier)
 				  (uint32_t) row.type);
 		}
 	}
+	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
 	say_info("initial data received");
 
 	applier_set_state(applier, APPLIER_FINAL_JOIN);
@@ -355,7 +357,7 @@ applier_join(struct applier *applier)
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			xstream_write_xc(applier->subscribe_stream, &row);
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
@@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
 {
 	assert(applier->subscribe_stream != NULL);
 
+	if (!vclock_is_set(&replicaset.applier.vclock))
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Stay 'orphan' until appliers catch up with
-		 * the remote vclock at the time of SUBSCRIBE
-		 * and the lag is less than configured.
-		 */
-		if (applier->state == APPLIER_SYNC &&
-		    applier->lag <= replication_sync_lag &&
-		    vclock_compare(&remote_vclock_at_subscribe,
-				   &replicaset.vclock) <= 0) {
-			/* Applier is synced, switch to "follow". */
-			applier_set_state(applier, APPLIER_FOLLOW);
-		}
-
 		/*
 		 * Tarantool < 1.7.7 does not send periodic heartbeat
 		 * messages so we can't assume that if we haven't heard
@@ -512,16 +504,12 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
+		if (vclock_get(&replicaset.applier.vclock,
+			       row.replica_id) < row.lsn) {
+			/* Preserve old lsn value. */
+			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
+						     row.replica_id);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			struct replica *replica = replica_by_id(row.replica_id);
 			struct latch *latch = (replica ? &replica->order_latch :
 					       &replicaset.applier.order_latch);
@@ -550,10 +538,27 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					/* Rollback lsn to have a chance for a retry. */
+					vclock_set(&replicaset.applier.vclock,
+						   row.replica_id, old_lsn);
 					diag_raise();
+				}
 			}
 		}
+		/*
+		 * Stay 'orphan' until appliers catch up with
+		 * the remote vclock at the time of SUBSCRIBE
+		 * and the lag is less than configured.
+		 */
+		if (applier->state == APPLIER_SYNC &&
+		    applier->lag <= replication_sync_lag &&
+		    vclock_compare(&remote_vclock_at_subscribe,
+				   &replicaset.vclock) <= 0) {
+			/* Applier is synced, switch to "follow". */
+			applier_set_state(applier, APPLIER_FOLLOW);
+		}
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/box.cc b/src/box/box.cc
index 9f2fd6da1..0df0875dd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
 		       struct journal_entry * /* entry */)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
+	vclock_copy(&replicaset.vclock, journal->vclock);
 	return vclock_sum(journal->vclock);
 }
 
@@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
 	 */
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset.vclock);
+	recovery_journal_create(&journal, &replicaset.applier.vclock);
 	journal_set(&journal.base);
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 2cb4ec0f8..51e08886c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,7 @@ replication_init(void)
 	fiber_cond_create(&replicaset.applier.cond);
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
+	vclock_create(&replicaset.applier.vclock);
 }
 
 void
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ac620d86..b9aebed14 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -194,6 +194,9 @@ struct replicaset {
 	struct vclock vclock;
 	/** Applier state. */
 	struct {
+		/**
+		 * Vclock of rows sent by appliers for processing. */
+		struct vclock vclock;
 		/**
 		 * Total number of replicas with attached
 		 * appliers.
diff --git a/src/box/vclock.c b/src/box/vclock.c
index b5eb2800b..c297d1ff9 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -36,6 +36,20 @@
 
 #include "diag.h"
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
+{
+	assert(lsn >= 0);
+	assert(replica_id < VCLOCK_MAX);
+	int64_t prev_lsn = vclock->lsn[replica_id];
+	if (lsn > 0)
+		vclock->map |= 1 << replica_id;
+	else
+		vclock->map &= ~(1 << replica_id);
+	vclock->lsn[replica_id] = lsn;
+	vclock->signature += lsn - prev_lsn;
+}
+
 int64_t
 vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
 {
diff --git a/src/box/vclock.h b/src/box/vclock.h
index 111e29160..d6cb14c2a 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
 	return vclock->lsn[replica_id];
 }
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+
 static inline int64_t
 vclock_inc(struct vclock *vclock, uint32_t replica_id)
 {
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
 	 * wal_msg memory disappears after the first
 	 * iteration of tx_schedule_queue loop.
 	 */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	if (! stailq_empty(&batch->rollback)) {
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/* Set resulting vclock. */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
 ---
 - [2, 'second tuple']
 ...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
 ---
 - [2, 'fourth tuple']
 ...
-.xlog does not exist
 check log line for 'Duplicate key'
 
 'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..3dacde771 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -26,19 +26,27 @@ server.stop()
 
 # Save wal#1
 if os.access(wal, os.F_OK):
-    print ".xlog exists"
+    print ".xlog#1 exists"
     os.rename(wal, wal_old)
 
-# Write wal#2
+# Write wal#2 to bump lsn
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+    print ".xlog#2 exists"
+
+# Write wal#3 - conflicting with wal#1
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
 # Restore wal#1
-if not os.access(wal, os.F_OK):
-    print ".xlog does not exist"
-    os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
 
 server.start()
 line = 'Duplicate key'
-- 
2.20.1


* [tarantool-patches] [PATCH 3/5] Enforce applier out of order protection
  2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 2/5] Update replicaset vclock from wal Georgy Kirichenko
@ 2019-01-04 10:34 ` Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 4/5] Emit NOP if an applier skips row Georgy Kirichenko
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-04 10:34 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Do not skip a replication row until it has actually been processed by
another applier: take the order latch before checking the vclock.

Needed for: #980
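
A sketch of the ordering change, with a pthread mutex standing in for
the per-replica latch and a single-component vclock (names are
illustrative only): the "is this row new" check now runs while the latch
is held, so a concurrent applier cannot decide to skip a row that
another applier is still in the middle of writing.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

static pthread_mutex_t order_latch = PTHREAD_MUTEX_INITIALIZER;
static int64_t applier_lsn = 0;     /* replicaset.applier.vclock, simplified */

static void
apply_row(int64_t row_lsn)
{
	pthread_mutex_lock(&order_latch);
	/* The vclock check happens under the latch, not before taking it. */
	if (applier_lsn < row_lsn) {
		applier_lsn = row_lsn;
		/* ... the row would be written to the local journal here ... */
		printf("applied row %lld\n", (long long) row_lsn);
	}
	pthread_mutex_unlock(&order_latch);
}

int
main(void)
{
	apply_row(1);
	apply_row(1);   /* duplicate from a second applier: safely skipped */
	apply_row(2);
	return 0;
}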
---
 src/box/applier.cc | 35 ++++++++++++++++++-----------------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 1c6ed878d..fbceadb2b 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -504,30 +504,29 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
+		struct replica *replica = replica_by_id(row.replica_id);
+		struct latch *latch = (replica ? &replica->order_latch :
+				       &replicaset.applier.order_latch);
+		/*
+		 * In a full mesh topology, the same set
+		 * of changes may arrive via two
+		 * concurrently running appliers. Thanks
+		 * to vclock_follow() above, the first row
+		 * in the set will be skipped - but the
+		 * remaining may execute out of order,
+		 * when the following xstream_write()
+		 * yields on WAL. Hence we need a latch to
+		 * strictly order all changes which belong
+		 * to the same server id.
+		 */
+		latch_lock(latch);
 		if (vclock_get(&replicaset.applier.vclock,
 			       row.replica_id) < row.lsn) {
 			/* Preserve old lsn value. */
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
 						     row.replica_id);
 			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			struct replica *replica = replica_by_id(row.replica_id);
-			struct latch *latch = (replica ? &replica->order_latch :
-					       &replicaset.applier.order_latch);
-			/*
-			 * In a full mesh topology, the same set
-			 * of changes may arrive via two
-			 * concurrently running appliers. Thanks
-			 * to vclock_follow() above, the first row
-			 * in the set will be skipped - but the
-			 * remaining may execute out of order,
-			 * when the following xstream_write()
-			 * yields on WAL. Hence we need a latch to
-			 * strictly order all changes which belong
-			 * to the same server id.
-			 */
-			latch_lock(latch);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
@@ -542,10 +541,12 @@ applier_subscribe(struct applier *applier)
 					/* Rollback lsn to have a chance for a retry. */
 					vclock_set(&replicaset.applier.vclock,
 						   row.replica_id, old_lsn);
+					latch_unlock(latch);
 					diag_raise();
 				}
 			}
 		}
+		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
 		 * the remote vclock at the time of SUBSCRIBE
-- 
2.20.1


* [tarantool-patches] [PATCH 4/5] Emit NOP if an applier skips row
  2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 3/5] Enforce applier out of order protection Georgy Kirichenko
@ 2019-01-04 10:34 ` Georgy Kirichenko
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
  2019-01-11 13:31 ` [tarantool-patches] Re: [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
  5 siblings, 0 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-04 10:34 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Fill LSN gaps with NOPs if the applier is configured to skip
conflicting rows.

Needed for: #980
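
A sketch of the conflict path (types and names are invented for the
example, not the applier's real ones): when applying a row fails with a
duplicate-key error and skipping conflicts is allowed, the row is turned
into a NOP and written anyway, so its LSN is still consumed and the
journal keeps a dense LSN sequence.

#include <stdint.h>
#include <stdio.h>

enum row_type { ROW_DML, ROW_NOP };

struct row {
	enum row_type type;
	int64_t lsn;
	int key;
};

static int applied_keys[16];            /* toy "space": one row per key */

/* Returns -1 on a duplicate-key conflict, 0 on success. */
static int
write_row(struct row *row)
{
	if (row->type == ROW_NOP) {
		printf("lsn %lld: NOP\n", (long long) row->lsn);
		return 0;
	}
	if (applied_keys[row->key])
		return -1;
	applied_keys[row->key] = 1;
	printf("lsn %lld: key %d\n", (long long) row->lsn, row->key);
	return 0;
}

static void
apply_or_nop(struct row row, int skip_conflict)
{
	if (write_row(&row) != 0 && skip_conflict) {
		row.type = ROW_NOP;     /* keep the LSN, drop the payload */
		write_row(&row);
	}
}

int
main(void)
{
	apply_or_nop((struct row) {ROW_DML, 1, 7}, 1);
	apply_or_nop((struct row) {ROW_DML, 2, 7}, 1); /* conflict -> NOP */
	apply_or_nop((struct row) {ROW_DML, 3, 8}, 1);
	return 0;
}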
---
 src/box/applier.cc | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index fbceadb2b..b05d72c73 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -527,23 +527,26 @@ applier_subscribe(struct applier *applier)
 						     row.replica_id);
 			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
+			struct error *e = diag_last_error(diag_get());
+			if (res != 0 && e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
 				/**
 				 * Silently skip ER_TUPLE_FOUND error if such
 				 * option is set in config.
 				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict)
-					diag_clear(diag_get());
-				else {
-					/* Rollback lsn to have a chance for a retry. */
-					vclock_set(&replicaset.applier.vclock,
-						   row.replica_id, old_lsn);
-					latch_unlock(latch);
-					diag_raise();
-				}
+				diag_clear(diag_get());
+				row.type = IPROTO_NOP;
+				row.bodycnt = 0;
+				res = xstream_write(applier->subscribe_stream,
+						    &row);
+			}
+			if (res != 0) {
+				/* Rollback lsn to have a chance for a retry. */
+				vclock_set(&replicaset.applier.vclock,
+					   row.replica_id, old_lsn);
+				latch_unlock(latch);
+				diag_raise();
 			}
 		}
 		latch_unlock(latch);
-- 
2.20.1


* [tarantool-patches] [PATCH 5/5] Disallow lsn gaps while vclock following
  2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 4/5] Emit NOP if an applier skips row Georgy Kirichenko
@ 2019-01-04 10:34 ` Georgy Kirichenko
  2019-01-11 13:31 ` [tarantool-patches] Re: [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
  5 siblings, 0 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-04 10:34 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Only one-step vclock following is allowed. This enforces WAL and
replication consistency against out-of-order execution.

Needed for: #980
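
A sketch of why a plain setter appears next to the stricter follow
(simplified, illustrative implementation, not the real vclock code):
vclock_follow() now insists on an exact +1 step, so the places that
legitimately load arbitrary values, such as decoding a remote vclock or
building test fixtures, switch to vclock_set() instead.

#include <assert.h>
#include <stdint.h>

#define VCLOCK_MAX 32

struct vclock {
	int64_t lsn[VCLOCK_MAX];
};

/* Following a stream of rows: only a strict one-step increase is legal. */
static void
vclock_follow(struct vclock *v, uint32_t id, int64_t lsn)
{
	assert(id < VCLOCK_MAX);
	assert(lsn == v->lsn[id] + 1);
	v->lsn[id] = lsn;
}

/* Loading a vclock from elsewhere: any non-negative value is accepted. */
static void
vclock_set(struct vclock *v, uint32_t id, int64_t lsn)
{
	assert(id < VCLOCK_MAX);
	assert(lsn >= 0);
	v->lsn[id] = lsn;
}

int
main(void)
{
	struct vclock v = { {0} };
	vclock_set(&v, 3, 100);         /* e.g. decoded from a peer's vclock */
	vclock_follow(&v, 3, 101);      /* subsequent rows must be +1 */
	return 0;
}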
---
 src/box/replication.cc |  1 +
 src/box/vclock.c       |  2 +-
 src/box/xrow.c         |  2 +-
 test/unit/vclock.cc    | 10 +++++-----
 4 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/src/box/replication.cc b/src/box/replication.cc
index 51e08886c..ee92a941b 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,6 +91,7 @@ replication_init(void)
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
 	vclock_create(&replicaset.applier.vclock);
+	vclock_clear(&replicaset.applier.vclock);
 }
 
 void
diff --git a/src/box/vclock.c b/src/box/vclock.c
index c297d1ff9..807da9109 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -56,7 +56,7 @@ vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
 	assert(lsn >= 0);
 	assert(replica_id < VCLOCK_MAX);
 	int64_t prev_lsn = vclock->lsn[replica_id];
-	assert(lsn > prev_lsn);
+	assert(lsn == prev_lsn + 1);
 	/* Easier add each time than check. */
 	vclock->map |= 1 << replica_id;
 	vclock->lsn[replica_id] = lsn;
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 67019a68d..ef3f81add 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -83,7 +83,7 @@ mp_decode_vclock(const char **data, struct vclock *vclock)
 			return -1;
 		int64_t lsn = mp_decode_uint(data);
 		if (lsn > 0)
-			vclock_follow(vclock, id, lsn);
+			vclock_set(vclock, id, lsn);
 	}
 	return 0;
 }
diff --git a/test/unit/vclock.cc b/test/unit/vclock.cc
index 8498eba3b..6a8d498ef 100644
--- a/test/unit/vclock.cc
+++ b/test/unit/vclock.cc
@@ -50,11 +50,11 @@ test_compare_one(uint32_t a_count, const int64_t *lsns_a,
 	vclock_create(&b);
 	for (uint32_t node_id = 0; node_id < a_count; node_id++) {
 		if (lsns_a[node_id] > 0)
-			vclock_follow(&a, node_id, lsns_a[node_id]);
+			vclock_set(&a, node_id, lsns_a[node_id]);
 	}
 	for (uint32_t node_id = 0; node_id < b_count; node_id++) {
 		if (lsns_b[node_id] > 0)
-			vclock_follow(&b, node_id, lsns_b[node_id]);
+			vclock_set(&b, node_id, lsns_b[node_id]);
 	}
 
 	return vclock_compare(&a, &b);
@@ -119,7 +119,7 @@ testset_create(vclockset_t *set, int64_t *files, int files_n, int node_n)
 			signature += lsn;
 
 			/* Update cluster hash */
-			vclock_follow(vclock, node_id, lsn);
+			vclock_set(vclock, node_id, lsn);
 		}
 		vclockset_insert(set, vclock);
 	}
@@ -225,7 +225,7 @@ test_isearch()
 			if (lsn <= 0)
 				continue;
 
-			vclock_follow(&vclock, node_id, lsn);
+			vclock_set(&vclock, node_id, lsn);
 		}
 
 		int64_t check = *(query + NODE_N);
@@ -247,7 +247,7 @@ test_tostring_one(uint32_t count, const int64_t *lsns, const char *res)
 	vclock_create(&vclock);
 	for (uint32_t node_id = 0; node_id < count; node_id++) {
 		if (lsns[node_id] > 0)
-			vclock_follow(&vclock, node_id, lsns[node_id]);
+			vclock_set(&vclock, node_id, lsns[node_id]);
 	}
 	char *str = vclock_to_string(&vclock);
 	int result = strcmp(str, res);
-- 
2.20.1


* [tarantool-patches] Re: [PATCH 0/5] Strictly sequential LSN in journal
  2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
                   ` (4 preceding siblings ...)
  2019-01-04 10:34 ` [tarantool-patches] [PATCH 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
@ 2019-01-11 13:31 ` Georgy Kirichenko
  5 siblings, 0 replies; 7+ messages in thread
From: Georgy Kirichenko @ 2019-01-11 13:31 UTC (permalink / raw)
  To: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 1604 bytes --]

Branch: https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-980-disable-lsn-gaps
Issue: https://github.com/tarantool/tarantool/issues/980

On Friday, January 4, 2019 1:34:10 PM MSK you wrote:
> The patchset eliminates LSN gaps from the journal, which is needed for
> synchronous replication to enforce data stream consistency.
> The replicaset vclock is promoted only after a successful write, and a
> replication row may be applied only after the previous row has been
> processed. Conflicting rows that have to be skipped are replaced with
> NOPs so that no gaps remain. Finally, vclock following is protected with
> an assert to ensure a strict one-step increase.
> 
> Needed for: #980
> 
> Georgy Kirichenko (5):
>   Do not promote wal vclock for failed writes
>   Update replicaset vclock from wal
>   Enforce applier out of order protection
>   Emit NOP if an applier skips row
>   Disallow lsn gaps while vclock following
> 
>  src/box/applier.cc                | 113 ++++++++++++++++--------------
>  src/box/box.cc                    |   3 +-
>  src/box/replication.cc            |   2 +
>  src/box/replication.h             |   3 +
>  src/box/vclock.c                  |  16 ++++-
>  src/box/vclock.h                  |   3 +
>  src/box/wal.c                     |  57 ++++++---------
>  src/box/xrow.c                    |   2 +-
>  test/unit/vclock.cc               |  10 +--
>  test/xlog-py/dup_key.result       |  12 +++-
>  test/xlog-py/dup_key.test.py      |  18 +++--
>  test/xlog/errinj.result           |   1 -
>  test/xlog/panic_on_lsn_gap.result |  65 +++++++++--------
>  13 files changed, 167 insertions(+), 138 deletions(-)


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]


end of thread, newest message: 2019-01-11 13:31 UTC

Thread overview: 7+ messages
2019-01-04 10:34 [tarantool-patches] [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
2019-01-04 10:34 ` [tarantool-patches] [PATCH 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
2019-01-04 10:34 ` [tarantool-patches] [PATCH 2/5] Update replicaset vclock from wal Georgy Kirichenko
2019-01-04 10:34 ` [tarantool-patches] [PATCH 3/5] Enforce applier out of order protection Georgy Kirichenko
2019-01-04 10:34 ` [tarantool-patches] [PATCH 4/5] Emit NOP if an applier skips row Georgy Kirichenko
2019-01-04 10:34 ` [tarantool-patches] [PATCH 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
2019-01-11 13:31 ` [tarantool-patches] Re: [PATCH 0/5] Strictly sequential LSN in journal Georgy Kirichenko
