Tarantool development patches archive
 help / color / mirror / Atom feed
* [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal
@ 2019-01-22 10:31 Georgy Kirichenko
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
                   ` (5 more replies)
  0 siblings, 6 replies; 18+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

The patchset eliminates LSN gaps from the journal, which is needed for
synchronous replication to enforce data stream consistency.
The replicaset vclock is promoted only after a write, and applying a
replication row is allowed only if the previous row was processed. All
conflicting rows that are skipped are replaced with NOPs to fill gaps.
Finally, vclock following is protected with an assert to ensure that
every step is a strict one-step increase.

Needed for: #980

Changes in v2:
  - Rebased against latest 2.1
  - Fix handling rows which were returned back

Issue: https://github.com/tarantool/tarantool/issues/980
Branch:https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-980-disable-lsn-gaps

Georgy Kirichenko (5):
  Do not promote wal vclock for failed writes
  Update replicaset vclock from wal
  Enforce applier out of order protection
  Emit NOP if an applier skips row
  Disallow lsn gaps while vclock following

 src/box/applier.cc                | 120 +++++++++++++++++-------------
 src/box/box.cc                    |   3 +-
 src/box/replication.cc            |   2 +
 src/box/replication.h             |   3 +
 src/box/vclock.c                  |  16 +++-
 src/box/vclock.h                  |   3 +
 src/box/wal.c                     |  57 +++++---------
 src/box/xrow.c                    |   2 +-
 test/unit/vclock.cc               |  10 +--
 test/xlog-py/dup_key.result       |  12 ++-
 test/xlog-py/dup_key.test.py      |  18 +++--
 test/xlog/errinj.result           |   1 -
 test/xlog/panic_on_lsn_gap.result |  65 ++++++++--------
 13 files changed, 174 insertions(+), 138 deletions(-)

-- 
2.20.1

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes
  2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
@ 2019-01-22 10:31 ` Georgy Kirichenko
  2019-01-28 11:20   ` Vladimir Davydov
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal Georgy Kirichenko
                   ` (4 subsequent siblings)
  5 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Increase the replica LSN only if the row was successfully written to
disk. This protects the WAL from LSN gaps in case of IO errors and
enforces WAL consistency.

Needed for #980
---
 src/box/wal.c                     | 19 ++++++---
 test/xlog/errinj.result           |  1 -
 test/xlog/panic_on_lsn_gap.result | 65 +++++++++++++++----------------
 3 files changed, 45 insertions(+), 40 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 3b50d3629..a55b544aa 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -901,16 +901,16 @@ wal_writer_begin_rollback(struct wal_writer *writer)
 }
 
 static void
-wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
+wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
 	       struct xrow_header **end)
 {
 	/** Assign LSN to all local rows. */
 	for ( ; row < end; row++) {
 		if ((*row)->replica_id == 0) {
-			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
+			(*row)->lsn = vclock_inc(vclock, instance_id);
 			(*row)->replica_id = instance_id;
 		} else {
-			vclock_follow_xrow(&writer->vclock, *row);
+			vclock_follow_xrow(vclock, *row);
 		}
 	}
 }
@@ -922,6 +922,11 @@ wal_write_to_disk(struct cmsg *msg)
 	struct wal_msg *wal_msg = (struct wal_msg *) msg;
 	struct error *error;
 
+	/* Local vclock copy. */
+	struct vclock vclock;
+	vclock_create(&vclock);
+	vclock_copy(&vclock, &writer->vclock);
+
 	struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
 	while (inj != NULL && inj->bparam)
 		usleep(10);
@@ -974,14 +979,15 @@ wal_write_to_disk(struct cmsg *msg)
 	struct journal_entry *entry;
 	struct stailq_entry *last_committed = NULL;
 	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
-		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
-		entry->res = vclock_sum(&writer->vclock);
+		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
+		entry->res = vclock_sum(&vclock);
 		rc = xlog_write_entry(l, entry);
 		if (rc < 0)
 			goto done;
 		if (rc > 0) {
 			writer->checkpoint_wal_size += rc;
 			last_committed = &entry->fifo;
+			vclock_copy(&writer->vclock, &vclock);
 		}
 		/* rc == 0: the write is buffered in xlog_tx */
 	}
@@ -991,6 +997,7 @@ wal_write_to_disk(struct cmsg *msg)
 
 	writer->checkpoint_wal_size += rc;
 	last_committed = stailq_last(&wal_msg->commit);
+	vclock_copy(&writer->vclock, &vclock);
 
 	/*
 	 * Notify TX if the checkpoint threshold has been exceeded.
@@ -1185,7 +1192,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 			   struct journal_entry *entry)
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
-	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
+	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
 	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
 	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
 	if (new_lsn > old_lsn) {
diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
index 390404b47..7f15bef35 100644
--- a/test/xlog/errinj.result
+++ b/test/xlog/errinj.result
@@ -43,7 +43,6 @@ require('fio').glob(name .. "/*.xlog")
 ---
 - - xlog/00000000000000000000.xlog
   - xlog/00000000000000000001.xlog
-  - xlog/00000000000000000002.xlog
 ...
 test_run:cmd('restart server default with cleanup=1')
 -- gh-881 iproto request with wal IO error
diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
index 4dd1291f8..8054baab4 100644
--- a/test/xlog/panic_on_lsn_gap.result
+++ b/test/xlog/panic_on_lsn_gap.result
@@ -105,7 +105,7 @@ test_run:cmd("restart server panic")
 --
 box.info.vclock
 ---
-- {1: 11}
+- {1: 1}
 ...
 box.space._schema:select{'key'}
 ---
@@ -153,7 +153,7 @@ t
 ...
 box.info.vclock
 ---
-- {1: 11}
+- {1: 1}
 ...
 box.error.injection.set("ERRINJ_WAL_WRITE", false)
 ---
@@ -176,12 +176,12 @@ s:replace{'key', 'test 2'}
 --
 box.info.vclock
 ---
-- {1: 22}
+- {1: 2}
 ...
 test_run:cmd("restart server panic")
 box.info.vclock
 ---
-- {1: 22}
+- {1: 2}
 ...
 box.space._schema:select{'key'}
 ---
@@ -194,8 +194,8 @@ name = string.match(arg[0], "([^,]+)%.lua")
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
 ...
 -- now insert 10 rows - so that the next
 -- row will need to switch the WAL
@@ -217,8 +217,8 @@ test_run:cmd("setopt delimiter ''");
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
 ...
 box.error.injection.set("ERRINJ_WAL_WRITE", true)
 ---
@@ -230,14 +230,14 @@ box.space._schema:replace{"key", 'test 3'}
 ...
 box.info.vclock
 ---
-- {1: 32}
+- {1: 12}
 ...
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
 ...
 -- and the next one (just to be sure
 box.space._schema:replace{"key", 'test 3'}
@@ -246,14 +246,14 @@ box.space._schema:replace{"key", 'test 3'}
 ...
 box.info.vclock
 ---
-- {1: 32}
+- {1: 12}
 ...
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
 ...
 box.error.injection.set("ERRINJ_WAL_WRITE", false)
 ---
@@ -266,14 +266,14 @@ box.space._schema:replace{"key", 'test 4'}
 ...
 box.info.vclock
 ---
-- {1: 35}
+- {1: 13}
 ...
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
 ...
 -- restart is ok
 test_run:cmd("restart server panic")
@@ -332,12 +332,12 @@ name = string.match(arg[0], "([^,]+)%.lua")
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
-  - panic/00000000000000000035.xlog
-  - panic/00000000000000000037.xlog
-  - panic/00000000000000000039.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
+  - panic/00000000000000000013.xlog
+  - panic/00000000000000000014.xlog
+  - panic/00000000000000000015.xlog
 ...
 test_run:cmd("restart server panic")
 box.space._schema:select{'key'}
@@ -355,13 +355,12 @@ name = string.match(arg[0], "([^,]+)%.lua")
 require('fio').glob(name .. "/*.xlog")
 ---
 - - panic/00000000000000000000.xlog
-  - panic/00000000000000000011.xlog
-  - panic/00000000000000000022.xlog
-  - panic/00000000000000000032.xlog
-  - panic/00000000000000000035.xlog
-  - panic/00000000000000000037.xlog
-  - panic/00000000000000000039.xlog
-  - panic/00000000000000000040.xlog
+  - panic/00000000000000000001.xlog
+  - panic/00000000000000000002.xlog
+  - panic/00000000000000000012.xlog
+  - panic/00000000000000000013.xlog
+  - panic/00000000000000000014.xlog
+  - panic/00000000000000000015.xlog
 ...
 test_run:cmd('switch default')
 ---
-- 
2.20.1

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal
  2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
@ 2019-01-22 10:31 ` Georgy Kirichenko
  2019-01-28 11:59   ` Vladimir Davydov
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection Georgy Kirichenko
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Journal maintains replicaset vclock for recovery, local and replicated
operations. Introduce replicaset.applier.vclock to prevent applier
races.

Prerequisite #980
---
 src/box/applier.cc           | 68 +++++++++++++++++++++---------------
 src/box/box.cc               |  3 +-
 src/box/replication.cc       |  1 +
 src/box/replication.h        |  3 ++
 src/box/vclock.c             | 14 ++++++++
 src/box/vclock.h             |  3 ++
 src/box/wal.c                | 38 ++++----------------
 test/xlog-py/dup_key.result  | 12 +++++--
 test/xlog-py/dup_key.test.py | 18 +++++++---
 9 files changed, 93 insertions(+), 67 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 21d2e6bcb..87873e970 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
 			continue;
 		try {
 			struct xrow_header xrow;
-			xrow_encode_vclock(&xrow, &replicaset.vclock);
+			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
 			coio_write_xrow(&io, &xrow);
 		} catch (SocketError *e) {
 			/*
@@ -300,7 +300,7 @@ applier_join(struct applier *applier)
 		 * Used to initialize the replica's initial
 		 * vclock in bootstrap_from_master()
 		 */
-		xrow_decode_vclock_xc(&row, &replicaset.vclock);
+		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
 	}
 
 	applier_set_state(applier, APPLIER_INITIAL_JOIN);
@@ -326,7 +326,8 @@ applier_join(struct applier *applier)
 				 * vclock yet, do it now. In 1.7+
 				 * this vclock is not used.
 				 */
-				xrow_decode_vclock_xc(&row, &replicaset.vclock);
+				xrow_decode_vclock_xc(&row,
+						      &replicaset.applier.vclock);
 			}
 			break; /* end of stream */
 		} else if (iproto_type_is_error(row.type)) {
@@ -336,6 +337,7 @@ applier_join(struct applier *applier)
 				  (uint32_t) row.type);
 		}
 	}
+	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
 	say_info("initial data received");
 
 	applier_set_state(applier, APPLIER_FINAL_JOIN);
@@ -355,7 +357,7 @@ applier_join(struct applier *applier)
 		coio_read_xrow(coio, ibuf, &row);
 		applier->last_row_time = ev_monotonic_now(loop());
 		if (iproto_type_is_dml(row.type)) {
-			vclock_follow_xrow(&replicaset.vclock, &row);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			xstream_write_xc(applier->subscribe_stream, &row);
 			if (++row_count % 100000 == 0)
 				say_info("%.1fM rows received", row_count / 1e6);
@@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
 {
 	assert(applier->subscribe_stream != NULL);
 
+	if (!vclock_is_set(&replicaset.applier.vclock))
+		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
+
 	/* Send SUBSCRIBE request */
 	struct ev_io *coio = &applier->io;
 	struct ibuf *ibuf = &applier->ibuf;
@@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
 			applier_set_state(applier, APPLIER_FOLLOW);
 		}
 
-		/*
-		 * Stay 'orphan' until appliers catch up with
-		 * the remote vclock at the time of SUBSCRIBE
-		 * and the lag is less than configured.
-		 */
-		if (applier->state == APPLIER_SYNC &&
-		    applier->lag <= replication_sync_lag &&
-		    vclock_compare(&remote_vclock_at_subscribe,
-				   &replicaset.vclock) <= 0) {
-			/* Applier is synced, switch to "follow". */
-			applier_set_state(applier, APPLIER_FOLLOW);
-		}
-
 		/*
 		 * Tarantool < 1.7.7 does not send periodic heartbeat
 		 * messages so we can't assume that if we haven't heard
@@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
-		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
+		if (vclock_get(&replicaset.applier.vclock,
+			       row.replica_id) < row.lsn) {
+			if (row.replica_id == instance_id &&
+			    vclock_get(&replicaset.vclock, instance_id) <
+			    row.lsn) {
+				/* Local row returned back. */
+				goto done;
+			}
+			/* Preserve old lsn value. */
+			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
+						     row.replica_id);
+			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			struct replica *replica = replica_by_id(row.replica_id);
 			struct latch *latch = (replica ? &replica->order_latch :
 					       &replicaset.applier.order_latch);
