Tarantool development patches archive
* [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance
@ 2020-02-28  8:34 Serge Petrenko
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 1/4] box: expose box_is_orphan method Serge Petrenko
                   ` (5 more replies)
  0 siblings, 6 replies; 9+ messages in thread
From: Serge Petrenko @ 2020-02-28  8:34 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: kirichenkoga, tarantool-patches

https://github.com/tarantool/tarantool/issues/4739
https://github.com/tarantool/tarantool/tree/sp/gh-4739-vclock-assert-v4

@ChangeLog
 - fix possible vclock ordering violations in
   master-master replication when one of the
   masters restarts (gh-4739)

Changes in v5:
 - review fixes as per reviews from Vlad and Kostja
 - added a test
 - applied Vlad's test amendments
 - added a changelog

Changes in v4:
 - move row skipping logic from recovery to relay
 - encode a list of instances whose rows to skip
   in SUBSCRIBE request instead of encoding
   is_orphan status

Changes in v3:
 - review fixes as per review from Vlad
 - instead of skipping rows on replica side,
   do it on master side, by patching recovery
   to silently follow rows coming from a certain
   instance.

Changes in v2:
- review fixes as per review from Kostja

Serge Petrenko (4):
  box: expose box_is_orphan method
  wal: warn when trying to write a record with a broken lsn
  replication: implement an instance id filter for relay
  replication: do not relay rows coming from a remote instance back to
    it

 src/box/applier.cc                            |  7 +-
 src/box/box.cc                                | 13 ++-
 src/box/box.h                                 |  3 +
 src/box/iproto_constants.h                    |  1 +
 src/box/relay.cc                              | 15 +++-
 src/box/relay.h                               |  3 +-
 src/box/wal.c                                 | 20 ++++-
 src/box/xrow.c                                | 40 ++++++++-
 src/box/xrow.h                                | 36 +++++---
 src/lib/core/cbus.h                           |  7 ++
 src/lib/core/errinj.h                         |  3 +-
 test/box/errinj.result                        |  1 +
 test/replication/gh-4739-vclock-assert.result | 88 +++++++++++++++++++
 .../gh-4739-vclock-assert.test.lua            | 36 ++++++++
 test/replication/suite.cfg                    |  1 +
 test/replication/suite.ini                    |  2 +-
 16 files changed, 250 insertions(+), 26 deletions(-)
 create mode 100644 test/replication/gh-4739-vclock-assert.result
 create mode 100644 test/replication/gh-4739-vclock-assert.test.lua

-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v5 1/4] box: expose box_is_orphan method
  2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
@ 2020-02-28  8:34 ` Serge Petrenko
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn Serge Petrenko
                   ` (4 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Serge Petrenko @ 2020-02-28  8:34 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: kirichenkoga, tarantool-patches

The applier needs the is_orphan status to tell the relay whether to send
the instance's own rows back or not.

Prerequisite #4739
---
 src/box/box.cc | 6 ++++++
 src/box/box.h  | 3 +++
 2 files changed, 9 insertions(+)

diff --git a/src/box/box.cc b/src/box/box.cc
index 9e8311d1e..5850894de 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -247,6 +247,12 @@ box_is_ro(void)
 	return is_ro || is_orphan;
 }
 
+bool
+box_is_orphan(void)
+{
+	return is_orphan;
+}
+
 int
 box_wait_ro(bool ro, double timeout)
 {
diff --git a/src/box/box.h b/src/box/box.h
index a212e6510..f37a945eb 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -105,6 +105,9 @@ box_set_ro();
 bool
 box_is_ro(void);
 
+bool
+box_is_orphan(void);
+
 /**
  * Wait until the instance switches to a desired mode.
  * \param ro wait read-only if set or read-write if unset
-- 
2.21.1 (Apple Git-122.3)


* [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn
  2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 1/4] box: expose box_is_orphan method Serge Petrenko
@ 2020-02-28  8:34 ` Serge Petrenko
  2020-02-28 23:24   ` Vladislav Shpilevoy
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 3/4] replication: implement an instance id filter for relay Serge Petrenko
                   ` (3 subsequent siblings)
  5 siblings, 1 reply; 9+ messages in thread
From: Serge Petrenko @ 2020-02-28  8:34 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: kirichenkoga, tarantool-patches

There is an assertion `lsn > prev_lsn` in vclock_follow, which, of course,
doesn't fire in release builds. Let's at least warn the user on an
attempt to write a record with a duplicate or otherwise broken lsn, and
not follow such an lsn.

Follow-up #4739
---
 src/box/wal.c | 17 ++++++++++++++---
 1 file changed, 14 insertions(+), 3 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index ac977c16e..27bff662a 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -951,9 +951,20 @@ wal_assign_lsn(struct vclock *vclock_diff, struct vclock *base,
 			(*row)->tsn = tsn;
 			(*row)->is_commit = row == end - 1;
 		} else {
-			vclock_follow(vclock_diff, (*row)->replica_id,
-				      (*row)->lsn - vclock_get(base,
-							       (*row)->replica_id));
+			int64_t diff = (*row)->lsn - vclock_get(base, (*row)->replica_id);
+			if (diff <= vclock_get(vclock_diff,
+					       (*row)->replica_id)) {
+				say_crit("Attempt to write a broken LSN to WAL:"
+					 " replica id: %d, confirmed lsn: %d,"
+					 " new lsn %d", (*row)->replica_id,
+					 vclock_get(base, (*row)->replica_id) +
+					 vclock_get(vclock_diff,
+						    (*row)->replica_id),
+						    (*row)->lsn);
+				assert(false);
+			} else {
+				vclock_follow(vclock_diff, (*row)->replica_id, diff);
+			}
 		}
 	}
 }
-- 
2.21.1 (Apple Git-122.3)
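
For readers who want the invariant behind this patch in isolation: the check that wal_assign_lsn() now performs amounts to "only follow an LSN that strictly grows". Below is a minimal sketch, assuming a toy fixed-size vclock. `struct toy_vclock`, `toy_vclock_try_follow()` and TOY_VCLOCK_MAX = 32 are hypothetical names, not Tarantool API, and the sketch compares absolute LSNs rather than the base + diff pair the patch uses. The log line uses the <inttypes.h> PRId64/PRIu32 macros so the format matches the int64_t/uint32_t arguments.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical bound on replica ids for this toy model. */
enum { TOY_VCLOCK_MAX = 32 };

/* Hypothetical toy vclock: one LSN per replica id. */
struct toy_vclock {
	int64_t lsn[TOY_VCLOCK_MAX];
};

/*
 * Advance component replica_id to lsn only if it strictly grows.
 * On a duplicate or smaller LSN, log and refuse instead of relying
 * on an assertion that release builds compile out.
 */
static bool
toy_vclock_try_follow(struct toy_vclock *vc, uint32_t replica_id, int64_t lsn)
{
	assert(replica_id < TOY_VCLOCK_MAX);
	int64_t prev = vc->lsn[replica_id];
	if (lsn <= prev) {
		fprintf(stderr, "Attempt to write a broken LSN to WAL: "
			"replica id: %" PRIu32 ", confirmed lsn: %" PRId64
			", new lsn: %" PRId64 "\n", replica_id, prev, lsn);
		return false;
	}
	vc->lsn[replica_id] = lsn;
	return true;
}
```

A duplicate (10 after 10) or a regression (9 after 10) is rejected and leaves the component unchanged, while 11 after 10 is accepted.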


* [Tarantool-patches] [PATCH v5 3/4] replication: implement an instance id filter for relay
  2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 1/4] box: expose box_is_orphan method Serge Petrenko
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn Serge Petrenko
@ 2020-02-28  8:34 ` Serge Petrenko
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 4/4] replication: do not relay rows coming from a remote instance back to it Serge Petrenko
                   ` (2 subsequent siblings)
  5 siblings, 0 replies; 9+ messages in thread
From: Serge Petrenko @ 2020-02-28  8:34 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: kirichenkoga, tarantool-patches

Add a filter for relay to skip rows coming from unwanted instances.
A list of ids of instances whose rows the replica doesn't want to fetch is
encoded together with the SUBSCRIBE request after a newly introduced key,
IPROTO_ID_FILTER.

Filtering rows is needed to prevent an instance from fetching its own
rows from a remote master: this is useful on initial configuration but
harmful on resubscribe.

Prerequisite #4739, #3294

@TarantoolBot document

Title: document new binary protocol key and subscribe request changes

Add key `IPROTO_ID_FILTER = 0x51` to the internals reference.
This is an optional key used in the SUBSCRIBE request, followed by an array
of ids of instances whose rows won't be relayed to the replica.

SUBSCRIBE request is supplemented with an optional field of the
following structure:
```
+====================+
|      ID_FILTER     |
|   0x51 : ID LIST   |
| MP_INT : MP_ARRAY  |
|                    |
+====================+
```
The field is encoded only when the id list is not empty.
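
The id list on the wire and the filter kept by the relay are two views of the same set. A minimal sketch of the two conversions, assuming at most 32 ids (the uint32_t filter in this patch implies that bound). `id_list_to_filter()` and `filter_to_id_list()` are hypothetical helpers; plain loops stand in for the `__builtin_popcount()`/`bit_iterator` calls used in xrow.c, and the MessagePack encoding itself is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed number of possible instance ids (one bit each). */
enum { TOY_VCLOCK_MAX = 32 };

/*
 * Build a filter mask from a decoded id list. Returns -1 if an id is
 * out of range (like the "val >= VCLOCK_MAX" check in the decoder),
 * leaving *filter untouched; returns 0 on success.
 */
static int
id_list_to_filter(const uint64_t *ids, size_t count, uint32_t *filter)
{
	uint32_t mask = 0;
	for (size_t i = 0; i < count; i++) {
		if (ids[i] >= TOY_VCLOCK_MAX)
			return -1;
		/* Unsigned shift: well defined for id 31 as well. */
		mask |= UINT32_C(1) << ids[i];
	}
	*filter = mask;
	return 0;
}

/*
 * Expand a mask into ids in ascending order, i.e. the array that
 * follows the 0x51 key. Returns the number of ids written; zero
 * means the field would be omitted.
 */
static size_t
filter_to_id_list(uint32_t filter, uint32_t *ids)
{
	size_t n = 0;
	for (uint32_t id = 0; id < TOY_VCLOCK_MAX; id++) {
		if ((filter & (UINT32_C(1) << id)) != 0)
			ids[n++] = id;
	}
	return n;
}
```

An empty mask yields zero ids, which corresponds to not encoding the field at all.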
---
 src/box/applier.cc         |  2 +-
 src/box/box.cc             |  7 ++++---
 src/box/iproto_constants.h |  1 +
 src/box/relay.cc           | 15 +++++++++++++-
 src/box/relay.h            |  3 ++-
 src/box/xrow.c             | 40 +++++++++++++++++++++++++++++++++++---
 src/box/xrow.h             | 36 ++++++++++++++++++++++------------
 7 files changed, 83 insertions(+), 21 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index ae3d281a5..911353425 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -867,7 +867,7 @@ applier_subscribe(struct applier *applier)
 	vclock_create(&vclock);
 	vclock_copy(&vclock, &replicaset.vclock);
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &vclock, replication_anon);
+				 &vclock, replication_anon, 0);
 	coio_write_xrow(coio, &row);
 
 	/* Read SUBSCRIBE response */
diff --git a/src/box/box.cc b/src/box/box.cc
index 5850894de..09dd67ab4 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1787,8 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	uint32_t replica_version_id;
 	vclock_create(&replica_clock);
 	bool anon;
-	xrow_decode_subscribe_xc(header, NULL, &replica_uuid,
-				 &replica_clock, &replica_version_id, &anon);
+	uint32_t id_filter;
+	xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock,
+				 &replica_version_id, &anon, &id_filter);
 
 	/* Forbid connection to itself */
 	if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID))
@@ -1871,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header)
 	 * indefinitely).
 	 */
 	relay_subscribe(replica, io->fd, header->sync, &replica_clock,
-			replica_version_id);
+			replica_version_id, id_filter);
 }
 
 void
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index b66c05c06..f9d413a31 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -125,6 +125,7 @@ enum iproto_key {
 	IPROTO_STMT_ID = 0x43,
 	/* Leave a gap between SQL keys and additional request keys */
 	IPROTO_REPLICA_ANON = 0x50,
+	IPROTO_ID_FILTER = 0x51,
 	IPROTO_KEY_MAX
 };
 
diff --git a/src/box/relay.cc b/src/box/relay.cc
index b89632273..95245a3cf 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -109,6 +109,13 @@ struct relay {
 	struct vclock recv_vclock;
 	/** Replicatoin slave version. */
 	uint32_t version_id;
+	/**
+	 * A filter of replica ids whose rows should be ignored.
+	 * Each set filter bit corresponds to a replica id whose
+	 * rows shouldn't be relayed. The list of ids to ignore
+	 * is passed by the replica on subscribe.
+	 */
+	uint32_t id_filter;
 	/**
 	 * Local vclock at the moment of subscribe, used to check
 	 * dataset on the other side and send missing data rows if any.
@@ -676,7 +683,8 @@ relay_subscribe_f(va_list ap)
 /** Replication acceptor fiber handler. */
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
-		struct vclock *replica_clock, uint32_t replica_version_id)
+		struct vclock *replica_clock, uint32_t replica_version_id,
+		uint32_t replica_id_filter)
 {
 	assert(replica->anon || replica->id != REPLICA_ID_NIL);
 	struct relay *relay = replica->relay;
@@ -705,6 +713,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync,
 	vclock_copy(&relay->tx.vclock, replica_clock);
 	relay->version_id = replica_version_id;
 
+	relay->id_filter = replica_id_filter;
+
 	int rc = cord_costart(&relay->cord, "subscribe",
 			      relay_subscribe_f, relay);
 	if (rc == 0)
@@ -763,6 +773,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 		packet->group_id = GROUP_DEFAULT;
 		packet->bodycnt = 0;
 	}
+	/* Check if the rows from the instance are filtered. */
+	if ((1 << packet->replica_id & relay->id_filter) != 0)
+		return;
 	/*
 	 * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE
 	 * request. If this is FINAL JOIN (i.e. relay->replica is NULL),
diff --git a/src/box/relay.h b/src/box/relay.h
index e1782d78f..0632fa912 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock,
  */
 void
 relay_subscribe(struct replica *replica, int fd, uint64_t sync,
-		struct vclock *replica_vclock, uint32_t replica_version_id);
+		struct vclock *replica_vclock, uint32_t replica_version_id,
+		uint32_t replica_id_filter);
 
 #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 968c3a202..602049004 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1194,7 +1194,8 @@ int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
-		      const struct vclock *vclock, bool anon)
+		      const struct vclock *vclock, bool anon,
+		      uint32_t id_filter)
 {
 	memset(row, 0, sizeof(*row));
 	size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock);
@@ -1204,7 +1205,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 		return -1;
 	}
 	char *data = buf;
-	data = mp_encode_map(data, 5);
+	int filter_size = __builtin_popcount(id_filter);
+	data = mp_encode_map(data, filter_size != 0 ? 6 : 5);
 	data = mp_encode_uint(data, IPROTO_CLUSTER_UUID);
 	data = xrow_encode_uuid(data, replicaset_uuid);
 	data = mp_encode_uint(data, IPROTO_INSTANCE_UUID);
@@ -1215,6 +1217,17 @@ xrow_encode_subscribe(struct xrow_header *row,
 	data = mp_encode_uint(data, tarantool_version_id());
 	data = mp_encode_uint(data, IPROTO_REPLICA_ANON);
 	data = mp_encode_bool(data, anon);
+	if (filter_size != 0) {
+		data = mp_encode_uint(data, IPROTO_ID_FILTER);
+		data = mp_encode_array(data, filter_size);
+		struct bit_iterator it;
+		bit_iterator_init(&it, &id_filter, sizeof(id_filter),
+				  true);
+		for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX;
+		     id = bit_iterator_next(&it)) {
+			data = mp_encode_uint(data, id);
+		}
+	}
 	assert(data <= buf + size);
 	row->body[0].iov_base = buf;
 	row->body[0].iov_len = (data - buf);
@@ -1226,7 +1239,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
-		      uint32_t *version_id, bool *anon)
+		      uint32_t *version_id, bool *anon,
+		      uint32_t *id_filter)
 {
 	if (row->bodycnt == 0) {
 		diag_set(ClientError, ER_INVALID_MSGPACK, "request body");
@@ -1244,6 +1258,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 
 	if (anon)
 		*anon = false;
+	if (id_filter)
+		*id_filter = 0;
 	d = data;
 	uint32_t map_size = mp_decode_map(&d);
 	for (uint32_t i = 0; i < map_size; i++) {
@@ -1301,6 +1317,24 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 			}
 			*anon = mp_decode_bool(&d);
 			break;
+		case IPROTO_ID_FILTER:
+			if (id_filter == NULL)
+				goto skip;
+			if (mp_typeof(*d) != MP_ARRAY) {
+id_filter_decode_err:		xrow_on_decode_err(data, end, ER_INVALID_MSGPACK,
+						   "invalid ID_FILTER");
+				return -1;
+			}
+			uint32_t len = mp_decode_array(&d);
+			for (uint32_t i = 0; i < len; ++i) {
+				if (mp_typeof(*d) != MP_UINT)
+					goto id_filter_decode_err;
+				uint64_t val = mp_decode_uint(&d);
+				if (val >= VCLOCK_MAX)
+					goto id_filter_decode_err;
+				*id_filter |= 1 << val;
+			}
+			break;
 		default: skip:
 			mp_next(&d); /* value */
 		}
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 0973c497d..2a0a9c852 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -48,7 +48,7 @@ enum {
 	XROW_BODY_IOVMAX = 2,
 	XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX,
 	XROW_HEADER_LEN_MAX = 52,
-	XROW_BODY_LEN_MAX = 128,
+	XROW_BODY_LEN_MAX = 256,
 	IPROTO_HEADER_LEN = 28,
 	/** 7 = sizeof(iproto_body_bin). */
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
@@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row,
  * @param instance_uuid Instance uuid.
  * @param vclock Replication clock.
  * @param anon Whether it is an anonymous subscribe request or not.
+ * @param id_filter A List of replica ids to skip rows from
+ *		    when feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory error.
@@ -330,7 +332,8 @@ int
 xrow_encode_subscribe(struct xrow_header *row,
 		      const struct tt_uuid *replicaset_uuid,
 		      const struct tt_uuid *instance_uuid,
-		      const struct vclock *vclock, bool anon);
+		      const struct vclock *vclock, bool anon,
+		      uint32_t id_filter);
 
 /**
  * Decode SUBSCRIBE command.
@@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row,
  * @param[out] vclock.
  * @param[out] version_id.
  * @param[out] anon Whether it is an anonymous subscribe.
+ * @param[out] id_filter A list of ids to skip rows from when
+ *			 feeding a replica.
  *
  * @retval  0 Success.
  * @retval -1 Memory or format error.
@@ -347,7 +352,8 @@ xrow_encode_subscribe(struct xrow_header *row,
 int
 xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid,
 		      struct tt_uuid *instance_uuid, struct vclock *vclock,
-		      uint32_t *version_id, bool *anon);
+		      uint32_t *version_id, bool *anon,
+		      uint32_t *id_filter);
 
 /**
  * Encode JOIN command.
@@ -371,7 +377,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid);
 static inline int
 xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid)
 {
-	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL,
+				     NULL);
 }
 
 /**
@@ -386,7 +393,8 @@ static inline int
 xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid,
 		     struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL,
+				     NULL, NULL);
 }
 
 /**
@@ -411,7 +419,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock);
 static inline int
 xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL, NULL);
 }
 
 /**
@@ -442,7 +450,8 @@ xrow_decode_subscribe_response(struct xrow_header *row,
 			       struct tt_uuid *replicaset_uuid,
 			       struct vclock *vclock)
 {
-	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL);
+	return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL,
+				     NULL, NULL);
 }
 
 /**
@@ -817,10 +826,11 @@ static inline void
 xrow_encode_subscribe_xc(struct xrow_header *row,
 			 const struct tt_uuid *replicaset_uuid,
 			 const struct tt_uuid *instance_uuid,
-			 const struct vclock *vclock, bool anon)
+			 const struct vclock *vclock, bool anon,
+			 uint32_t id_filter)
 {
 	if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, anon) != 0)
+				  vclock, anon, id_filter) != 0)
 		diag_raise();
 }
 
@@ -828,11 +838,13 @@ xrow_encode_subscribe_xc(struct xrow_header *row,
 static inline void
 xrow_decode_subscribe_xc(struct xrow_header *row,
 			 struct tt_uuid *replicaset_uuid,
-		         struct tt_uuid *instance_uuid, struct vclock *vclock,
-			 uint32_t *replica_version_id, bool *anon)
+			 struct tt_uuid *instance_uuid, struct vclock *vclock,
+			 uint32_t *replica_version_id, bool *anon,
+			 uint32_t *id_filter)
 {
 	if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid,
-				  vclock, replica_version_id, anon) != 0)
+				  vclock, replica_version_id, anon,
+				  id_filter) != 0)
 		diag_raise();
 }
 
-- 
2.21.1 (Apple Git-122.3)
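
The relay-side check added to relay_send_row() can be tried in isolation. Since `<<` binds tighter than `&`, `1 << packet->replica_id & relay->id_filter` parses as `(1 << replica_id) & id_filter`. A minimal sketch, assuming a hypothetical `struct toy_row` that carries only a replica id and an LSN; it uses an unsigned shift so that id 31 is also well defined.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical minimal row: only the fields this sketch needs. */
struct toy_row {
	uint32_t replica_id;
	int64_t lsn;
};

/* Same test as in relay_send_row(), written with an unsigned shift. */
static bool
row_is_filtered(const struct toy_row *row, uint32_t id_filter)
{
	/* The decoder rejects ids >= VCLOCK_MAX, assumed to be 32 here. */
	assert(row->replica_id < 32);
	return ((UINT32_C(1) << row->replica_id) & id_filter) != 0;
}

/* Copy the rows that pass the filter to `out`; return how many were kept. */
static size_t
relay_filter_rows(const struct toy_row *rows, size_t count,
		  uint32_t id_filter, struct toy_row *out)
{
	size_t kept = 0;
	for (size_t i = 0; i < count; i++) {
		if (row_is_filtered(&rows[i], id_filter))
			continue; /* like the early return in relay_send_row() */
		out[kept++] = rows[i];
	}
	return kept;
}
```

With a filter of `1 << 1`, every row from instance 1 is dropped and rows from other instances pass through in their original order.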


* [Tarantool-patches] [PATCH v5 4/4] replication: do not relay rows coming from a remote instance back to it
  2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
                   ` (2 preceding siblings ...)
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 3/4] replication: implement an instance id filter for relay Serge Petrenko
@ 2020-02-28  8:34 ` Serge Petrenko
  2020-02-28 23:24 ` [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
  2020-03-02  4:32 ` Kirill Yukhin
  5 siblings, 0 replies; 9+ messages in thread
From: Serge Petrenko @ 2020-02-28  8:34 UTC (permalink / raw)
  To: v.shpilevoy; +Cc: kirichenkoga, tarantool-patches

We have a mechanism for restoring rows originating from an instance that
suffered a sudden power loss: remote masters resend the instance's rows
received before a certain point in time, defined by the remote master's
vclock at the moment of subscribe.
However, this is useful only on initial replication configuration, when
an instance has just recovered, so that it can receive what it has
relayed but hasn't synced to disk.
In other cases, when an instance is operating normally and master-master
replication is configured, the mechanism described above may lead to the
instance re-applying its own rows, coming from a master it has just
subscribed to.
To fix the problem, do not relay an instance's own rows back to it once
the instance has already recovered.

Closes #4739
---
 src/box/applier.cc                            |  7 +-
 src/box/wal.c                                 |  3 +
 src/lib/core/cbus.h                           |  7 ++
 src/lib/core/errinj.h                         |  3 +-
 test/box/errinj.result                        |  1 +
 test/replication/gh-4739-vclock-assert.result | 88 +++++++++++++++++++
 .../gh-4739-vclock-assert.test.lua            | 36 ++++++++
 test/replication/suite.cfg                    |  1 +
 test/replication/suite.ini                    |  2 +-
 9 files changed, 145 insertions(+), 3 deletions(-)
 create mode 100644 test/replication/gh-4739-vclock-assert.result
 create mode 100644 test/replication/gh-4739-vclock-assert.test.lua

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 911353425..78f3d8a73 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -866,8 +866,13 @@ applier_subscribe(struct applier *applier)
 	struct vclock vclock;
 	vclock_create(&vclock);
 	vclock_copy(&vclock, &replicaset.vclock);
+	/*
+	 * Stop accepting local rows coming from a remote
+	 * instance as soon as local WAL starts accepting writes.
+	 */
+	uint32_t id_filter = box_is_orphan() ? 0 : 1 << instance_id;
 	xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID,
-				 &vclock, replication_anon, 0);
+				 &vclock, replication_anon, id_filter);
 	coio_write_xrow(coio, &row);
 
 	/* Read SUBSCRIBE response */
diff --git a/src/box/wal.c b/src/box/wal.c
index 27bff662a..1668c9348 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -1114,6 +1114,7 @@ done:
 	}
 	fiber_gc();
 	wal_notify_watchers(writer, WAL_EVENT_WRITE);
+	ERROR_INJECT_SLEEP(ERRINJ_RELAY_FASTER_THAN_TX);
 }
 
 /** WAL writer main loop.  */
@@ -1325,6 +1326,8 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events)
 	msg->events = events;
 	cmsg_init(&msg->cmsg, watcher->route);
 	cpipe_push(&watcher->watcher_pipe, &msg->cmsg);
+	ERROR_INJECT(ERRINJ_RELAY_FASTER_THAN_TX,
+		     cpipe_deliver_now(&watcher->watcher_pipe));
 }
 
 static void
diff --git a/src/lib/core/cbus.h b/src/lib/core/cbus.h
index 16d122779..f0101cb8b 100644
--- a/src/lib/core/cbus.h
+++ b/src/lib/core/cbus.h
@@ -176,6 +176,13 @@ cpipe_set_max_input(struct cpipe *pipe, int max_input)
 	pipe->max_input = max_input;
 }
 
+static inline void
+cpipe_deliver_now(struct cpipe *pipe)
+{
+	if (pipe->n_input > 0)
+		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
+}
+
 /**
  * Flush all staged messages into the pipe and eventually to the
  * consumer.
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index ed0cba903..d8cdf3f27 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -136,7 +136,8 @@ struct errinj {
 	_(ERRINJ_SWIM_FD_ONLY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_DYN_MODULE_COUNT, ERRINJ_INT, {.iparam = 0}) \
 	_(ERRINJ_FIBER_MADVISE, ERRINJ_BOOL, {.bparam = false}) \
-	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1})
+	_(ERRINJ_FIBER_MPROTECT, ERRINJ_INT, {.iparam = -1}) \
+	_(ERRINJ_RELAY_FASTER_THAN_TX, ERRINJ_BOOL, {.bparam = false}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index daa27ed24..4ad24d0c1 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -59,6 +59,7 @@ evals
   - ERRINJ_PORT_DUMP: false
   - ERRINJ_RELAY_BREAK_LSN: -1
   - ERRINJ_RELAY_EXIT_DELAY: 0
+  - ERRINJ_RELAY_FASTER_THAN_TX: false
   - ERRINJ_RELAY_FINAL_JOIN: false
   - ERRINJ_RELAY_FINAL_SLEEP: false
   - ERRINJ_RELAY_REPORT_INTERVAL: 0
diff --git a/test/replication/gh-4739-vclock-assert.result b/test/replication/gh-4739-vclock-assert.result
new file mode 100644
index 000000000..83896c4e1
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.result
@@ -0,0 +1,88 @@
+-- test-run result file version 2
+env = require('test_run')
+ | ---
+ | ...
+test_run = env.new()
+ | ---
+ | ...
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+ | ---
+ | ...
+test_run:create_cluster(SERVERS, "replication")
+ | ---
+ | ...
+test_run:wait_fullmesh(SERVERS)
+ | ---
+ | ...
+
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+fiber = require('fiber')
+ | ---
+ | ...
+-- Stop updating replicaset vclock to simulate a situation, when
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
+ | ---
+ | - ok
+ | ...
+lsn = box.info.lsn
+ | ---
+ | ...
+f = fiber.create(function() box.space._schema:replace{'something'} end)
+ | ---
+ | ...
+f:status()
+ | ---
+ | - suspended
+ | ...
+-- Vclock isn't updated.
+box.info.lsn == lsn
+ | ---
+ | - true
+ | ...
+
+-- Wait until the remote instance gets the row.
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
+ | ---
+ | - true
+ | ...
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2 with wait=False')
+ | ---
+ | - true
+ | ...
+test_run:cmd('switch rebootstrap1')
+ | ---
+ | - true
+ | ...
+-- Wait until resubscribe is sent
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
+ | ---
+ | - true
+ | ...
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+ | ---
+ | - ok
+ | ...
+box.space._schema:get{'something'}
+ | ---
+ | - ['something']
+ | ...
+test_run:cmd('switch default')
+ | ---
+ | - true
+ | ...
+test_run:drop_cluster(SERVERS)
+ | ---
+ | ...
diff --git a/test/replication/gh-4739-vclock-assert.test.lua b/test/replication/gh-4739-vclock-assert.test.lua
new file mode 100644
index 000000000..5755ad752
--- /dev/null
+++ b/test/replication/gh-4739-vclock-assert.test.lua
@@ -0,0 +1,36 @@
+env = require('test_run')
+test_run = env.new()
+
+SERVERS = {'rebootstrap1', 'rebootstrap2'}
+test_run:create_cluster(SERVERS, "replication")
+test_run:wait_fullmesh(SERVERS)
+
+test_run:cmd('switch rebootstrap1')
+fiber = require('fiber')
+-- Stop updating replicaset vclock to simulate a situation, when
+-- a row is already relayed to the remote master, but the local
+-- vclock update hasn't happened yet.
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
+lsn = box.info.lsn
+f = fiber.create(function() box.space._schema:replace{'something'} end)
+f:status()
+-- Vclock isn't updated.
+box.info.lsn == lsn
+
+-- Wait until the remote instance gets the row.
+test_run:wait_cond(function()\
+    return test_run:get_vclock('rebootstrap2')[box.info.id] > lsn\
+end, 10)
+
+-- Restart the remote instance. This will make the first instance
+-- resubscribe without entering orphan mode.
+test_run:cmd('restart server rebootstrap2 with wait=False')
+test_run:cmd('switch rebootstrap1')
+-- Wait until resubscribe is sent
+test_run:wait_cond(function()\
+    return box.info.replication[2].upstream.status == 'sync'\
+end, 10)
+box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+box.space._schema:get{'something'}
+test_run:cmd('switch default')
+test_run:drop_cluster(SERVERS)
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 429c64df3..90fd53ca6 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -15,6 +15,7 @@
     "gh-4402-info-errno.test.lua": {},
     "gh-4605-empty-password.test.lua": {},
     "gh-4606-admin-creds.test.lua": {},
+    "gh-4739-vclock-assert.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
diff --git a/test/replication/suite.ini b/test/replication/suite.ini
index ed1de3140..b4e09744a 100644
--- a/test/replication/suite.ini
+++ b/test/replication/suite.ini
@@ -3,7 +3,7 @@ core = tarantool
 script =  master.lua
 description = tarantool/box, replication
 disabled = consistent.test.lua
-release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua
+release_disabled = catch.test.lua errinj.test.lua gc.test.lua gc_no_space.test.lua before_replace.test.lua quorum.test.lua recover_missing_xlog.test.lua sync.test.lua long_row_timeout.test.lua gh-4739-vclock-assert.test.lua
 config = suite.cfg
 lua_libs = lua/fast_replica.lua lua/rlimit.lua
 use_unix_sockets = True
-- 
2.21.1 (Apple Git-122.3)
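
The choice made in applier_subscribe() and its effect can be modeled in a few lines. This is a toy model, not Tarantool code: `subscribe_id_filter()` and `own_row_comes_back()` are hypothetical, and the model assumes the remote relay sends back every row whose LSN is above the subscriber's vclock component unless that id is filtered. It mirrors the scenario in gh-4739-vclock-assert.test.lua, where the row is already on the remote master but the local vclock has not advanced yet.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Assumed upper bound on instance ids (fits the uint32_t filter). */
enum { TOY_VCLOCK_MAX = 32 };

/* Mirrors applier_subscribe(): skip own rows unless still orphan. */
static uint32_t
subscribe_id_filter(bool is_orphan, uint32_t instance_id)
{
	assert(instance_id < TOY_VCLOCK_MAX);
	return is_orphan ? 0 : UINT32_C(1) << instance_id;
}

/*
 * Would the remote master relay our own row `row_lsn` back to us?
 * local_lsn is our vclock component for instance_id at subscribe time.
 */
static bool
own_row_comes_back(bool is_orphan, uint32_t instance_id,
		   int64_t local_lsn, int64_t row_lsn)
{
	uint32_t filter = subscribe_id_filter(is_orphan, instance_id);
	/* Filtered id: the relay drops the row regardless of its LSN. */
	if ((filter & (UINT32_C(1) << instance_id)) != 0)
		return false;
	/* Otherwise the relay sends whatever our vclock has not seen. */
	return row_lsn > local_lsn;
}
```

With the filter in place (instance not orphan), its own row is never relayed back, so it cannot be applied a second time; an orphan instance still receives it, which is what recovery after a sudden power loss relies on.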


* Re: [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance
  2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
                   ` (3 preceding siblings ...)
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 4/4] replication: do not relay rows coming from a remote instance back to it Serge Petrenko
@ 2020-02-28 23:24 ` Vladislav Shpilevoy
  2020-03-02  4:32 ` Kirill Yukhin
  5 siblings, 0 replies; 9+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-28 23:24 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches

Thanks for the patchset!

LGTM.


* Re: [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn
  2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn Serge Petrenko
@ 2020-02-28 23:24   ` Vladislav Shpilevoy
  2020-02-29  9:22     ` Serge Petrenko
  0 siblings, 1 reply; 9+ messages in thread
From: Vladislav Shpilevoy @ 2020-02-28 23:24 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches

On 28/02/2020 09:34, Serge Petrenko wrote:
> There is an assertion in vclock_follow `lsn > prev_lsn`, which doesn't
> fire in release builds, of course. Let's at least warn the user on an
> attemt to write a record with a duplicate or otherwise broken lsn, and

attemt -> attempt

I fixed that on the branch by a force push.


* Re: [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn
  2020-02-28 23:24   ` Vladislav Shpilevoy
@ 2020-02-29  9:22     ` Serge Petrenko
  0 siblings, 0 replies; 9+ messages in thread
From: Serge Petrenko @ 2020-02-29  9:22 UTC (permalink / raw)
  To: Vladislav Shpilevoy; +Cc: tarantool-patches

>Saturday, 29 February 2020, 2:24 +03:00, from Vladislav Shpilevoy <v.shpilevoy@tarantool.org>:
> 
>On 28/02/2020 09:34, Serge Petrenko wrote:
>> There is an assertion in vclock_follow `lsn > prev_lsn`, which doesn't
>> fire in release builds, of course. Let's at least warn the user on an
>> attemt to write a record with a duplicate or otherwise broken lsn, and
>attemt -> attempt
>
>I fixed that on the branch by a force push.
Thanks!
--
Serge Petrenko

* Re: [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance
  2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
                   ` (4 preceding siblings ...)
  2020-02-28 23:24 ` [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
@ 2020-03-02  4:32 ` Kirill Yukhin
  5 siblings, 0 replies; 9+ messages in thread
From: Kirill Yukhin @ 2020-03-02  4:32 UTC (permalink / raw)
  To: Serge Petrenko; +Cc: kirichenkoga, tarantool-patches, v.shpilevoy

Hello,

On 28 Feb 11:34, Serge Petrenko wrote:
> https://github.com/tarantool/tarantool/issues/4739
> https://github.com/tarantool/tarantool/tree/sp/gh-4739-vclock-assert-v4
> 
> @ChangeLog
>  - fix possible vclock ordering violations in
>    master-master replication when one of the
>    masters restarts (gh-4739)
> 
> Changes in v5:
>  - review fixes as per reviews from Vlad and Kostja
>  - added a test
>  - applied Vlad's test amendments
>  - added a changelog
> 
> Changes in v4:
>  - move row skipping logic from recovery to relay
>  - encode a list of instances whose rows to skip
>    in SUBSCRIBE request instead of encoding
>    is_orphan status
> 
> Changes in v3:
>  - review fixes as per review from Vlad
>  - instead of skipping rows on replica side,
>    do it on master side, by patching recovery
>    to silently follow rows coming from a certain
>    instance.
> 
> Changes in v2:
> - review fixes as per review from Kostja
> 
> Serge Petrenko (4):
>   box: expose box_is_orphan method
>   wal: warn when trying to write a record with a broken lsn
>   replication: implement an instance id filter for relay
>   replication: do not relay rows coming from a remote instance back to
>     it

I've checked your patchset into 2.2, 2.3 and master.

--
Regards, Kirill Yukhin

Thread overview: 9+ messages
2020-02-28  8:34 [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Serge Petrenko
2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 1/4] box: expose box_is_orphan method Serge Petrenko
2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 2/4] wal: warn when trying to write a record with a broken lsn Serge Petrenko
2020-02-28 23:24   ` Vladislav Shpilevoy
2020-02-29  9:22     ` Serge Petrenko
2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 3/4] replication: implement an instance id filter for relay Serge Petrenko
2020-02-28  8:34 ` [Tarantool-patches] [PATCH v5 4/4] replication: do not relay rows coming from a remote instance back to it Serge Petrenko
2020-02-28 23:24 ` [Tarantool-patches] [PATCH v5 0/4] replication: fix applying of rows originating from local instance Vladislav Shpilevoy
2020-03-02  4:32 ` Kirill Yukhin