@@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					/* Rollback lsn to have a chance for a retry. */
+					vclock_set(&replicaset.applier.vclock,
+						   row.replica_id, old_lsn);
 					diag_raise();
+				}
 			}
 		}
+done:
+		/*
+		 * Stay 'orphan' until appliers catch up with
+		 * the remote vclock at the time of SUBSCRIBE
+		 * and the lag is less than configured.
+		 */
+		if (applier->state == APPLIER_SYNC &&
+		    applier->lag <= replication_sync_lag &&
+		    vclock_compare(&remote_vclock_at_subscribe,
+				   &replicaset.vclock) <= 0) {
+			/* Applier is synced, switch to "follow". */
+			applier_set_state(applier, APPLIER_FOLLOW);
+		}
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/box.cc b/src/box/box.cc
index 9f2fd6da1..0df0875dd 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
 		       struct journal_entry * /* entry */)
 {
 	struct recovery_journal *journal = (struct recovery_journal *) base;
+	vclock_copy(&replicaset.vclock, journal->vclock);
 	return vclock_sum(journal->vclock);
 }
 
@@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
 	 */
 	engine_begin_final_recovery_xc();
 	struct recovery_journal journal;
-	recovery_journal_create(&journal, &replicaset.vclock);
+	recovery_journal_create(&journal, &replicaset.applier.vclock);
 	journal_set(&journal.base);
 
 	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 2cb4ec0f8..51e08886c 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -90,6 +90,7 @@ replication_init(void)
 	fiber_cond_create(&replicaset.applier.cond);
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
+	vclock_create(&replicaset.applier.vclock);
 }
 
 void
diff --git a/src/box/replication.h b/src/box/replication.h
index 2ac620d86..b9aebed14 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -194,6 +194,9 @@ struct replicaset {
 	struct vclock vclock;
 	/** Applier state. */
 	struct {
+		/**
+		 * Vclock sent to process from appliers. */
+		struct vclock vclock;
 		/**
 		 * Total number of replicas with attached
 		 * appliers.
diff --git a/src/box/vclock.c b/src/box/vclock.c
index b5eb2800b..c297d1ff9 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -36,6 +36,20 @@
 
 #include "diag.h"
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
+{
+	assert(lsn >= 0);
+	assert(replica_id < VCLOCK_MAX);
+	int64_t prev_lsn = vclock->lsn[replica_id];
+	if (lsn > 0)
+		vclock->map |= 1 << replica_id;
+	else
+		vclock->map &= ~(1 << replica_id);
+	vclock->lsn[replica_id] = lsn;
+	vclock->signature += lsn - prev_lsn;
+}
+
 int64_t
 vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
 {
diff --git a/src/box/vclock.h b/src/box/vclock.h
index 111e29160..d6cb14c2a 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
 	return vclock->lsn[replica_id];
 }
 
+void
+vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
+
 static inline int64_t
 vclock_inc(struct vclock *vclock, uint32_t replica_id)
 {
diff --git a/src/box/wal.c b/src/box/wal.c
index a55b544aa..4c3537672 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
 	 * wal_msg memory disappears after the first
 	 * iteration of tx_schedule_queue loop.
 	 */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	if (! stailq_empty(&batch->rollback)) {
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
@@ -1028,6 +1032,8 @@ done:
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/* Set resulting vclock. */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
index f387e8e89..966fa1f4a 100644
--- a/test/xlog-py/dup_key.result
+++ b/test/xlog-py/dup_key.result
@@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
 ---
 - [2, 'second tuple']
 ...
-.xlog exists
+.xlog#1 exists
+box.space.test:insert{3, 'third tuple'}
+---
+- [3, 'third tuple']
+...
+box.space.test:insert{4, 'fourth tuple'}
+---
+- [4, 'fourth tuple']
+...
+.xlog#2 exists
 box.space.test:insert{1, 'third tuple'}
 ---
 - [1, 'third tuple']
@@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
 ---
 - [2, 'fourth tuple']
 ...
-.xlog does not exist
 check log line for 'Duplicate key'
 
 'Duplicate key' exists in server log
diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
index 1c033da40..3dacde771 100644
--- a/test/xlog-py/dup_key.test.py
+++ b/test/xlog-py/dup_key.test.py
@@ -26,19 +26,27 @@ server.stop()
 
 # Save wal#1
 if os.access(wal, os.F_OK):
-    print ".xlog exists"
+    print ".xlog#1 exists"
     os.rename(wal, wal_old)
 
-# Write wal#2
+# Write wal#2 to bump lsn
+server.start()
+server.admin("box.space.test:insert{3, 'third tuple'}")
+server.admin("box.space.test:insert{4, 'fourth tuple'}")
+server.stop()
+
+if os.access(wal, os.F_OK):
+    print ".xlog#2 exists"
+
+# Write wal#3 - confliction with wal#1
 server.start()
 server.admin("box.space.test:insert{1, 'third tuple'}")
 server.admin("box.space.test:insert{2, 'fourth tuple'}")
 server.stop()
 
 # Restore wal#1
-if not os.access(wal, os.F_OK):
-    print ".xlog does not exist"
-    os.rename(wal_old, wal)
+os.unlink(wal)
+os.rename(wal_old, wal)
 
 server.start()
 line = 'Duplicate key'
-- 
2.20.1

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection
  2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal Georgy Kirichenko
@ 2019-01-22 10:31 ` Georgy Kirichenko
  2019-01-28 12:09   ` Vladimir Davydov
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row Georgy Kirichenko
                   ` (2 subsequent siblings)
  5 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Do not skip a row until it has been processed by another applier.

Prerequisite #980
---
 src/box/applier.cc | 35 ++++++++++++++++++-----------------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 87873e970..148c8ce5a 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -504,6 +504,22 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
+		struct replica *replica = replica_by_id(row.replica_id);
+		struct latch *latch = (replica ? &replica->order_latch :
+				       &replicaset.applier.order_latch);
+		/*
+		 * In a full mesh topology, the same set
+		 * of changes may arrive via two
+		 * concurrently running appliers. Thanks
+		 * to vclock_follow() above, the first row
+		 * in the set will be skipped - but the
+		 * remaining may execute out of order,
+		 * when the following xstream_write()
+		 * yields on WAL. Hence we need a latch to
+		 * strictly order all changes which belong
+		 * to the same server id.
+		 */
+		latch_lock(latch);
 		if (vclock_get(&replicaset.applier.vclock,
 			       row.replica_id) < row.lsn) {
 			if (row.replica_id == instance_id &&
@@ -516,24 +532,7 @@ applier_subscribe(struct applier *applier)
 			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
 						     row.replica_id);
 			vclock_follow_xrow(&replicaset.applier.vclock, &row);
-			struct replica *replica = replica_by_id(row.replica_id);
-			struct latch *latch = (replica ? &replica->order_latch :
-					       &replicaset.applier.order_latch);
-			/*
-			 * In a full mesh topology, the same set
-			 * of changes may arrive via two
-			 * concurrently running appliers. Thanks
-			 * to vclock_follow() above, the first row
-			 * in the set will be skipped - but the
-			 * remaining may execute out of order,
-			 * when the following xstream_write()
-			 * yields on WAL. Hence we need a latch to
-			 * strictly order all changes which belong
-			 * to the same server id.
-			 */
-			latch_lock(latch);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
@@ -548,11 +547,13 @@ applier_subscribe(struct applier *applier)
 					/* Rollback lsn to have a chance for a retry. */
 					vclock_set(&replicaset.applier.vclock,
 						   row.replica_id, old_lsn);
+					latch_unlock(latch);
 					diag_raise();
 				}
 			}
 		}
 done:
+		latch_unlock(latch);
 		/*
 		 * Stay 'orphan' until appliers catch up with
 		 * the remote vclock at the time of SUBSCRIBE
-- 
2.20.1

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row
  2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
                   ` (2 preceding siblings ...)
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection Georgy Kirichenko
@ 2019-01-22 10:31 ` Georgy Kirichenko
  2019-01-28 12:15   ` Vladimir Davydov
  2019-02-08 16:50   ` [tarantool-patches] " Konstantin Osipov
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
  2019-01-28 11:15 ` [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Vladimir Davydov
  5 siblings, 2 replies; 18+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Fill LSN gaps with NOP rows if the applier is configured to skip
conflicting rows. This enforces WAL consistency.

Prerequisite #980
---
 src/box/applier.cc | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 148c8ce5a..adbe88679 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -533,23 +533,26 @@ applier_subscribe(struct applier *applier)
 						     row.replica_id);
 			vclock_follow_xrow(&replicaset.applier.vclock, &row);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			if (res != 0) {
-				struct error *e = diag_last_error(diag_get());
+			struct error *e = diag_last_error(diag_get());
+			if (res != 0 && e->type == &type_ClientError &&
+			    box_error_code(e) == ER_TUPLE_FOUND &&
+			    replication_skip_conflict) {
 				/**
 				 * Silently skip ER_TUPLE_FOUND error if such
 				 * option is set in config.
 				 */
-				if (e->type == &type_ClientError &&
-				    box_error_code(e) == ER_TUPLE_FOUND &&
-				    replication_skip_conflict)
-					diag_clear(diag_get());
-				else {
-					/* Rollback lsn to have a chance for a retry. */
-					vclock_set(&replicaset.applier.vclock,
-						   row.replica_id, old_lsn);
-					latch_unlock(latch);
-					diag_raise();
-				}
+				diag_clear(diag_get());
+				row.type = IPROTO_NOP;
+				row.bodycnt = 0;
+				res = xstream_write(applier->subscribe_stream,
+						    &row);
+			}
+			if (res != 0) {
+				/* Rollback lsn to have a chance for a retry. */
+				vclock_set(&replicaset.applier.vclock,
+					   row.replica_id, old_lsn);
+				latch_unlock(latch);
+				diag_raise();
 			}
 		}
 done:
-- 
2.20.1

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] [PATCH v2 5/5] Disallow lsn gaps while vclock following
  2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
                   ` (3 preceding siblings ...)
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row Georgy Kirichenko
@ 2019-01-22 10:31 ` Georgy Kirichenko
  2019-01-28 12:18   ` Vladimir Davydov
  2019-01-28 11:15 ` [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Vladimir Davydov
  5 siblings, 1 reply; 18+ messages in thread
From: Georgy Kirichenko @ 2019-01-22 10:31 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

Only one-step vclock following is allowed. This enforces wal and
replication consistency against out of order execution.

Prerequisite #980
---
 src/box/replication.cc |  1 +
 src/box/vclock.c       |  2 +-
 src/box/xrow.c         |  2 +-
 test/unit/vclock.cc    | 10 +++++-----
 4 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/src/box/replication.cc b/src/box/replication.cc
index 51e08886c..ee92a941b 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -91,6 +91,7 @@ replication_init(void)
 	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
 	latch_create(&replicaset.applier.order_latch);
 	vclock_create(&replicaset.applier.vclock);
+	vclock_clear(&replicaset.applier.vclock);
 }
 
 void
diff --git a/src/box/vclock.c b/src/box/vclock.c
index c297d1ff9..807da9109 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -56,7 +56,7 @@ vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
 	assert(lsn >= 0);
 	assert(replica_id < VCLOCK_MAX);
 	int64_t prev_lsn = vclock->lsn[replica_id];
-	assert(lsn > prev_lsn);
+	assert(lsn == prev_lsn + 1);
 	/* Easier add each time than check. */
 	vclock->map |= 1 << replica_id;
 	vclock->lsn[replica_id] = lsn;
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 67019a68d..ef3f81add 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -83,7 +83,7 @@ mp_decode_vclock(const char **data, struct vclock *vclock)
 			return -1;
 		int64_t lsn = mp_decode_uint(data);
 		if (lsn > 0)
-			vclock_follow(vclock, id, lsn);
+			vclock_set(vclock, id, lsn);
 	}
 	return 0;
 }
diff --git a/test/unit/vclock.cc b/test/unit/vclock.cc
index 8498eba3b..6a8d498ef 100644
--- a/test/unit/vclock.cc
+++ b/test/unit/vclock.cc
@@ -50,11 +50,11 @@ test_compare_one(uint32_t a_count, const int64_t *lsns_a,
 	vclock_create(&b);
 	for (uint32_t node_id = 0; node_id < a_count; node_id++) {
 		if (lsns_a[node_id] > 0)
-			vclock_follow(&a, node_id, lsns_a[node_id]);
+			vclock_set(&a, node_id, lsns_a[node_id]);
 	}
 	for (uint32_t node_id = 0; node_id < b_count; node_id++) {
 		if (lsns_b[node_id] > 0)
-			vclock_follow(&b, node_id, lsns_b[node_id]);
+			vclock_set(&b, node_id, lsns_b[node_id]);
 	}
 
 	return vclock_compare(&a, &b);
@@ -119,7 +119,7 @@ testset_create(vclockset_t *set, int64_t *files, int files_n, int node_n)
 			signature += lsn;
 
 			/* Update cluster hash */
-			vclock_follow(vclock, node_id, lsn);
+			vclock_set(vclock, node_id, lsn);
 		}
 		vclockset_insert(set, vclock);
 	}
@@ -225,7 +225,7 @@ test_isearch()
 			if (lsn <= 0)
 				continue;
 
-			vclock_follow(&vclock, node_id, lsn);
+			vclock_set(&vclock, node_id, lsn);
 		}
 
 		int64_t check = *(query + NODE_N);
@@ -247,7 +247,7 @@ test_tostring_one(uint32_t count, const int64_t *lsns, const char *res)
 	vclock_create(&vclock);
 	for (uint32_t node_id = 0; node_id < count; node_id++) {
 		if (lsns[node_id] > 0)
-			vclock_follow(&vclock, node_id, lsns[node_id]);
+			vclock_set(&vclock, node_id, lsns[node_id]);
 	}
 	char *str = vclock_to_string(&vclock);
 	int result = strcmp(str, res);
-- 
2.20.1

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal
  2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
                   ` (4 preceding siblings ...)
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
@ 2019-01-28 11:15 ` Vladimir Davydov
  5 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-28 11:15 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:31:08PM +0300, Georgy Kirichenko wrote:
> The patchset eliminates LSN gaps from journal what is needed for
> synchronous replication to enforce data stream consistency.

Could you give an example of how LSN gaps can break synchronous
replication?

> Replicaset vclock is promoted only after write, replication row applying
> is allowed only if previous row was processed. All conflicting rows to
> skip are replacing with NOPs to fill gaps. After all vclock following
> protected with assert to ensure that this is strict one step increase.

It looks very much like

  https://github.com/tarantool/tarantool/issues/2283

which was done by Serge P. a while back:

  https://www.freelists.org/post/tarantool-patches/PATCH-replication-do-not-skip-masters-rows-in-case-of-an-error

Shouldn't we rather apply his patch? Or is there some crucial difference?

> 
> Needed for: #980
> 
> Changes in v2:
>   - Rebased against latest 2.1
>   - Fix handling rows which were returned back
> 
> Issue: https://github.com/tarantool/tarantool/issues/980
> Branch:https://github.com/tarantool/tarantool/tree/g.kirichenko/gh-980-disable-lsn-gaps
> 
> Georgy Kirichenko (5):
>   Do not promote wal vclock for failed writes
>   Update replicaset vclock from wal
>   Enforce applier out of order protection
>   Emit NOP if an applier skips row
>   Disallow lsn gaps while vclock following

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
@ 2019-01-28 11:20   ` Vladimir Davydov
  2019-01-29 10:22     ` Георгий Кириченко
  0 siblings, 1 reply; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-28 11:20 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:31:09PM +0300, Georgy Kirichenko wrote:
> Increase replica lsn only if row was successfully written to disk. This
> prevents wal from lsn gaps in case of IO errors and enforces wal
> consistency.
> 
> Needs for #980
> ---
>  src/box/wal.c                     | 19 ++++++---
>  test/xlog/errinj.result           |  1 -
>  test/xlog/panic_on_lsn_gap.result | 65 +++++++++++++++----------------
>  3 files changed, 45 insertions(+), 40 deletions(-)
> 
> diff --git a/src/box/wal.c b/src/box/wal.c
> index 3b50d3629..a55b544aa 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -901,16 +901,16 @@ wal_writer_begin_rollback(struct wal_writer *writer)
>  }
>  
>  static void
> -wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
> +wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
>  	       struct xrow_header **end)
>  {
>  	/** Assign LSN to all local rows. */
>  	for ( ; row < end; row++) {
>  		if ((*row)->replica_id == 0) {
> -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> +			(*row)->lsn = vclock_inc(vclock, instance_id);
>  			(*row)->replica_id = instance_id;
>  		} else {
> -			vclock_follow_xrow(&writer->vclock, *row);
> +			vclock_follow_xrow(vclock, *row);
>  		}
>  	}
>  }
> @@ -922,6 +922,11 @@ wal_write_to_disk(struct cmsg *msg)
>  	struct wal_msg *wal_msg = (struct wal_msg *) msg;
>  	struct error *error;
>  
> +	/* Local vclock copy. */
> +	struct vclock vclock;
> +	vclock_create(&vclock);
> +	vclock_copy(&vclock, &writer->vclock);
> +
>  	struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
>  	while (inj != NULL && inj->bparam)
>  		usleep(10);
> @@ -974,14 +979,15 @@ wal_write_to_disk(struct cmsg *msg)
>  	struct journal_entry *entry;
>  	struct stailq_entry *last_committed = NULL;
>  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> -		wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> -		entry->res = vclock_sum(&writer->vclock);
> +		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry->n_rows);
> +		entry->res = vclock_sum(&vclock);
>  		rc = xlog_write_entry(l, entry);
>  		if (rc < 0)
>  			goto done;
>  		if (rc > 0) {
>  			writer->checkpoint_wal_size += rc;
>  			last_committed = &entry->fifo;
> +			vclock_copy(&writer->vclock, &vclock);

I don't like that you copy a vclock after applying each entry.
Currently, it should be pretty cheap, but in the future, when we make
vclock store any number of ids, this might get pretty heavy.
Can we minimize the number of memcpys somehow, ideally do it only
on the rollback path?

>  		}
>  		/* rc == 0: the write is buffered in xlog_tx */
>  	}
> @@ -991,6 +997,7 @@ wal_write_to_disk(struct cmsg *msg)
>  
>  	writer->checkpoint_wal_size += rc;
>  	last_committed = stailq_last(&wal_msg->commit);
> +	vclock_copy(&writer->vclock, &vclock);
>  
>  	/*
>  	 * Notify TX if the checkpoint threshold has been exceeded.
> @@ -1185,7 +1192,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  			   struct journal_entry *entry)
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
> -	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> +	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
>  	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
>  	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
>  	if (new_lsn > old_lsn) {
> diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
> index 390404b47..7f15bef35 100644
> --- a/test/xlog/errinj.result
> +++ b/test/xlog/errinj.result
> @@ -43,7 +43,6 @@ require('fio').glob(name .. "/*.xlog")
>  ---
>  - - xlog/00000000000000000000.xlog
>    - xlog/00000000000000000001.xlog
> -  - xlog/00000000000000000002.xlog
>  ...
>  test_run:cmd('restart server default with cleanup=1')
>  -- gh-881 iproto request with wal IO error
> diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
> index 4dd1291f8..8054baab4 100644
> --- a/test/xlog/panic_on_lsn_gap.result
> +++ b/test/xlog/panic_on_lsn_gap.result
> @@ -105,7 +105,7 @@ test_run:cmd("restart server panic")
>  --
>  box.info.vclock
>  ---
> -- {1: 11}
> +- {1: 1}

After this patch the comments contradict the expected result of this
test. Please fix.

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal Georgy Kirichenko
@ 2019-01-28 11:59   ` Vladimir Davydov
  2019-01-29 10:33     ` [tarantool-patches] " Георгий Кириченко
  0 siblings, 1 reply; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-28 11:59 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:31:10PM +0300, Georgy Kirichenko wrote:
> Journal maintains replicaset vclock for recovery, local and replicated
> operations. Introduce replicaset.applier.vclock to prevent applier
> races.

What kind of races?

> 
> Prerequisite #980
> ---
>  src/box/applier.cc           | 68 +++++++++++++++++++++---------------
>  src/box/box.cc               |  3 +-
>  src/box/replication.cc       |  1 +
>  src/box/replication.h        |  3 ++
>  src/box/vclock.c             | 14 ++++++++
>  src/box/vclock.h             |  3 ++
>  src/box/wal.c                | 38 ++++----------------
>  test/xlog-py/dup_key.result  | 12 +++++--
>  test/xlog-py/dup_key.test.py | 18 +++++++---
>  9 files changed, 93 insertions(+), 67 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 21d2e6bcb..87873e970 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
>  			continue;
>  		try {
>  			struct xrow_header xrow;
> -			xrow_encode_vclock(&xrow, &replicaset.vclock);
> +			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
>  			coio_write_xrow(&io, &xrow);
>  		} catch (SocketError *e) {
>  			/*
> @@ -300,7 +300,7 @@ applier_join(struct applier *applier)
>  		 * Used to initialize the replica's initial
>  		 * vclock in bootstrap_from_master()
>  		 */
> -		xrow_decode_vclock_xc(&row, &replicaset.vclock);
> +		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
>  	}
>  
>  	applier_set_state(applier, APPLIER_INITIAL_JOIN);
> @@ -326,7 +326,8 @@ applier_join(struct applier *applier)
>  				 * vclock yet, do it now. In 1.7+
>  				 * this vclock is not used.
>  				 */
> -				xrow_decode_vclock_xc(&row, &replicaset.vclock);
> +				xrow_decode_vclock_xc(&row,
> +						      &replicaset.applier.vclock);
>  			}
>  			break; /* end of stream */
>  		} else if (iproto_type_is_error(row.type)) {
> @@ -336,6 +337,7 @@ applier_join(struct applier *applier)
>  				  (uint32_t) row.type);
>  		}
>  	}
> +	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
>  	say_info("initial data received");
>  
>  	applier_set_state(applier, APPLIER_FINAL_JOIN);
> @@ -355,7 +357,7 @@ applier_join(struct applier *applier)
>  		coio_read_xrow(coio, ibuf, &row);
>  		applier->last_row_time = ev_monotonic_now(loop());
>  		if (iproto_type_is_dml(row.type)) {
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> +			vclock_follow_xrow(&replicaset.applier.vclock, &row);
>  			xstream_write_xc(applier->subscribe_stream, &row);
>  			if (++row_count % 100000 == 0)
>  				say_info("%.1fM rows received", row_count / 1e6);
> @@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
>  {
>  	assert(applier->subscribe_stream != NULL);
>  
> +	if (!vclock_is_set(&replicaset.applier.vclock))
> +		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> +
>  	/* Send SUBSCRIBE request */
>  	struct ev_io *coio = &applier->io;
>  	struct ibuf *ibuf = &applier->ibuf;
> @@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
>  			applier_set_state(applier, APPLIER_FOLLOW);
>  		}
>  
> -		/*
> -		 * Stay 'orphan' until appliers catch up with
> -		 * the remote vclock at the time of SUBSCRIBE
> -		 * and the lag is less than configured.
> -		 */
> -		if (applier->state == APPLIER_SYNC &&
> -		    applier->lag <= replication_sync_lag &&
> -		    vclock_compare(&remote_vclock_at_subscribe,
> -				   &replicaset.vclock) <= 0) {
> -			/* Applier is synced, switch to "follow". */
> -			applier_set_state(applier, APPLIER_FOLLOW);
> -		}
> -

Hmm, why move this chunk?

>  		/*
>  		 * Tarantool < 1.7.7 does not send periodic heartbeat
>  		 * messages so we can't assume that if we haven't heard
> @@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
>  
>  		applier->lag = ev_now(loop()) - row.tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> -
> -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> -			/**
> -			 * Promote the replica set vclock before
> -			 * applying the row. If there is an
> -			 * exception (conflict) applying the row,
> -			 * the row is skipped when the replication
> -			 * is resumed.
> -			 */
> -			vclock_follow_xrow(&replicaset.vclock, &row);
> +		if (vclock_get(&replicaset.applier.vclock,
> +			       row.replica_id) < row.lsn) {
> +			if (row.replica_id == instance_id &&
> +			    vclock_get(&replicaset.vclock, instance_id) <
> +			    row.lsn) {
> +				/* Local row returned back. */
> +				goto done;

How can it happen? Why should we handle it? Can you please elaborate on
the comment?

> +			}
> +			/* Preserve old lsn value. */
> +			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> +						     row.replica_id);
> +			vclock_follow_xrow(&replicaset.applier.vclock, &row);

So, AFAICS you want to promote replicaset.vclock after applying a row
received from a remote replica, and in order to avoid applying the same
row twice, you add replicaset.applier.vclock, is that correct?

However, in the next patch you wrap everything in a latch, rendering
this race impossible (BTW this is what Serge P. did too). So what's the
point in introducing another vclock here? If it is needed solely for
sync replication, then IMO we'd better do it in the scope of the
corresponding patch set, because currently it seems useless.

>  			struct replica *replica = replica_by_id(row.replica_id);
>  			struct latch *latch = (replica ? &replica->order_latch :
>  					       &replicaset.applier.order_latch);
> @@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
>  				    box_error_code(e) == ER_TUPLE_FOUND &&
>  				    replication_skip_conflict)
>  					diag_clear(diag_get());
> -				else
> +				else {
> +					/* Rollback lsn to have a chance for a retry. */
> +					vclock_set(&replicaset.applier.vclock,
> +						   row.replica_id, old_lsn);
>  					diag_raise();
> +				}
>  			}
>  		}
> +done:
> +		/*
> +		 * Stay 'orphan' until appliers catch up with
> +		 * the remote vclock at the time of SUBSCRIBE
> +		 * and the lag is less than configured.
> +		 */
> +		if (applier->state == APPLIER_SYNC &&
> +		    applier->lag <= replication_sync_lag &&
> +		    vclock_compare(&remote_vclock_at_subscribe,
> +				   &replicaset.vclock) <= 0) {
> +			/* Applier is synced, switch to "follow". */
> +			applier_set_state(applier, APPLIER_FOLLOW);
> +		}
> +
>  		if (applier->state == APPLIER_SYNC ||
>  		    applier->state == APPLIER_FOLLOW)
>  			fiber_cond_signal(&applier->writer_cond);
> diff --git a/src/box/box.cc b/src/box/box.cc
> index 9f2fd6da1..0df0875dd 100644
> --- a/src/box/box.cc
> +++ b/src/box/box.cc
> @@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
>  		       struct journal_entry * /* entry */)
>  {
>  	struct recovery_journal *journal = (struct recovery_journal *) base;
> +	vclock_copy(&replicaset.vclock, journal->vclock);
>  	return vclock_sum(journal->vclock);
>  }
>  
> @@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
>  	 */
>  	engine_begin_final_recovery_xc();
>  	struct recovery_journal journal;
> -	recovery_journal_create(&journal, &replicaset.vclock);
> +	recovery_journal_create(&journal, &replicaset.applier.vclock);

I fail to understand how this works. If we bump applier.vclock during
remote bootstrap, then where is replicaset.vclock set?

>  	journal_set(&journal.base);
>  
>  	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 2cb4ec0f8..51e08886c 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -90,6 +90,7 @@ replication_init(void)
>  	fiber_cond_create(&replicaset.applier.cond);
>  	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
>  	latch_create(&replicaset.applier.order_latch);
> +	vclock_create(&replicaset.applier.vclock);
>  }
>  
>  void
> diff --git a/src/box/replication.h b/src/box/replication.h
> index 2ac620d86..b9aebed14 100644
> --- a/src/box/replication.h
> +++ b/src/box/replication.h
> @@ -194,6 +194,9 @@ struct replicaset {
>  	struct vclock vclock;
>  	/** Applier state. */
>  	struct {
> +		/**
> +		 * Vclock sent to process from appliers. */
> +		struct vclock vclock;
>  		/**
>  		 * Total number of replicas with attached
>  		 * appliers.
> diff --git a/src/box/vclock.c b/src/box/vclock.c
> index b5eb2800b..c297d1ff9 100644
> --- a/src/box/vclock.c
> +++ b/src/box/vclock.c
> @@ -36,6 +36,20 @@
>  
>  #include "diag.h"
>  
> +void
> +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> +{
> +	assert(lsn >= 0);
> +	assert(replica_id < VCLOCK_MAX);
> +	int64_t prev_lsn = vclock->lsn[replica_id];
> +	if (lsn > 0)
> +		vclock->map |= 1 << replica_id;
> +	else
> +		vclock->map &= ~(1 << replica_id);
> +	vclock->lsn[replica_id] = lsn;
> +	vclock->signature += lsn - prev_lsn;
> +}
> +
>  int64_t
>  vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
>  {
> diff --git a/src/box/vclock.h b/src/box/vclock.h
> index 111e29160..d6cb14c2a 100644
> --- a/src/box/vclock.h
> +++ b/src/box/vclock.h
> @@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t replica_id)
>  	return vclock->lsn[replica_id];
>  }
>  
> +void
> +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
> +
>  static inline int64_t
>  vclock_inc(struct vclock *vclock, uint32_t replica_id)
>  {
> diff --git a/src/box/wal.c b/src/box/wal.c
> index a55b544aa..4c3537672 100644
> --- a/src/box/wal.c
> +++ b/src/box/wal.c
> @@ -171,6 +171,8 @@ struct wal_msg {
>  	 * be rolled back.
>  	 */
>  	struct stailq rollback;
> +	/** vclock after the batch processed. */
> +	struct vclock vclock;
>  };
>  
>  /**
> @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
>  	batch->approx_len = 0;
>  	stailq_create(&batch->commit);
>  	stailq_create(&batch->rollback);
> +	vclock_create(&batch->vclock);
>  }
>  
>  static struct wal_msg *
> @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
>  	 * wal_msg memory disappears after the first
>  	 * iteration of tx_schedule_queue loop.
>  	 */
> +	vclock_copy(&replicaset.vclock, &batch->vclock);
>  	if (! stailq_empty(&batch->rollback)) {
>  		/* Closes the input valve. */
>  		stailq_concat(&writer->rollback, &batch->rollback);
> @@ -1028,6 +1032,8 @@ done:
>  		error_log(error);
>  		diag_clear(diag_get());
>  	}
> +	/* Set resulting vclock. */
> +	vclock_copy(&wal_msg->vclock, &writer->vclock);
>  	/*
>  	 * We need to start rollback from the first request
>  	 * following the last committed request. If
> @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
>  	bool cancellable = fiber_set_cancellable(false);
>  	fiber_yield(); /* Request was inserted. */
>  	fiber_set_cancellable(cancellable);
> -	if (entry->res > 0) {
> -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> -		while (last >= entry->rows) {
> -			/*
> -			 * Find last row from local instance id
> -			 * and promote vclock.
> -			 */
> -			if ((*last)->replica_id == instance_id) {
> -				/*
> -				 * In master-master configuration, during sudden
> -				 * power-loss, if the data have not been written
> -				 * to WAL but have already been sent to others,
> -				 * they will send the data back. In this case
> -				 * vclock has already been promoted by applier.
> -				 */
> -				if (vclock_get(&replicaset.vclock,
> -					       instance_id) < (*last)->lsn) {
> -					vclock_follow_xrow(&replicaset.vclock,
> -							   *last);
> -				}
> -				break;
> -			}
> -			--last;
> -		}
> -	}
>  	return entry->res;
>  }
>  
> @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
>  {
>  	struct wal_writer *writer = (struct wal_writer *) journal;
>  	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
> -	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> -	if (new_lsn > old_lsn) {
> -		/* There were local writes, promote vclock. */
> -		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> -	}
> +	vclock_copy(&replicaset.vclock, &writer->vclock);
>  	return vclock_sum(&writer->vclock);
>  }
>  
> diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
> index f387e8e89..966fa1f4a 100644
> --- a/test/xlog-py/dup_key.result
> +++ b/test/xlog-py/dup_key.result
> @@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
>  ---
>  - [2, 'second tuple']
>  ...
> -.xlog exists
> +.xlog#1 exists
> +box.space.test:insert{3, 'third tuple'}
> +---
> +- [3, 'third tuple']
> +...
> +box.space.test:insert{4, 'fourth tuple'}
> +---
> +- [4, 'fourth tuple']
> +...
> +.xlog#2 exists
>  box.space.test:insert{1, 'third tuple'}
>  ---
>  - [1, 'third tuple']

What happened to this test?

> @@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
>  ---
>  - [2, 'fourth tuple']
>  ...
> -.xlog does not exist
>  check log line for 'Duplicate key'
>  
>  'Duplicate key' exists in server log
> diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
> index 1c033da40..3dacde771 100644
> --- a/test/xlog-py/dup_key.test.py
> +++ b/test/xlog-py/dup_key.test.py
> @@ -26,19 +26,27 @@ server.stop()
>  
>  # Save wal#1
>  if os.access(wal, os.F_OK):
> -    print ".xlog exists"
> +    print ".xlog#1 exists"
>      os.rename(wal, wal_old)
>  
> -# Write wal#2
> +# Write wal#2 to bump lsn
> +server.start()
> +server.admin("box.space.test:insert{3, 'third tuple'}")
> +server.admin("box.space.test:insert{4, 'fourth tuple'}")
> +server.stop()
> +
> +if os.access(wal, os.F_OK):
> +    print ".xlog#2 exists"
> +
> +# Write wal#3 - confliction with wal#1
>  server.start()
>  server.admin("box.space.test:insert{1, 'third tuple'}")
>  server.admin("box.space.test:insert{2, 'fourth tuple'}")
>  server.stop()
>  
>  # Restore wal#1
> -if not os.access(wal, os.F_OK):
> -    print ".xlog does not exist"
> -    os.rename(wal_old, wal)
> +os.unlink(wal)
> +os.rename(wal_old, wal)

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection Georgy Kirichenko
@ 2019-01-28 12:09   ` Vladimir Davydov
  2019-01-29 10:30     ` [tarantool-patches] " Георгий Кириченко
  0 siblings, 1 reply; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-28 12:09 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:31:11PM +0300, Georgy Kirichenko wrote:
> Do not skip row until the row is not processed by other appliers.

Looks like a fix for

  https://github.com/tarantool/tarantool/issues/3568

Worth adding a test?

> 
> Prerequisite #980
> ---
>  src/box/applier.cc | 35 ++++++++++++++++++-----------------
>  1 file changed, 18 insertions(+), 17 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 87873e970..148c8ce5a 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -504,6 +504,22 @@ applier_subscribe(struct applier *applier)
>  
>  		applier->lag = ev_now(loop()) - row.tm;
>  		applier->last_row_time = ev_monotonic_now(loop());
> +		struct replica *replica = replica_by_id(row.replica_id);
> +		struct latch *latch = (replica ? &replica->order_latch :
> +				       &replicaset.applier.order_latch);
> +		/*
> +		 * In a full mesh topology, the same set
> +		 * of changes may arrive via two
> +		 * concurrently running appliers. Thanks
> +		 * to vclock_follow() above, the first row

I don't see any vclock_follow() above. Please fix the comment.

> +		 * in the set will be skipped - but the
> +		 * remaining may execute out of order,
> +		 * when the following xstream_write()
> +		 * yields on WAL. Hence we need a latch to
> +		 * strictly order all changes which belong
> +		 * to the same server id.
> +		 */
> +		latch_lock(latch);
>  		if (vclock_get(&replicaset.applier.vclock,
>  			       row.replica_id) < row.lsn) {
>  			if (row.replica_id == instance_id &&

AFAIU this patch makes replicaset.applier.vclock, introduced by the
previous patch, useless.

> @@ -516,24 +532,7 @@ applier_subscribe(struct applier *applier)
>  			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
>  						     row.replica_id);
>  			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> -			struct replica *replica = replica_by_id(row.replica_id);
> -			struct latch *latch = (replica ? &replica->order_latch :
> -					       &replicaset.applier.order_latch);
> -			/*
> -			 * In a full mesh topology, the same set
> -			 * of changes may arrive via two
> -			 * concurrently running appliers. Thanks
> -			 * to vclock_follow() above, the first row
> -			 * in the set will be skipped - but the
> -			 * remaining may execute out of order,
> -			 * when the following xstream_write()
> -			 * yields on WAL. Hence we need a latch to
> -			 * strictly order all changes which belong
> -			 * to the same server id.
> -			 */
> -			latch_lock(latch);
>  			int res = xstream_write(applier->subscribe_stream, &row);
> -			latch_unlock(latch);
>  			if (res != 0) {
>  				struct error *e = diag_last_error(diag_get());
>  				/**
> @@ -548,11 +547,13 @@ applier_subscribe(struct applier *applier)
>  					/* Rollback lsn to have a chance for a retry. */
>  					vclock_set(&replicaset.applier.vclock,
>  						   row.replica_id, old_lsn);
> +					latch_unlock(latch);
>  					diag_raise();
>  				}
>  			}
>  		}
>  done:
> +		latch_unlock(latch);
>  		/*
>  		 * Stay 'orphan' until appliers catch up with
>  		 * the remote vclock at the time of SUBSCRIBE

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row Georgy Kirichenko
@ 2019-01-28 12:15   ` Vladimir Davydov
  2019-02-08 16:50   ` [tarantool-patches] " Konstantin Osipov
  1 sibling, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-28 12:15 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:31:12PM +0300, Georgy Kirichenko wrote:
> Fill lsn gaps with NOP rows if applier configured to skip conflicting
> rows. This enforces wal consistency.

The patch may be worthwhile, but the comment sounds very obscure.
What's "wal consistency"? How can the wal become inconsistent without
this patch? Shouldn't there be a test proving this patch is correct?

> 
> Prerequisite #980
> ---
>  src/box/applier.cc | 29 ++++++++++++++++-------------
>  1 file changed, 16 insertions(+), 13 deletions(-)
> 
> diff --git a/src/box/applier.cc b/src/box/applier.cc
> index 148c8ce5a..adbe88679 100644
> --- a/src/box/applier.cc
> +++ b/src/box/applier.cc
> @@ -533,23 +533,26 @@ applier_subscribe(struct applier *applier)
>  						     row.replica_id);
>  			vclock_follow_xrow(&replicaset.applier.vclock, &row);
>  			int res = xstream_write(applier->subscribe_stream, &row);
> -			if (res != 0) {
> -				struct error *e = diag_last_error(diag_get());
> +			struct error *e = diag_last_error(diag_get());
> +			if (res != 0 && e->type == &type_ClientError &&
> +			    box_error_code(e) == ER_TUPLE_FOUND &&
> +			    replication_skip_conflict) {
>  				/**
>  				 * Silently skip ER_TUPLE_FOUND error if such
>  				 * option is set in config.
>  				 */
> -				if (e->type == &type_ClientError &&
> -				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> -					diag_clear(diag_get());
> -				else {
> -					/* Rollback lsn to have a chance for a retry. */
> -					vclock_set(&replicaset.applier.vclock,
> -						   row.replica_id, old_lsn);
> -					latch_unlock(latch);
> -					diag_raise();
> -				}
> +				diag_clear(diag_get());
> +				row.type = IPROTO_NOP;
> +				row.bodycnt = 0;
> +				res = xstream_write(applier->subscribe_stream,
> +						    &row);

A comment explaining why this is done would be nice to have.

> +			}
> +			if (res != 0) {
> +				/* Rollback lsn to have a chance for a retry. */
> +				vclock_set(&replicaset.applier.vclock,
> +					   row.replica_id, old_lsn);
> +				latch_unlock(latch);
> +				diag_raise();
>  			}
>  		}
>  done:

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 5/5] Disallow lsn gaps while vclock following
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
@ 2019-01-28 12:18   ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-28 12:18 UTC (permalink / raw)
  To: Georgy Kirichenko; +Cc: tarantool-patches

On Tue, Jan 22, 2019 at 01:31:13PM +0300, Georgy Kirichenko wrote:
> Only one-step vclock following is allowed. This enforces wal and
> replication consistency against out of order execution.

Again, quite an obscure commit message. Please try to explain why you do
what you do so that even a sync replication noob like me can understand
it.

> 
> Prerequisite #980
> ---
>  src/box/replication.cc |  1 +
>  src/box/vclock.c       |  2 +-
>  src/box/xrow.c         |  2 +-
>  test/unit/vclock.cc    | 10 +++++-----
>  4 files changed, 8 insertions(+), 7 deletions(-)
> 
> diff --git a/src/box/replication.cc b/src/box/replication.cc
> index 51e08886c..ee92a941b 100644
> --- a/src/box/replication.cc
> +++ b/src/box/replication.cc
> @@ -91,6 +91,7 @@ replication_init(void)
>  	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX, sizeof(struct replica *));
>  	latch_create(&replicaset.applier.order_latch);
>  	vclock_create(&replicaset.applier.vclock);
> +	vclock_clear(&replicaset.applier.vclock);
>  }
>  
>  void
> diff --git a/src/box/vclock.c b/src/box/vclock.c
> index c297d1ff9..807da9109 100644
> --- a/src/box/vclock.c
> +++ b/src/box/vclock.c
> @@ -56,7 +56,7 @@ vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
>  	assert(lsn >= 0);
>  	assert(replica_id < VCLOCK_MAX);
>  	int64_t prev_lsn = vclock->lsn[replica_id];
> -	assert(lsn > prev_lsn);
> +	assert(lsn == prev_lsn + 1);

AFAIU this may break recovery from a WAL created by an older Tarantool
version that allowed LSN gaps.

>  	/* Easier add each time than check. */
>  	vclock->map |= 1 << replica_id;
>  	vclock->lsn[replica_id] = lsn;

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes
  2019-01-28 11:20   ` Vladimir Davydov
@ 2019-01-29 10:22     ` Георгий Кириченко
  2019-01-29 11:58       ` Vladimir Davydov
  0 siblings, 1 reply; 18+ messages in thread
From: Георгий Кириченко @ 2019-01-29 10:22 UTC (permalink / raw)
  To: Vladimir Davydov; +Cc: tarantool-patches

[-- Attachment #1: Type: text/plain, Size: 5070 bytes --]

On Monday, January 28, 2019 2:20:18 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:31:09PM +0300, Georgy Kirichenko wrote:
> > Increase replica lsn only if row was successfully written to disk. This
> > prevents wal from lsn gaps in case of IO errors and enforces wal
> > consistency.
> > 
> > Needs for #980
> > ---
> > 
> >  src/box/wal.c                     | 19 ++++++---
> >  test/xlog/errinj.result           |  1 -
> >  test/xlog/panic_on_lsn_gap.result | 65 +++++++++++++++----------------
> >  3 files changed, 45 insertions(+), 40 deletions(-)
> > 
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index 3b50d3629..a55b544aa 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -901,16 +901,16 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> > 
> >  }
> >  
> >  static void
> > 
> > -wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
> > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > 
> >  	       struct xrow_header **end)
> >  
> >  {
> >  
> >  	/** Assign LSN to all local rows. */
> >  	for ( ; row < end; row++) {
> >  	
> >  		if ((*row)->replica_id == 0) {
> > 
> > -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> > +			(*row)->lsn = vclock_inc(vclock, instance_id);
> > 
> >  			(*row)->replica_id = instance_id;
> >  		
> >  		} else {
> > 
> > -			vclock_follow_xrow(&writer->vclock, *row);
> > +			vclock_follow_xrow(vclock, *row);
> > 
> >  		}
> >  	
> >  	}
> >  
> >  }
> > 
> > @@ -922,6 +922,11 @@ wal_write_to_disk(struct cmsg *msg)
> > 
> >  	struct wal_msg *wal_msg = (struct wal_msg *) msg;
> >  	struct error *error;
> > 
> > +	/* Local vclock copy. */
> > +	struct vclock vclock;
> > +	vclock_create(&vclock);
> > +	vclock_copy(&vclock, &writer->vclock);
> > +
> > 
> >  	struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
> >  	while (inj != NULL && inj->bparam)
> >  	
> >  		usleep(10);
> > 
> > @@ -974,14 +979,15 @@ wal_write_to_disk(struct cmsg *msg)
> > 
> >  	struct journal_entry *entry;
> >  	struct stailq_entry *last_committed = NULL;
> >  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> > 
> > -		wal_assign_lsn(writer, entry->rows, entry->rows + entry-
>n_rows);
> > -		entry->res = vclock_sum(&writer->vclock);
> > +		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry-
>n_rows);
> > +		entry->res = vclock_sum(&vclock);
> > 
> >  		rc = xlog_write_entry(l, entry);
> >  		if (rc < 0)
> >  		
> >  			goto done;
> >  		
> >  		if (rc > 0) {
> >  		
> >  			writer->checkpoint_wal_size += rc;
> >  			last_committed = &entry->fifo;
> > 
> > +			vclock_copy(&writer->vclock, &vclock);
> 
> I don't like that you copy a vclock after applying each entry.
> Currently, it should be pretty cheap, but in future, when we make
> vclock store any number of ids, this might get pretty heavy.
> Can we minimize the number of memcpys somehow, ideally do it only
> on the rollback path?
In that case we would have to preserve the vclock for rollback, but that can be
done only with vclock_copy too. vclock_copy is used only for a whole batch with
all entries.
When we introduce an unlimited vclock we should also introduce vclock_diff and
then use it.
> 
> >  		}
> >  		/* rc == 0: the write is buffered in xlog_tx */
> >  	
> >  	}
> > 
> > @@ -991,6 +997,7 @@ wal_write_to_disk(struct cmsg *msg)
> > 
> >  	writer->checkpoint_wal_size += rc;
> >  	last_committed = stailq_last(&wal_msg->commit);
> > 
> > +	vclock_copy(&writer->vclock, &vclock);
> > 
> >  	/*
> >  	
> >  	 * Notify TX if the checkpoint threshold has been exceeded.
> > 
> > @@ -1185,7 +1192,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
> > 
> >  			   struct journal_entry *entry)
> >  
> >  {
> >  
> >  	struct wal_writer *writer = (struct wal_writer *) journal;
> > 
> > -	wal_assign_lsn(writer, entry->rows, entry->rows + entry->n_rows);
> > +	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows +
> > entry->n_rows);> 
> >  	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> >  	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> >  	if (new_lsn > old_lsn) {
> > 
> > diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
> > index 390404b47..7f15bef35 100644
> > --- a/test/xlog/errinj.result
> > +++ b/test/xlog/errinj.result
> > @@ -43,7 +43,6 @@ require('fio').glob(name .. "/*.xlog")
> > 
> >  ---
> >  - - xlog/00000000000000000000.xlog
> >  
> >    - xlog/00000000000000000001.xlog
> > 
> > -  - xlog/00000000000000000002.xlog
> > 
> >  ...
> >  test_run:cmd('restart server default with cleanup=1')
> >  -- gh-881 iproto request with wal IO error
> > 
> > > diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
> > > index 4dd1291f8..8054baab4 100644
> > --- a/test/xlog/panic_on_lsn_gap.result
> > +++ b/test/xlog/panic_on_lsn_gap.result
> > @@ -105,7 +105,7 @@ test_run:cmd("restart server panic")
> > 
> >  --
> >  box.info.vclock
> >  ---
> > 
> > -- {1: 11}
> > +- {1: 1}
> 
> After this patch the comments contradict the expected result of this
> test. Please fix.


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2 3/5] Enforce applier out of order protection
  2019-01-28 12:09   ` Vladimir Davydov
@ 2019-01-29 10:30     ` Георгий Кириченко
  2019-01-29 12:00       ` Vladimir Davydov
  0 siblings, 1 reply; 18+ messages in thread
From: Георгий Кириченко @ 2019-01-29 10:30 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Vladimir Davydov

[-- Attachment #1: Type: text/plain, Size: 3637 bytes --]

On Monday, January 28, 2019 3:09:01 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:31:11PM +0300, Georgy Kirichenko wrote:
> > Do not skip row until the row is not processed by other appliers.
> 
> Looks like a fix for
> 
>   https://github.com/tarantool/tarantool/issues/3568
> 
> Worth adding a test?
> 
> > Prerequisite #980
> > ---
> > 
> >  src/box/applier.cc | 35 ++++++++++++++++++-----------------
> >  1 file changed, 18 insertions(+), 17 deletions(-)
> > 
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 87873e970..148c8ce5a 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -504,6 +504,22 @@ applier_subscribe(struct applier *applier)
> > 
> >  		applier->lag = ev_now(loop()) - row.tm;
> >  		applier->last_row_time = ev_monotonic_now(loop());
> > 
> > +		struct replica *replica = replica_by_id(row.replica_id);
> > +		struct latch *latch = (replica ? &replica->order_latch :
> > +				       &replicaset.applier.order_latch);
> > +		/*
> > +		 * In a full mesh topology, the same set
> > +		 * of changes may arrive via two
> > +		 * concurrently running appliers. Thanks
> > +		 * to vclock_follow() above, the first row
> 
> I don't see any vclock_follow() above. Please fix the comment.
> 
> > +		 * in the set will be skipped - but the
> > +		 * remaining may execute out of order,
> > +		 * when the following xstream_write()
> > +		 * yields on WAL. Hence we need a latch to
> > +		 * strictly order all changes which belong
> > +		 * to the same server id.
> > +		 */
> > +		latch_lock(latch);
> > 
> >  		if (vclock_get(&replicaset.applier.vclock,
> >  		
> >  			       row.replica_id) < row.lsn) {
> >  			
> >  			if (row.replica_id == instance_id &&
> 
> AFAIU this patch makes replicaset.applier.vclock, introduced by the
> previous patch, useless.
You are right for now, but I plan to release this latch just before commit in
the case of a parallel applier.
> 
> > @@ -516,24 +532,7 @@ applier_subscribe(struct applier *applier)
> > 
> >  			int64_t old_lsn = 
vclock_get(&replicaset.applier.vclock,
> >  			
> >  						     row.replica_id);
> >  			
> >  			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> > 
> > -			struct replica *replica = replica_by_id(row.replica_id);
> > -			struct latch *latch = (replica ? &replica->order_latch :
> > -					       &replicaset.applier.order_latch);
> > -			/*
> > -			 * In a full mesh topology, the same set
> > -			 * of changes may arrive via two
> > -			 * concurrently running appliers. Thanks
> > -			 * to vclock_follow() above, the first row
> > -			 * in the set will be skipped - but the
> > -			 * remaining may execute out of order,
> > -			 * when the following xstream_write()
> > -			 * yields on WAL. Hence we need a latch to
> > -			 * strictly order all changes which belong
> > -			 * to the same server id.
> > -			 */
> > -			latch_lock(latch);
> > 
> >  			int res = xstream_write(applier->subscribe_stream, 
&row);
> > 
> > -			latch_unlock(latch);
> > 
> >  			if (res != 0) {
> >  			
> >  				struct error *e = diag_last_error(diag_get());
> >  				/**
> > 
> > @@ -548,11 +547,13 @@ applier_subscribe(struct applier *applier)
> > 
> >  					/* Rollback lsn to have a chance for a 
retry. */
> >  					vclock_set(&replicaset.applier.vclock,
> >  					
> >  						   row.replica_id, old_lsn);
> > 
> > +					latch_unlock(latch);
> > 
> >  					diag_raise();
> >  				
> >  				}
> >  			
> >  			}
> >  		
> >  		}
> >  
> >  done:
> > +		latch_unlock(latch);
> > 
> >  		/*
> >  		
> >  		 * Stay 'orphan' until appliers catch up with
> >  		 * the remote vclock at the time of SUBSCRIBE


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2 2/5] Update replicaset vclock from wal
  2019-01-28 11:59   ` Vladimir Davydov
@ 2019-01-29 10:33     ` Георгий Кириченко
  0 siblings, 0 replies; 18+ messages in thread
From: Георгий Кириченко @ 2019-01-29 10:33 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Vladimir Davydov

[-- Attachment #1: Type: text/plain, Size: 15845 bytes --]

On Monday, January 28, 2019 2:59:22 PM MSK Vladimir Davydov wrote:
> On Tue, Jan 22, 2019 at 01:31:10PM +0300, Georgy Kirichenko wrote:
> > Journal maintains replicaset vclock for recovery, local and replicated
> > operations. Introduce replicaset.applier.vclock to prevent applier
> > races.
> 
> What kind of races?
> 
> > Prerequisite #980
> > ---
> > 
> >  src/box/applier.cc           | 68 +++++++++++++++++++++---------------
> >  src/box/box.cc               |  3 +-
> >  src/box/replication.cc       |  1 +
> >  src/box/replication.h        |  3 ++
> >  src/box/vclock.c             | 14 ++++++++
> >  src/box/vclock.h             |  3 ++
> >  src/box/wal.c                | 38 ++++----------------
> >  test/xlog-py/dup_key.result  | 12 +++++--
> >  test/xlog-py/dup_key.test.py | 18 +++++++---
> >  9 files changed, 93 insertions(+), 67 deletions(-)
> > 
> > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > index 21d2e6bcb..87873e970 100644
> > --- a/src/box/applier.cc
> > +++ b/src/box/applier.cc
> > @@ -139,7 +139,7 @@ applier_writer_f(va_list ap)
> > 
> >  			continue;
> >  		
> >  		try {
> >  		
> >  			struct xrow_header xrow;
> > 
> > -			xrow_encode_vclock(&xrow, &replicaset.vclock);
> > +			xrow_encode_vclock(&xrow, &replicaset.applier.vclock);
> > 
> >  			coio_write_xrow(&io, &xrow);
> >  		
> >  		} catch (SocketError *e) {
> >  		
> >  			/*
> > 
> > @@ -300,7 +300,7 @@ applier_join(struct applier *applier)
> > 
> >  		 * Used to initialize the replica's initial
> >  		 * vclock in bootstrap_from_master()
> >  		 */
> > 
> > -		xrow_decode_vclock_xc(&row, &replicaset.vclock);
> > +		xrow_decode_vclock_xc(&row, &replicaset.applier.vclock);
> > 
> >  	}
> >  	
> >  	applier_set_state(applier, APPLIER_INITIAL_JOIN);
> > 
> > @@ -326,7 +326,8 @@ applier_join(struct applier *applier)
> > 
> >  				 * vclock yet, do it now. In 1.7+
> >  				 * this vclock is not used.
> >  				 */
> > 
> > -				xrow_decode_vclock_xc(&row, &replicaset.vclock);
> > +				xrow_decode_vclock_xc(&row,
> > +						      &replicaset.applier.vclock);
> > 
> >  			}
> >  			break; /* end of stream */
> >  		
> >  		} else if (iproto_type_is_error(row.type)) {
> > 
> > @@ -336,6 +337,7 @@ applier_join(struct applier *applier)
> > 
> >  				  (uint32_t) row.type);
> >  		
> >  		}
> >  	
> >  	}
> > 
> > +	vclock_copy(&replicaset.vclock, &replicaset.applier.vclock);
> > 
> >  	say_info("initial data received");
> >  	
> >  	applier_set_state(applier, APPLIER_FINAL_JOIN);
> > 
> > @@ -355,7 +357,7 @@ applier_join(struct applier *applier)
> > 
> >  		coio_read_xrow(coio, ibuf, &row);
> >  		applier->last_row_time = ev_monotonic_now(loop());
> >  		if (iproto_type_is_dml(row.type)) {
> > 
> > -			vclock_follow_xrow(&replicaset.vclock, &row);
> > +			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> > 
> >  			xstream_write_xc(applier->subscribe_stream, &row);
> >  			if (++row_count % 100000 == 0)
> >  			
> >  				say_info("%.1fM rows received", row_count / 
1e6);
> > 
> > @@ -386,6 +388,9 @@ applier_subscribe(struct applier *applier)
> > 
> >  {
> >  
> >  	assert(applier->subscribe_stream != NULL);
> > 
> > +	if (!vclock_is_set(&replicaset.applier.vclock))
> > +		vclock_copy(&replicaset.applier.vclock, &replicaset.vclock);
> > +
> > 
> >  	/* Send SUBSCRIBE request */
> >  	struct ev_io *coio = &applier->io;
> >  	struct ibuf *ibuf = &applier->ibuf;
> > 
> > @@ -470,19 +475,6 @@ applier_subscribe(struct applier *applier)
> > 
> >  			applier_set_state(applier, APPLIER_FOLLOW);
> >  		
> >  		}
> > 
> > -		/*
> > -		 * Stay 'orphan' until appliers catch up with
> > -		 * the remote vclock at the time of SUBSCRIBE
> > -		 * and the lag is less than configured.
> > -		 */
> > -		if (applier->state == APPLIER_SYNC &&
> > -		    applier->lag <= replication_sync_lag &&
> > -		    vclock_compare(&remote_vclock_at_subscribe,
> > -				   &replicaset.vclock) <= 0) {
> > -			/* Applier is synced, switch to "follow". */
> > -			applier_set_state(applier, APPLIER_FOLLOW);
> > -		}
> > -
> 
> Hmm, why move this chunk?
> 
> >  		/*
> >  		
> >  		 * Tarantool < 1.7.7 does not send periodic heartbeat
> >  		 * messages so we can't assume that if we haven't heard
> > 
> > @@ -512,16 +504,18 @@ applier_subscribe(struct applier *applier)
> > 
> >  		applier->lag = ev_now(loop()) - row.tm;
> >  		applier->last_row_time = ev_monotonic_now(loop());
> > 
> > -
> > -		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
> > -			/**
> > -			 * Promote the replica set vclock before
> > -			 * applying the row. If there is an
> > -			 * exception (conflict) applying the row,
> > -			 * the row is skipped when the replication
> > -			 * is resumed.
> > -			 */
> > -			vclock_follow_xrow(&replicaset.vclock, &row);
> > +		if (vclock_get(&replicaset.applier.vclock,
> > +			       row.replica_id) < row.lsn) {
> > +			if (row.replica_id == instance_id &&
> > +			    vclock_get(&replicaset.vclock, instance_id) <
> > +			    row.lsn) {
> > +				/* Local row returned back. */
> > +				goto done;
> 
> How can it happen? Why should we handle it? Can you please elaborate the
> comment?
> 
> > +			}
> > +			/* Preserve old lsn value. */
> > +			int64_t old_lsn = vclock_get(&replicaset.applier.vclock,
> > +						     row.replica_id);
> > +			vclock_follow_xrow(&replicaset.applier.vclock, &row);
> 
> So, AFAICS you want to promote replicaset.vclock after applying a row
> received from a remote replica, and in order to avoid applying the same
> row twice, you add replicaset.applier.vclock, is that correct?
> 
> However, in the next patch you wrap everything in a latch, rendering
> this race impossible (BTW this is what Serge P. did too). So what's the
> point in introducing another vclock here? If it is needed solely for
> sync replication, then IMO we'd better do it in the scope of the
> corresponding patch set, because currently it seems useless.
You are right, but this is a preparatory step: I hope I will be able to unlock
the latch just before commit to allow a parallel applier.
I think one of patches 2 or 3 could be eliminated right now. Let's talk about
it.
> 
> >  			struct replica *replica = 
replica_by_id(row.replica_id);
> >  			
> >  			struct latch *latch = (replica ? &replica->order_latch 
:
> >  					       &replicaset.applier.order_latch);
> > 
> > @@ -550,10 +544,28 @@ applier_subscribe(struct applier *applier)
> > 
> >  				    box_error_code(e) == ER_TUPLE_FOUND &&
> >  				    replication_skip_conflict)
> >  					
> >  					diag_clear(diag_get());
> > 
> > -				else
> > +				else {
> > +					/* Rollback lsn to have a chance for a 
retry. */
> > +					vclock_set(&replicaset.applier.vclock,
> > +						   row.replica_id, old_lsn);
> > 
> >  					diag_raise();
> > 
> > +				}
> > 
> >  			}
> >  		
> >  		}
> > 
> > +done:
> > +		/*
> > +		 * Stay 'orphan' until appliers catch up with
> > +		 * the remote vclock at the time of SUBSCRIBE
> > +		 * and the lag is less than configured.
> > +		 */
> > +		if (applier->state == APPLIER_SYNC &&
> > +		    applier->lag <= replication_sync_lag &&
> > +		    vclock_compare(&remote_vclock_at_subscribe,
> > +				   &replicaset.vclock) <= 0) {
> > +			/* Applier is synced, switch to "follow". */
> > +			applier_set_state(applier, APPLIER_FOLLOW);
> > +		}
> > +
> > 
> >  		if (applier->state == APPLIER_SYNC ||
> >  		
> >  		    applier->state == APPLIER_FOLLOW)
> >  			
> >  			fiber_cond_signal(&applier->writer_cond);
> > 
> > diff --git a/src/box/box.cc b/src/box/box.cc
> > index 9f2fd6da1..0df0875dd 100644
> > --- a/src/box/box.cc
> > +++ b/src/box/box.cc
> > @@ -292,6 +292,7 @@ recovery_journal_write(struct journal *base,
> > 
> >  		       struct journal_entry * /* entry */)
> >  
> >  {
> >  
> >  	struct recovery_journal *journal = (struct recovery_journal *) base;
> > 
> > +	vclock_copy(&replicaset.vclock, journal->vclock);
> > 
> >  	return vclock_sum(journal->vclock);
> >  
> >  }
> > 
> > @@ -1809,7 +1810,7 @@ bootstrap_from_master(struct replica *master)
> > 
> >  	 */
> >  	
> >  	engine_begin_final_recovery_xc();
> >  	struct recovery_journal journal;
> > 
> > -	recovery_journal_create(&journal, &replicaset.vclock);
> > +	recovery_journal_create(&journal, &replicaset.applier.vclock);
> 
> I fail to understand how this works. If we bump applier.vclock during
> remote bootstrap, then where is replicaset.vclock set?
> 
> >  	journal_set(&journal.base);
> >  	
> >  	applier_resume_to_state(applier, APPLIER_JOINED, TIMEOUT_INFINITY);
> > 
> > diff --git a/src/box/replication.cc b/src/box/replication.cc
> > index 2cb4ec0f8..51e08886c 100644
> > --- a/src/box/replication.cc
> > +++ b/src/box/replication.cc
> > @@ -90,6 +90,7 @@ replication_init(void)
> > 
> >  	fiber_cond_create(&replicaset.applier.cond);
> >  	replicaset.replica_by_id = (struct replica **)calloc(VCLOCK_MAX,
> >  	sizeof(struct replica *));
> >  	latch_create(&replicaset.applier.order_latch);
> > 
> > +	vclock_create(&replicaset.applier.vclock);
> > 
> >  }
> >  
> >  void
> > 
> > diff --git a/src/box/replication.h b/src/box/replication.h
> > index 2ac620d86..b9aebed14 100644
> > --- a/src/box/replication.h
> > +++ b/src/box/replication.h
> > @@ -194,6 +194,9 @@ struct replicaset {
> > 
> >  	struct vclock vclock;
> >  	/** Applier state. */
> >  	struct {
> > 
> > +		/**
> > +		 * Vclock sent to process from appliers. */
> > +		struct vclock vclock;
> > 
> >  		/**
> >  		
> >  		 * Total number of replicas with attached
> >  		 * appliers.
> > 
> > diff --git a/src/box/vclock.c b/src/box/vclock.c
> > index b5eb2800b..c297d1ff9 100644
> > --- a/src/box/vclock.c
> > +++ b/src/box/vclock.c
> > @@ -36,6 +36,20 @@
> > 
> >  #include "diag.h"
> > 
> > +void
> > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> > +{
> > +	assert(lsn >= 0);
> > +	assert(replica_id < VCLOCK_MAX);
> > +	int64_t prev_lsn = vclock->lsn[replica_id];
> > +	if (lsn > 0)
> > +		vclock->map |= 1 << replica_id;
> > +	else
> > +		vclock->map &= ~(1 << replica_id);
> > +	vclock->lsn[replica_id] = lsn;
> > +	vclock->signature += lsn - prev_lsn;
> > +}
> > +
> > 
> >  int64_t
> >  vclock_follow(struct vclock *vclock, uint32_t replica_id, int64_t lsn)
> >  {
> > 
> > diff --git a/src/box/vclock.h b/src/box/vclock.h
> > index 111e29160..d6cb14c2a 100644
> > --- a/src/box/vclock.h
> > +++ b/src/box/vclock.h
> > @@ -161,6 +161,9 @@ vclock_get(const struct vclock *vclock, uint32_t
> > replica_id)> 
> >  	return vclock->lsn[replica_id];
> >  
> >  }
> > 
> > +void
> > +vclock_set(struct vclock *vclock, uint32_t replica_id, int64_t lsn);
> > +
> > 
> >  static inline int64_t
> >  vclock_inc(struct vclock *vclock, uint32_t replica_id)
> >  {
> > 
> > diff --git a/src/box/wal.c b/src/box/wal.c
> > index a55b544aa..4c3537672 100644
> > --- a/src/box/wal.c
> > +++ b/src/box/wal.c
> > @@ -171,6 +171,8 @@ struct wal_msg {
> > 
> >  	 * be rolled back.
> >  	 */
> >  	
> >  	struct stailq rollback;
> > 
> > +	/** vclock after the batch processed. */
> > +	struct vclock vclock;
> > 
> >  };
> >  
> >  /**
> > 
> > @@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
> > 
> >  	batch->approx_len = 0;
> >  	stailq_create(&batch->commit);
> >  	stailq_create(&batch->rollback);
> > 
> > +	vclock_create(&batch->vclock);
> > 
> >  }
> >  
> >  static struct wal_msg *
> > 
> > @@ -280,6 +283,7 @@ tx_schedule_commit(struct cmsg *msg)
> > 
> >  	 * wal_msg memory disappears after the first
> >  	 * iteration of tx_schedule_queue loop.
> >  	 */
> > 
> > +	vclock_copy(&replicaset.vclock, &batch->vclock);
> > 
> >  	if (! stailq_empty(&batch->rollback)) {
> >  	
> >  		/* Closes the input valve. */
> >  		stailq_concat(&writer->rollback, &batch->rollback);
> > 
> > @@ -1028,6 +1032,8 @@ done:
> >  		error_log(error);
> >  		diag_clear(diag_get());
> >  	
> >  	}
> > 
> > +	/* Set resulting vclock. */
> > +	vclock_copy(&wal_msg->vclock, &writer->vclock);
> > 
> >  	/*
> >  	
> >  	 * We need to start rollback from the first request
> >  	 * following the last committed request. If
> > 
> > @@ -1159,31 +1165,6 @@ wal_write(struct journal *journal, struct
> > journal_entry *entry)> 
> >  	bool cancellable = fiber_set_cancellable(false);
> >  	fiber_yield(); /* Request was inserted. */
> >  	fiber_set_cancellable(cancellable);
> > 
> > -	if (entry->res > 0) {
> > -		struct xrow_header **last = entry->rows + entry->n_rows - 1;
> > -		while (last >= entry->rows) {
> > -			/*
> > -			 * Find last row from local instance id
> > -			 * and promote vclock.
> > -			 */
> > -			if ((*last)->replica_id == instance_id) {
> > -				/*
> > -				 * In master-master configuration, during sudden
> > -				 * power-loss, if the data have not been written
> > -				 * to WAL but have already been sent to others,
> > -				 * they will send the data back. In this case
> > -				 * vclock has already been promoted by applier.
> > -				 */
> > -				if (vclock_get(&replicaset.vclock,
> > -					       instance_id) < (*last)->lsn) {
> > -					vclock_follow_xrow(&replicaset.vclock,
> > -							   *last);
> > -				}
> > -				break;
> > -			}
> > -			--last;
> > -		}
> > -	}
> > 
> >  	return entry->res;
> >  
> >  }
> > 
> > @@ -1193,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
> > 
> >  {
> >  
> >  	struct wal_writer *writer = (struct wal_writer *) journal;
> >  	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows +
> >  	entry->n_rows);
> > 
> > -	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
> > -	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
> > -	if (new_lsn > old_lsn) {
> > -		/* There were local writes, promote vclock. */
> > -		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
> > -	}
> > +	vclock_copy(&replicaset.vclock, &writer->vclock);
> > 
> >  	return vclock_sum(&writer->vclock);
> >  
> >  }
> > 
> > diff --git a/test/xlog-py/dup_key.result b/test/xlog-py/dup_key.result
> > index f387e8e89..966fa1f4a 100644
> > --- a/test/xlog-py/dup_key.result
> > +++ b/test/xlog-py/dup_key.result
> > @@ -16,7 +16,16 @@ box.space.test:insert{2, 'second tuple'}
> > 
> >  ---
> >  - [2, 'second tuple']
> >  ...
> > 
> > -.xlog exists
> > +.xlog#1 exists
> > +box.space.test:insert{3, 'third tuple'}
> > +---
> > +- [3, 'third tuple']
> > +...
> > +box.space.test:insert{4, 'fourth tuple'}
> > +---
> > +- [4, 'fourth tuple']
> > +...
> > +.xlog#2 exists
> > 
> >  box.space.test:insert{1, 'third tuple'}
> >  ---
> >  - [1, 'third tuple']
> 
> What happened to this test?
> 
> > @@ -25,7 +34,6 @@ box.space.test:insert{2, 'fourth tuple'}
> > 
> >  ---
> >  - [2, 'fourth tuple']
> >  ...
> > 
> > -.xlog does not exist
> > 
> >  check log line for 'Duplicate key'
> >  
> >  'Duplicate key' exists in server log
> > 
> > diff --git a/test/xlog-py/dup_key.test.py b/test/xlog-py/dup_key.test.py
> > index 1c033da40..3dacde771 100644
> > --- a/test/xlog-py/dup_key.test.py
> > +++ b/test/xlog-py/dup_key.test.py
> > @@ -26,19 +26,27 @@ server.stop()
> > 
> >  # Save wal#1
> > 
> >  if os.access(wal, os.F_OK):
> > -    print ".xlog exists"
> > +    print ".xlog#1 exists"
> > 
> >      os.rename(wal, wal_old)
> > 
> > -# Write wal#2
> > +# Write wal#2 to bump lsn
> > +server.start()
> > +server.admin("box.space.test:insert{3, 'third tuple'}")
> > +server.admin("box.space.test:insert{4, 'fourth tuple'}")
> > +server.stop()
> > +
> > +if os.access(wal, os.F_OK):
> > +    print ".xlog#2 exists"
> > +
> > +# Write wal#3 - confliction with wal#1
> > 
> >  server.start()
> >  server.admin("box.space.test:insert{1, 'third tuple'}")
> >  server.admin("box.space.test:insert{2, 'fourth tuple'}")
> >  server.stop()
> >  
> >  # Restore wal#1
> > 
> > -if not os.access(wal, os.F_OK):
> > -    print ".xlog does not exist"
> > -    os.rename(wal_old, wal)
> > +os.unlink(wal)
> > +os.rename(wal_old, wal)


[-- Attachment #2: This is a digitally signed message part. --]
[-- Type: application/pgp-signature, Size: 488 bytes --]

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes
  2019-01-29 10:22     ` Георгий Кириченко
@ 2019-01-29 11:58       ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-29 11:58 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Tue, Jan 29, 2019 at 01:22:21PM +0300, Георгий Кириченко wrote:
> On Monday, January 28, 2019 2:20:18 PM MSK Vladimir Davydov wrote:
> > On Tue, Jan 22, 2019 at 01:31:09PM +0300, Georgy Kirichenko wrote:
> > > Increase replica lsn only if row was successfully written to disk. This
> > > prevents wal from lsn gaps in case of IO errors and enforces wal
> > > consistency.
> > > 
> > > Needs for #980
> > > ---
> > > 
> > >  src/box/wal.c                     | 19 ++++++---
> > >  test/xlog/errinj.result           |  1 -
> > >  test/xlog/panic_on_lsn_gap.result | 65 +++++++++++++++----------------
> > >  3 files changed, 45 insertions(+), 40 deletions(-)
> > > 
> > > diff --git a/src/box/wal.c b/src/box/wal.c
> > > index 3b50d3629..a55b544aa 100644
> > > --- a/src/box/wal.c
> > > +++ b/src/box/wal.c
> > > @@ -901,16 +901,16 @@ wal_writer_begin_rollback(struct wal_writer *writer)
> > > 
> > >  }
> > >  
> > >  static void
> > > 
> > > -wal_assign_lsn(struct wal_writer *writer, struct xrow_header **row,
> > > +wal_assign_lsn(struct vclock *vclock, struct xrow_header **row,
> > > 
> > >  	       struct xrow_header **end)
> > >  
> > >  {
> > >  
> > >  	/** Assign LSN to all local rows. */
> > >  	for ( ; row < end; row++) {
> > >  	
> > >  		if ((*row)->replica_id == 0) {
> > > 
> > > -			(*row)->lsn = vclock_inc(&writer->vclock, instance_id);
> > > +			(*row)->lsn = vclock_inc(vclock, instance_id);
> > > 
> > >  			(*row)->replica_id = instance_id;
> > >  		
> > >  		} else {
> > > 
> > > -			vclock_follow_xrow(&writer->vclock, *row);
> > > +			vclock_follow_xrow(vclock, *row);
> > > 
> > >  		}
> > >  	
> > >  	}
> > >  
> > >  }
> > > 
> > > @@ -922,6 +922,11 @@ wal_write_to_disk(struct cmsg *msg)
> > > 
> > >  	struct wal_msg *wal_msg = (struct wal_msg *) msg;
> > >  	struct error *error;
> > > 
> > > +	/* Local vclock copy. */
> > > +	struct vclock vclock;
> > > +	vclock_create(&vclock);
> > > +	vclock_copy(&vclock, &writer->vclock);
> > > +
> > > 
> > >  	struct errinj *inj = errinj(ERRINJ_WAL_DELAY, ERRINJ_BOOL);
> > >  	while (inj != NULL && inj->bparam)
> > >  	
> > >  		usleep(10);
> > > 
> > > @@ -974,14 +979,15 @@ wal_write_to_disk(struct cmsg *msg)
> > > 
> > >  	struct journal_entry *entry;
> > >  	struct stailq_entry *last_committed = NULL;
> > >  	stailq_foreach_entry(entry, &wal_msg->commit, fifo) {
> > > 
> > > -		wal_assign_lsn(writer, entry->rows, entry->rows + entry-
> >n_rows);
> > > -		entry->res = vclock_sum(&writer->vclock);
> > > +		wal_assign_lsn(&vclock, entry->rows, entry->rows + entry-
> >n_rows);
> > > +		entry->res = vclock_sum(&vclock);
> > > 
> > >  		rc = xlog_write_entry(l, entry);
> > >  		if (rc < 0)
> > >  		
> > >  			goto done;
> > >  		
> > >  		if (rc > 0) {
> > >  		
> > >  			writer->checkpoint_wal_size += rc;
> > >  			last_committed = &entry->fifo;
> > > 
> > > +			vclock_copy(&writer->vclock, &vclock);
> > 
> > I don't like that you copy a vclock after applying each entry.
> > Currently, it should be pretty cheap, but in future, when we make
> > vclock store any number of ids, this might get pretty heavy.
> > Can we minimize the number of memcpys somehow, ideally do it only
> > on the rollback path?
> In that case we should preserve vclock for rollback but it can be done only 
> with vclock_copy to.  vclock_copy is used only for whole batch with all 
> entries.
> When we introduce unlimited vclock we should introduce vclock_diff also and 
> then use them.

Fair enough.

^ permalink raw reply	[flat|nested] 18+ messages in thread

* Re: [tarantool-patches] Re: [PATCH v2 3/5] Enforce applier out of order protection
  2019-01-29 10:30     ` [tarantool-patches] " Георгий Кириченко
@ 2019-01-29 12:00       ` Vladimir Davydov
  0 siblings, 0 replies; 18+ messages in thread
From: Vladimir Davydov @ 2019-01-29 12:00 UTC (permalink / raw)
  To: Георгий
	Кириченко
  Cc: tarantool-patches

On Tue, Jan 29, 2019 at 01:30:40PM +0300, Георгий Кириченко wrote:
> On Monday, January 28, 2019 3:09:01 PM MSK Vladimir Davydov wrote:
> > On Tue, Jan 22, 2019 at 01:31:11PM +0300, Georgy Kirichenko wrote:
> > > Do not skip row until the row is not processed by other appliers.
> > 
> > Looks like a fix for
> > 
> >   https://github.com/tarantool/tarantool/issues/3568
> > 
> > Worth adding a test?
> > 
> > > Prerequisite #980
> > > ---
> > > 
> > >  src/box/applier.cc | 35 ++++++++++++++++++-----------------
> > >  1 file changed, 18 insertions(+), 17 deletions(-)
> > > 
> > > diff --git a/src/box/applier.cc b/src/box/applier.cc
> > > index 87873e970..148c8ce5a 100644
> > > --- a/src/box/applier.cc
> > > +++ b/src/box/applier.cc
> > > @@ -504,6 +504,22 @@ applier_subscribe(struct applier *applier)
> > > 
> > >  		applier->lag = ev_now(loop()) - row.tm;
> > >  		applier->last_row_time = ev_monotonic_now(loop());
> > > 
> > > +		struct replica *replica = replica_by_id(row.replica_id);
> > > +		struct latch *latch = (replica ? &replica->order_latch :
> > > +				       &replicaset.applier.order_latch);
> > > +		/*
> > > +		 * In a full mesh topology, the same set
> > > +		 * of changes may arrive via two
> > > +		 * concurrently running appliers. Thanks
> > > +		 * to vclock_follow() above, the first row
> > 
> > I don't see any vclock_follow() above. Please fix the comment.
> > 
> > > +		 * in the set will be skipped - but the
> > > +		 * remaining may execute out of order,
> > > +		 * when the following xstream_write()
> > > +		 * yields on WAL. Hence we need a latch to
> > > +		 * strictly order all changes which belong
> > > +		 * to the same server id.
> > > +		 */
> > > +		latch_lock(latch);
> > > 
> > >  		if (vclock_get(&replicaset.applier.vclock,
> > >  		
> > >  			       row.replica_id) < row.lsn) {
> > >  			
> > >  			if (row.replica_id == instance_id &&
> > 
> > AFAIU this patch makes replicaset.applier.vclock, introduced by the
> > previous patch, useless.
> You are right now, but I plan to release this latch just before commit in case 
> of parallel applier.

Then let's please introduce applier.vclock when we get to implement
parallel applier, because currently I can't say for sure whether we
really need it or not.

^ permalink raw reply	[flat|nested] 18+ messages in thread

* [tarantool-patches] Re: [PATCH v2 4/5] Emit NOP if an applier skips row
  2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row Georgy Kirichenko
  2019-01-28 12:15   ` Vladimir Davydov
@ 2019-02-08 16:50   ` Konstantin Osipov
  1 sibling, 0 replies; 18+ messages in thread
From: Konstantin Osipov @ 2019-02-08 16:50 UTC (permalink / raw)
  To: tarantool-patches; +Cc: Georgy Kirichenko

* Georgy Kirichenko <georgy@tarantool.org> [19/01/22 15:45]:
>  				 */
> -				if (e->type == &type_ClientError &&
> -				    box_error_code(e) == ER_TUPLE_FOUND &&
> -				    replication_skip_conflict)
> -					diag_clear(diag_get());
> -				else {
> -					/* Rollback lsn to have a chance for a retry. */
> -					vclock_set(&replicaset.applier.vclock,
> -						   row.replica_id, old_lsn);
> -					latch_unlock(latch);
> -					diag_raise();
> -				}
> +				diag_clear(diag_get());
> +				row.type = IPROTO_NOP;
> +				row.bodycnt = 0;
> +				res = xstream_write(applier->subscribe_stream,
> +						    &row);
> +			}
> +			if (res != 0) {
> +				/* Rollback lsn to have a chance for a retry. */
> +				vclock_set(&replicaset.applier.vclock,
> +					   row.replica_id, old_lsn);
> +				latch_unlock(latch);
> +				diag_raise();
>  			}

Why do you need to manually promote applier.vclock here?


-- 
Konstantin Osipov, Moscow, Russia, +7 903 626 22 32
http://tarantool.io - www.twitter.com/kostja_osipov

^ permalink raw reply	[flat|nested] 18+ messages in thread

end of thread, other threads:[~2019-02-08 16:50 UTC | newest]

Thread overview: 18+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2019-01-22 10:31 [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Georgy Kirichenko
2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 1/5] Do not promote wal vclock for failed writes Georgy Kirichenko
2019-01-28 11:20   ` Vladimir Davydov
2019-01-29 10:22     ` Георгий Кириченко
2019-01-29 11:58       ` Vladimir Davydov
2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 2/5] Update replicaset vclock from wal Georgy Kirichenko
2019-01-28 11:59   ` Vladimir Davydov
2019-01-29 10:33     ` [tarantool-patches] " Георгий Кириченко
2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 3/5] Enforce applier out of order protection Georgy Kirichenko
2019-01-28 12:09   ` Vladimir Davydov
2019-01-29 10:30     ` [tarantool-patches] " Георгий Кириченко
2019-01-29 12:00       ` Vladimir Davydov
2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 4/5] Emit NOP if an applier skips row Georgy Kirichenko
2019-01-28 12:15   ` Vladimir Davydov
2019-02-08 16:50   ` [tarantool-patches] " Konstantin Osipov
2019-01-22 10:31 ` [tarantool-patches] [PATCH v2 5/5] Disallow lsn gaps while vclock following Georgy Kirichenko
2019-01-28 12:18   ` Vladimir Davydov
2019-01-28 11:15 ` [tarantool-patches] [PATCH v2 0/5] Strong sequentially LSN in journal Vladimir Davydov

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.